Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
workspaces
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
ssa
workspaces
Commits
6621fa25
Commit
6621fa25
authored
3 years ago
by
Nathan Hertz
Committed by
Nathan Hertz
3 years ago
Browse files
Options
Downloads
Patches
Plain Diff
Finalize support for DAGs in `workflow_service`
parent
71737362
No related branches found
No related tags found
No related merge requests found
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
shared/workspaces/workspaces/workflow/services/workflow_service.py
+54
-9
54 additions, 9 deletions
...rkspaces/workspaces/workflow/services/workflow_service.py
with
54 additions
and
9 deletions
shared/workspaces/workspaces/workflow/services/workflow_service.py
+
54
−
9
View file @
6621fa25
...
...
@@ -19,6 +19,7 @@ from messaging.router import Router, on_message
from
pycapo
import
CapoConfig
from
requests
import
Response
from
workspaces.system.services.remote_processing_service
import
CapoInjector
from
workspaces.workflow.enum
import
ArchiveWorkflows
,
WorkflowRequestState
from
workspaces.workflow.message_architect
import
(
ArchiveMessageArchitect
,
...
...
@@ -27,7 +28,6 @@ from workspaces.workflow.message_architect import (
from
workspaces.workflow.schema
import
WorkflowRequest
,
WorkflowRequestFile
from
workspaces.workflow.schema_interfaces
import
WorkflowIF
,
WorkflowRequestIF
from
workspaces.workflow.services.interfaces
import
WorkflowInfoIF
,
WorkflowServiceIF
from
workspaces.system.services.remote_processing_service
import
CapoInjector
logger
=
logging
.
getLogger
(
__name__
)
...
...
@@ -447,21 +447,56 @@ class WorkflowService(WorkflowServiceIF):
logger
.
info
(
"
executing on folder %s
"
,
folder
)
# some file in here should end in .dag; that file is our dagman input
# dagman = list(folder.glob("*.dag"))[0]
# logger.info("dagman file %s exists.", dagman)
dag_file
=
list
(
folder
.
glob
(
"
*.dag
"
))[
0
]
if
dag_file
:
logger
.
info
(
"
dagman file %s exists.
"
,
dag_file
)
return
self
.
_submit_dag
(
folder
,
dag_file
)
else
:
job_file
=
list
(
folder
.
glob
(
"
*.condor
"
))[
0
]
logger
.
info
(
"
condor file %s exists.
"
,
job_file
)
return
self
.
_submit_job
(
folder
,
job_file
)
def
_submit_job
(
self
,
folder
:
Path
,
job_file
:
Path
)
->
Path
:
"""
Submit job file to HTCondor
:param folder: Folder to execute workflow in
:param job_file: Path to job submit file
:return: PAth to workflow log file
"""
# ensure the log file exists
logfile
=
self
.
_get_job_logfile_name
(
job_file
)
logger
.
info
(
"
log file %s exists.
"
,
logfile
)
logfile
.
touch
()
# submit
logger
.
info
(
"
submitting job to condor...
"
)
subprocess
.
run
(
[
"
condor_submit
"
,
str
(
job_file
)],
cwd
=
str
(
folder
.
absolute
()),
preexec_fn
=
self
.
_switch_to_submituser
,
)
condor
=
list
(
folder
.
glob
(
"
*.condor
"
))[
0
]
logger
.
info
(
"
condor file %s exists.
"
,
condor
)
# return the logfile
return
logfile
def
_submit_dag
(
self
,
folder
:
Path
,
dag_file
:
Path
)
->
Path
:
"""
Submit DAG file to HTCondor
:param folder: Folder to execute workflow in
:param dag_file: Path to DAG submit file
:return: Path to workflow log file
"""
# ensure the log file exists
logfile
=
self
.
_get_logfile_name
(
condor
)
logfile
=
self
.
_get_
dag_
logfile_name
(
dag_file
)
logger
.
info
(
"
log file %s exists.
"
,
logfile
)
logfile
.
touch
()
# submit
logger
.
info
(
"
submitting to condor...
"
)
logger
.
info
(
"
submitting
DAG
to condor...
"
)
subprocess
.
run
(
[
"
condor_submit
"
,
str
(
condor
)],
[
"
condor_submit
_dag
"
,
str
(
dag_file
)],
cwd
=
str
(
folder
.
absolute
()),
preexec_fn
=
self
.
_switch_to_submituser
,
)
...
...
@@ -483,7 +518,7 @@ class WorkflowService(WorkflowServiceIF):
os
.
setuid
(
submituser_uid
)
@staticmethod
def
_get_logfile_name
(
jobfile_name
:
Path
)
->
Path
:
def
_get_
job_
logfile_name
(
jobfile_name
:
Path
)
->
Path
:
"""
Read HTCondor job file and get the log file name, if it exists
...
...
@@ -497,6 +532,16 @@ class WorkflowService(WorkflowServiceIF):
return
Path
(
logfile_name
.
strip
())
return
Path
(
f
"
{
jobfile_name
.
stem
}
.log
"
)
@staticmethod
def
_get_dag_logfile_name
(
dag_file
:
Path
)
->
Path
:
"""
Return path to DAG log file, which will always be {dag-file-name}.dagman.log
:param dag_file: Path to workflow DAG file
:return: Path to workflow DAG log
"""
return
Path
(
str
(
dag_file
)
+
"
.dagman.log
"
)
@staticmethod
def
_get_forbidden_templates_list
(
workflow_name
:
str
):
return
[
...
...
This diff is collapsed.
Click to expand it.
Preview
0%
Loading
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Save comment
Cancel
Please
register
or
sign in
to comment