Skip to content
Snippets Groups Projects
Commit 344025e4 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Add more commentary to WorkflowService about what needs to be implemented

parent 19a660d4
No related branches found
No related tags found
No related merge requests found
...@@ -17,19 +17,22 @@ class WorkflowService(WorkflowServiceIF): ...@@ -17,19 +17,22 @@ class WorkflowService(WorkflowServiceIF):
# 1. look up the workflow info for this workflow name # 1. look up the workflow info for this workflow name
info = self.db.lookup_workflow_definition(workflow_name) info = self.db.lookup_workflow_definition(workflow_name)
# 2. render the templates to files # 2. create and save this request to the database
record = WorkflowRequest(...)
# 3. render the templates to files
contents = info.render_templates(argument, files) contents = info.render_templates(argument, files)
# 3. serialize the templated files # 4. serialize the templated files
temp_folder = self._prepare_files_for_condor(contents) temp_folder = self._prepare_files_for_condor(contents)
# 4. execute condor and get the log file # 5. execute condor and get the log file
log_file = self._execute_prepared(temp_folder) log_file = self._execute_prepared(temp_folder)
# 5. start reading the logs # 6. start reading the logs
return HTCondorWorkflowEventStream(log_file) return HTCondorWorkflowEventStream(log_file)
Of these, step 5 is going to have to be the most different, because Of these, step 6 is going to have to be the most different, because
that's launching the new CLI wf_monitor program that Nathan wrote. that's launching the new CLI wf_monitor program that Nathan wrote.
The other steps are more similar than different, though figuring out The other steps are more similar than different, though figuring out
where to put the temp files will be interesting. where to put the temp files will be interesting.
...@@ -81,6 +84,45 @@ class WorkflowService(WorkflowServiceIF): ...@@ -81,6 +84,45 @@ class WorkflowService(WorkflowServiceIF):
# return the logfile # return the logfile
return logfile return logfile
# The next few things that need to happen here are in response to
# WorkflowEvents that we receive from wf_monitor as the workflow
# execution evolves. So we have to set up listening at some point
# in this class
def __init__(self):
# 1. Start listening for events from the wf_monitor stream
# self.channel = channels.workflow_events.listen(self.on_workflow_event)
raise NotImplementedError
def on_workflow_event(self, event: WorkflowEvent):
# 1. log that we received this event, somehow
# 2. update the WorkflowRequest record with the state we got
# 3. do per-event-type stuff, such as level change events, database
# updates, and logging
if event.type == WorkflowEventType.SUBMITTED:
# this would be a good place to emit a level-changed event
# for the capability system, such as:
#
# channels.capability_events.send(...)
#
# however, for now we can pass
pass
elif event.type == WorkflowEventType.EXECUTING:
# another good place to emit a level-changed event
pass
elif event.type == WorkflowEventType.TERMINATED:
# another level-change event opportunity
# also, we must now locate the temporary directory and
# remove it
pass
# there is an event type of OTHER which we can basically
# ignore for now
def __exit__(self, exc_type, exc_val, exc_tb):
# this method lets us use "with WorkflowService as ws" type syntax
# we should close our AMQP channel gracefully here
# self.channel.close()
pass
class WorkflowInfo(WorkflowInfoIF): class WorkflowInfo(WorkflowInfoIF):
""" """
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment