Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
workspaces
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
ssa
workspaces
Commits
893ecce5
Commit
893ecce5
authored
3 years ago
by
Nathan Hertz
Committed by
Nathan Hertz
3 years ago
Browse files
Options
Downloads
Patches
Plain Diff
Formatting changes (changed line length to 120)
parent
d3d8539c
No related branches found
No related tags found
1 merge request
!504
WS-677: Multi-stage workflow research & development
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
shared/workspaces/workspaces/workflow/services/workflow_service.py
+28
-62
28 additions, 62 deletions
...rkspaces/workspaces/workflow/services/workflow_service.py
with
28 additions
and
62 deletions
shared/workspaces/workspaces/workflow/services/workflow_service.py
+
28
−
62
View file @
893ecce5
...
@@ -47,8 +47,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
...
@@ -47,8 +47,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
:return: the response as JSON
:return: the response as JSON
"""
"""
response
=
requests
.
post
(
response
=
requests
.
post
(
f
"
{
self
.
url
}
/workflows/
{
request
.
workflow_name
}
"
f
"
{
self
.
url
}
/workflows/
{
request
.
workflow_name
}
"
f
"
/requests/
{
request
.
workflow_request_id
}
/submit
"
f
"
/requests/
{
request
.
workflow_request_id
}
/submit
"
)
)
logger
.
info
(
logger
.
info
(
"
Got result %s with type %s and body %s
"
,
"
Got result %s with type %s and body %s
"
,
...
@@ -58,9 +57,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
...
@@ -58,9 +57,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
)
)
return
response
.
json
()
return
response
.
json
()
def
attach_file_to_request
(
def
attach_file_to_request
(
self
,
request
:
WorkflowRequestIF
,
filename
:
str
,
content
:
bytes
)
->
Response
:
self
,
request
:
WorkflowRequestIF
,
filename
:
str
,
content
:
bytes
)
->
Response
:
"""
"""
Add a file to this workflow request.
Add a file to this workflow request.
...
@@ -86,14 +83,10 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
...
@@ -86,14 +83,10 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
:return: dict containing file content
:return: dict containing file content
"""
"""
response
=
requests
.
get
(
response
=
requests
.
get
(
f
"
{
self
.
url
}
/workflows/
{
name
}
/requests/
{
request_id
}
/files/
{
filename
}
"
)
f
"
{
self
.
url
}
/workflows/
{
name
}
/requests/
{
request_id
}
/files/
{
filename
}
"
)
return
response
.
content
.
decode
()
return
response
.
content
.
decode
()
def
create_workflow_request
(
def
create_workflow_request
(
self
,
workflow
:
Union
[
str
,
WorkflowIF
],
argument
:
Dict
)
->
WorkflowRequestIF
:
self
,
workflow
:
Union
[
str
,
WorkflowIF
],
argument
:
Dict
)
->
WorkflowRequestIF
:
"""
"""
Create a workflow request using the supplied arguments.
Create a workflow request using the supplied arguments.
...
@@ -129,9 +122,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
...
@@ -129,9 +122,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
:param request: completed workflow request to ingest
:param request: completed workflow request to ingest
:return:
:return:
"""
"""
requests
.
post
(
requests
.
post
(
f
"
{
self
.
url
}
/workflows/
{
request
.
workflow_name
}
/requests/
{
request
.
workflow_request_id
}
/ingest
"
)
f
"
{
self
.
url
}
/workflows/
{
request
.
workflow_name
}
/requests/
{
request
.
workflow_request_id
}
/ingest
"
)
class
WorkflowService
(
WorkflowServiceIF
):
class
WorkflowService
(
WorkflowServiceIF
):
...
@@ -163,10 +154,7 @@ class WorkflowService(WorkflowServiceIF):
...
@@ -163,10 +154,7 @@ class WorkflowService(WorkflowServiceIF):
"""
"""
forbidden
=
self
.
_get_forbidden_templates_list
(
request
.
workflow_name
)
forbidden
=
self
.
_get_forbidden_templates_list
(
request
.
workflow_name
)
if
filename
not
in
forbidden
:
if
filename
not
in
forbidden
:
if
(
if
ArchiveWorkflows
.
is_archive_wf
(
request
.
workflow_name
)
and
filename
==
"
metadata.json
"
:
ArchiveWorkflows
.
is_archive_wf
(
request
.
workflow_name
)
and
filename
==
"
metadata.json
"
):
content_dict
=
json
.
loads
(
content
.
decode
())
content_dict
=
json
.
loads
(
content
.
decode
())
content_dict
[
"
workflowName
"
]
=
request
.
workflow_name
content_dict
[
"
workflowName
"
]
=
request
.
workflow_name
content_dict
[
"
data_location
"
]
=
request
.
argument
[
"
data_location
"
]
content_dict
[
"
data_location
"
]
=
request
.
argument
[
"
data_location
"
]
...
@@ -199,9 +187,7 @@ class WorkflowService(WorkflowServiceIF):
...
@@ -199,9 +187,7 @@ class WorkflowService(WorkflowServiceIF):
:param workflow_request_id: ID of carta workflow that wants to send this message
:param workflow_request_id: ID of carta workflow that wants to send this message
:param carta_url: JSON blob with CARTA URL
:param carta_url: JSON blob with CARTA URL
"""
"""
logger
.
info
(
logger
.
info
(
f
"
SENDING CARTA MESSAGE to AAT Request Handler for request #
{
workflow_request_id
}
!
"
)
f
"
SENDING CARTA MESSAGE to AAT Request Handler for request #
{
workflow_request_id
}
!
"
)
wf_request
=
self
.
info
.
lookup_workflow_request
(
workflow_request_id
)
wf_request
=
self
.
info
.
lookup_workflow_request
(
workflow_request_id
)
routing_key
=
f
"
ws-workflow.carta-instance-ready.
{
workflow_request_id
}
"
routing_key
=
f
"
ws-workflow.carta-instance-ready.
{
workflow_request_id
}
"
carta_url_msg
=
ArchiveMessageArchitect
(
carta_url_msg
=
ArchiveMessageArchitect
(
...
@@ -242,11 +228,7 @@ class WorkflowService(WorkflowServiceIF):
...
@@ -242,11 +228,7 @@ class WorkflowService(WorkflowServiceIF):
# create a temporary directory if processing directory is not supplied,
# create a temporary directory if processing directory is not supplied,
# needs to exist before template rendering
# needs to exist before template rendering
temp_folder
=
(
temp_folder
=
self
.
_make_temp_directory
(
request
)
if
not
request
.
results_dir
else
Path
(
request
.
results_dir
)
self
.
_make_temp_directory
(
request
)
if
not
request
.
results_dir
else
Path
(
request
.
results_dir
)
)
# if remote is true, create a capo subspace file in the request's directory
# if remote is true, create a capo subspace file in the request's directory
if
remote
:
if
remote
:
...
@@ -326,9 +308,7 @@ class WorkflowService(WorkflowServiceIF):
...
@@ -326,9 +308,7 @@ class WorkflowService(WorkflowServiceIF):
)
)
request
.
argument
[
"
ramInGb
"
]
=
self
.
processing_settings
.
ramInGb
request
.
argument
[
"
ramInGb
"
]
=
self
.
processing_settings
.
ramInGb
def
_render_with_metadata
(
def
_render_with_metadata
(
self
,
wf_request
:
WorkflowRequestIF
,
tempdir
:
Path
,
wf_definition
:
WorkflowIF
):
self
,
wf_request
:
WorkflowRequestIF
,
tempdir
:
Path
,
wf_definition
:
WorkflowIF
):
name
=
wf_request
.
workflow_name
name
=
wf_request
.
workflow_name
if
"
calibration
"
in
name
:
if
"
calibration
"
in
name
:
wrest_type
=
"
-sc
"
wrest_type
=
"
-sc
"
...
@@ -348,9 +328,7 @@ class WorkflowService(WorkflowServiceIF):
...
@@ -348,9 +328,7 @@ class WorkflowService(WorkflowServiceIF):
argument2
=
[]
argument2
=
[]
elif
"
ingest
"
in
name
:
elif
"
ingest
"
in
name
:
wrest_type
=
"
-aux
"
wrest_type
=
"
-aux
"
parent_req
=
self
.
info
.
lookup_workflow_request
(
parent_req
=
self
.
info
.
lookup_workflow_request
(
int
(
wf_request
.
argument
[
"
parent_wf_request_id
"
]))
int
(
wf_request
.
argument
[
"
parent_wf_request_id
"
])
)
eb
=
(
eb
=
(
parent_req
.
argument
[
"
product_locator
"
]
parent_req
.
argument
[
"
product_locator
"
]
if
"
product_locator
"
in
parent_req
.
argument
if
"
product_locator
"
in
parent_req
.
argument
...
@@ -381,15 +359,11 @@ class WorkflowService(WorkflowServiceIF):
...
@@ -381,15 +359,11 @@ class WorkflowService(WorkflowServiceIF):
else
:
else
:
logger
.
error
(
wf_json
.
decode
())
logger
.
error
(
wf_json
.
decode
())
logger
.
info
(
"
SENDING WORKFLOW FAIL MESSAGE!
"
)
logger
.
info
(
"
SENDING WORKFLOW FAIL MESSAGE!
"
)
failed_msg
=
WorkflowMessageArchitect
(
request
=
wf_request
).
compose_message
(
failed_msg
=
WorkflowMessageArchitect
(
request
=
wf_request
).
compose_message
(
"
workflow_failed
"
)
"
workflow_failed
"
)
self
.
messenger
.
send_message
(
**
failed_msg
)
self
.
messenger
.
send_message
(
**
failed_msg
)
return
wf_request
return
wf_request
def
_determine_usable_files
(
def
_determine_usable_files
(
self
,
request
:
WorkflowRequestIF
,
templated_files
:
List
[
WorkflowRequestFile
]):
self
,
request
:
WorkflowRequestIF
,
templated_files
:
List
[
WorkflowRequestFile
]
):
# Override templates if user supplied file has same name and is a valid input file
# Override templates if user supplied file has same name and is a valid input file
usable_templates
=
[]
usable_templates
=
[]
usable_files
=
[]
usable_files
=
[]
...
@@ -614,9 +588,9 @@ class WorkflowMessageHandler:
...
@@ -614,9 +588,9 @@ class WorkflowMessageHandler:
logger
.
info
(
"
SENDING INGESTION COMPLETE MESSAGE!
"
)
logger
.
info
(
"
SENDING INGESTION COMPLETE MESSAGE!
"
)
subject
[
"
execution_wf_id
"
]
=
wf_request
.
argument
[
"
parent_wf_request_id
"
]
subject
[
"
execution_wf_id
"
]
=
wf_request
.
argument
[
"
parent_wf_request_id
"
]
ingestion_complete_msg
=
WorkflowMessageArchitect
(
ingestion_complete_msg
=
WorkflowMessageArchitect
(
previous_info
=
subject
).
compose_message
(
previous_info
=
subject
"
ingestion_complete
"
)
.
compose_message
(
"
ingestion_complete
"
)
)
self
.
messenger
.
send_message
(
**
ingestion_complete_msg
)
self
.
messenger
.
send_message
(
**
ingestion_complete_msg
)
@on_message
(
service
=
"
workflow
"
)
@on_message
(
service
=
"
workflow
"
)
...
@@ -639,9 +613,7 @@ class WorkflowMessageHandler:
...
@@ -639,9 +613,7 @@ class WorkflowMessageHandler:
if
htcondor_job_id
:
=
int
(
message
[
"
condor_metadata
"
][
"
condor_job_id
"
]):
if
htcondor_job_id
:
=
int
(
message
[
"
condor_metadata
"
][
"
condor_job_id
"
]):
# Workflow has corresponding condor job ID
# Workflow has corresponding condor job ID
logger
.
info
(
logger
.
info
(
f
"
Workflow request has an HTCondor job ID of
{
htcondor_job_id
}
. Setting DB column!
"
)
f
"
Workflow request has an HTCondor job ID of
{
htcondor_job_id
}
. Setting DB column!
"
)
request
.
htcondor_job_id
=
htcondor_job_id
request
.
htcondor_job_id
=
htcondor_job_id
elif
message
[
"
type
"
]
==
"
workflow-complete
"
:
elif
message
[
"
type
"
]
==
"
workflow-complete
"
:
status
=
WorkflowRequestState
.
Complete
.
name
status
=
WorkflowRequestState
.
Complete
.
name
...
@@ -676,8 +648,7 @@ class WorkflowMessageHandler:
...
@@ -676,8 +648,7 @@ class WorkflowMessageHandler:
except
Exception
as
exc
:
except
Exception
as
exc
:
transaction
.
abort
()
transaction
.
abort
()
logger
.
error
(
logger
.
error
(
f
"
Failed to update status on workflow request
"
f
"
Failed to update status on workflow request
"
f
"
{
request
.
workflow_request_id
}
to
{
status
}
:
{
exc
}
"
f
"
{
request
.
workflow_request_id
}
to
{
status
}
:
{
exc
}
"
)
)
else
:
else
:
logger
.
warning
(
f
"
Message
{
message
}
does not concern a workflow request. Ignoring.
"
)
logger
.
warning
(
f
"
Message
{
message
}
does not concern a workflow request. Ignoring.
"
)
...
@@ -687,24 +658,19 @@ class WorkflowMessageHandler:
...
@@ -687,24 +658,19 @@ class WorkflowMessageHandler:
wf_id
=
subject
[
"
workflow_request_id
"
]
wf_id
=
subject
[
"
workflow_request_id
"
]
wf_request
=
self
.
info
.
lookup_workflow_request
(
wf_id
)
wf_request
=
self
.
info
.
lookup_workflow_request
(
wf_id
)
if
(
if
wf_request
.
workflow_name
==
ArchiveWorkflows
.
CARTA
.
value
and
wf_request
.
argument
[
"
notify_ready
"
]
is
False
:
wf_request
.
workflow_name
==
ArchiveWorkflows
.
CARTA
.
value
logger
.
info
(
f
"
SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #
{
wf_id
}
!
"
)
and
wf_request
.
argument
[
"
notify_ready
"
]
is
False
):
logger
.
info
(
f
"
SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #
{
wf_id
}
!
"
)
routing_key
=
f
"
ws-workflow.carta-instance-ready.
{
wf_id
}
"
routing_key
=
f
"
ws-workflow.carta-instance-ready.
{
wf_id
}
"
carta_url_msg
=
ArchiveMessageArchitect
(
carta_url_msg
=
ArchiveMessageArchitect
(
routing_key
=
routing_key
,
request
=
wf_request
).
compose_message
(
routing_key
=
routing_key
,
request
=
wf_request
"
carta_failed
"
)
.
compose_message
(
"
carta_failed
"
)
)
self
.
archive_messenger
.
send_message
(
**
carta_url_msg
)
self
.
archive_messenger
.
send_message
(
**
carta_url_msg
)
if
wf_request
.
workflow_name
==
ArchiveWorkflows
.
SECI
.
value
:
if
wf_request
.
workflow_name
==
ArchiveWorkflows
.
SECI
.
value
:
logger
.
info
(
f
"
SENDING FAILED SECI MESSAGE to VLASS Manager for request #
{
wf_id
}
!
"
)
logger
.
info
(
f
"
SENDING FAILED SECI MESSAGE to VLASS Manager for request #
{
wf_id
}
!
"
)
routing_key
=
f
"
ws-workflow.seci.
{
wf_id
}
"
routing_key
=
f
"
ws-workflow.seci.
{
wf_id
}
"
seci_msg
=
ArchiveMessageArchitect
(
seci_msg
=
ArchiveMessageArchitect
(
routing_key
=
routing_key
,
request
=
wf_request
).
compose_message
(
routing_key
=
routing_key
,
request
=
wf_request
"
seci_failed
"
)
.
compose_message
(
"
seci_failed
"
)
)
self
.
archive_messenger
.
send_message
(
**
seci_msg
)
self
.
archive_messenger
.
send_message
(
**
seci_msg
)
def
send_archive_complete_event
(
self
,
**
message
:
Dict
):
def
send_archive_complete_event
(
self
,
**
message
:
Dict
):
...
@@ -715,9 +681,9 @@ class WorkflowMessageHandler:
...
@@ -715,9 +681,9 @@ class WorkflowMessageHandler:
if
wf_request
.
workflow_name
==
ArchiveWorkflows
.
SECI
.
value
:
if
wf_request
.
workflow_name
==
ArchiveWorkflows
.
SECI
.
value
:
logger
.
info
(
f
"
SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #
{
wf_id
}
!
"
)
logger
.
info
(
f
"
SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #
{
wf_id
}
!
"
)
routing_key
=
f
"
ws-workflow.seci.
{
wf_id
}
"
routing_key
=
f
"
ws-workflow.seci.
{
wf_id
}
"
seci_msg
=
ArchiveMessageArchitect
(
seci_msg
=
ArchiveMessageArchitect
(
routing_key
=
routing_key
,
request
=
wf_request
).
compose_message
(
routing_key
=
routing_key
,
request
=
wf_request
"
seci_complete
"
)
.
compose_message
(
"
seci_complete
"
)
)
self
.
archive_messenger
.
send_message
(
**
seci_msg
)
self
.
archive_messenger
.
send_message
(
**
seci_msg
)
def
clean_remote_workflow
(
self
,
request
:
WorkflowRequestIF
):
def
clean_remote_workflow
(
self
,
request
:
WorkflowRequestIF
):
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment