Skip to content
Snippets Groups Projects
Commit fcc8bbea authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Merge branch 'SSA-6320-workflow-service' of...

Merge branch 'SSA-6320-workflow-service' of https://open-bitbucket.nrao.edu/scm/ssa/data into SSA-6320-workflow-service
parents 6f284ee9 344025e4
No related branches found
No related tags found
No related merge requests found
......@@ -17,19 +17,22 @@ class WorkflowService(WorkflowServiceIF):
# 1. look up the workflow info for this workflow name
info = self.db.lookup_workflow_definition(workflow_name)
# 2. render the templates to files
# 2. create and save this request to the database
record = WorkflowRequest(...)
# 3. render the templates to files
contents = info.render_templates(argument, files)
# 3. serialize the templated files
# 4. serialize the templated files
temp_folder = self._prepare_files_for_condor(contents)
# 4. execute condor and get the log file
# 5. execute condor and get the log file
log_file = self._execute_prepared(temp_folder)
# 5. start reading the logs
# 6. start reading the logs
return HTCondorWorkflowEventStream(log_file)
Of these, step 5 is going to have to be the most different, because
Of these, step 6 is going to have to be the most different, because
that's launching the new CLI wf_monitor program that Nathan wrote.
The other steps are more similar than different, though figuring out
where to put the temp files will be interesting.
......@@ -81,6 +84,45 @@ class WorkflowService(WorkflowServiceIF):
# return the logfile
return logfile
# The next few things that need to happen here are in response to
# WorkflowEvents that we receive from wf_monitor as the workflow
# execution evolves. So we have to set up listening at some point
# in this class
def __init__(self):
# 1. Start listening for events from the wf_monitor stream
# self.channel = channels.workflow_events.listen(self.on_workflow_event)
raise NotImplementedError
def on_workflow_event(self, event: WorkflowEvent):
# 1. log that we received this event, somehow
# 2. update the WorkflowRequest record with the state we got
# 3. do per-event-type stuff, such as level change events, database
# updates, and logging
if event.type == WorkflowEventType.SUBMITTED:
# this would be a good place to emit a level-changed event
# for the capability system, such as:
#
# channels.capability_events.send(...)
#
# however, for now we can pass
pass
elif event.type == WorkflowEventType.EXECUTING:
# another good place to emit a level-changed event
pass
elif event.type == WorkflowEventType.TERMINATED:
# another level-change event opportunity
# also, we must now locate the temporary directory and
# remove it
pass
# there is an event type of OTHER which we can basically
# ignore for now
def __exit__(self, exc_type, exc_val, exc_tb):
# this method lets us use "with WorkflowService as ws" type syntax
# we should close our AMQP channel gracefully here
# self.channel.close()
pass
class WorkflowInfo(WorkflowInfoIF):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment