diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index 3f6d37f31567400776499c7141eef44970a4523b..53c249fc23b25f8665012280c51ba50e3d12f9e6 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -17,19 +17,22 @@ class WorkflowService(WorkflowServiceIF): # 1. look up the workflow info for this workflow name info = self.db.lookup_workflow_definition(workflow_name) - # 2. render the templates to files + # 2. create and save this request to the database + record = WorkflowRequest(...) + + # 3. render the templates to files contents = info.render_templates(argument, files) - # 3. serialize the templated files + # 4. serialize the templated files temp_folder = self._prepare_files_for_condor(contents) - # 4. execute condor and get the log file + # 5. execute condor and get the log file log_file = self._execute_prepared(temp_folder) - # 5. start reading the logs + # 6. start reading the logs return HTCondorWorkflowEventStream(log_file) - Of these, step 5 is going to have to be the most different, because + Of these, step 6 is going to have to be the most different, because that's launching the new CLI wf_monitor program that Nathan wrote. The other steps are more similar than different, though figuring out where to put the temp files will be interesting. @@ -81,6 +84,45 @@ class WorkflowService(WorkflowServiceIF): # return the logfile return logfile + # The next few things that need to happen here are in response to + # WorkflowEvents that we receive from wf_monitor as the workflow + # execution evolves. So we have to set up listening at some point + # in this class + def __init__(self): + # 1. Start listening for events from the wf_monitor stream + # self.channel = channels.workflow_events.listen(self.on_workflow_event) + raise NotImplementedError + + def on_workflow_event(self, event: WorkflowEvent): + # 1. log that we received this event, somehow + # 2. update the WorkflowRequest record with the state we got + # 3. do per-event-type stuff, such as level change events, database + # updates, and logging + if event.type == WorkflowEventType.SUBMITTED: + # this would be a good place to emit a level-changed event + # for the capability system, such as: + # + # channels.capability_events.send(...) + # + # however, for now we can pass + pass + elif event.type == WorkflowEventType.EXECUTING: + # another good place to emit a level-changed event + pass + elif event.type == WorkflowEventType.TERMINATED: + # another level-change event opportunity + # also, we must now locate the temporary directory and + # remove it + pass + # there is an event type of OTHER which we can basically + # ignore for now + + def __exit__(self, exc_type, exc_val, exc_tb): + # this method lets us use "with WorkflowService as ws" type syntax + # we should close our AMQP channel gracefully here + # self.channel.close() + pass + class WorkflowInfo(WorkflowInfoIF): """