diff --git a/services/workflow/src/workflow/server.py b/services/workflow/src/workflow/server.py index ef17dffcbd6e5ca4d5a2f2e3e47fb0e5e03caf39..260ed488bfe5afb8c2c8aef08f8098b892bdaaa1 100644 --- a/services/workflow/src/workflow/server.py +++ b/services/workflow/src/workflow/server.py @@ -1,6 +1,7 @@ import zope.sqlalchemy from pyramid.config import Configurator from pyramid.renderers import JSONP +from pyramid.request import Request from pyramid.view import view_config, view_defaults from pyramid_beaker import session_factory_from_settings from workspaces.services import WorkflowInfo, WorkflowService, get_session_factory, get_engine @@ -34,7 +35,7 @@ class WorkflowRestService: TODO: rename this to requests and add new service about workflow definitions with this name """ - def __init__(self, request): + def __init__(self, request: Request): self.request = request @view_config(request_method='GET') @@ -47,8 +48,8 @@ class WorkflowRestService: """ return self.request.info.all_workflows() - @view_config(request_method='POST') - def create_workflow(self): + @view_config(request_method='POST', route_name='create_workflow') + def create_workflow(self, name: str, args: str): """ Create a new workflow request from the name/arguments supplied. @@ -58,7 +59,9 @@ class WorkflowRestService: # all we should have to do here is take the WorkflowRequest from the context and # hand it to WorkflowInfo to save it, but we're still conflating # workflows and workflow requests right now - raise NotImplementedError + req = self.request.context + request = self.request.info.create_workflow_request(workflow_name=name, argument=args) + return request @view_config(request_method='GET', route_name='workflow') def get_workflow(self): @@ -79,6 +82,7 @@ class WorkflowRestService: :return: """ print(f"Submitting workflow {self.request.context.workflow_name}") + return self.request.workflows.execute(self.request.context) @view_defaults(route_name='workflow_files', renderer='json') @@ -108,7 +112,7 @@ class WorkflowFilesRestService: """ return self.request.context['files'] - @view_config(request_method='GET', route_name='workflow_file', accept='text/plain', renderer='string') + @view_config(request_method='GET', route_name='workflow_file', accept='json', renderer='string') def get_file_text(self): """ Get the text contents of this file @@ -184,6 +188,7 @@ def main(global_config, **settings): config.add_route('workflows', '/workflows') config.add_route('workflow', '/workflows/{name}', factory=lookup_workflow) + config.add_route('create_workflow', '/workflows/create', factory=lookup_workflow) config.add_route('submit_workflow', '/workflows/{name}/submit', factory=lookup_workflow) config.add_route('workflow_files', '/workflows/{name}/files', factory=lookup_workflow) config.add_route('workflow_file', '/workflows/{name}/files/{filename}', factory=lookup_file) diff --git a/shared/workspaces/src/workspaces/schema.py b/shared/workspaces/src/workspaces/schema.py index 0fc2f00a69b28622f2a234a3917c662a99550f6c..0d3df573dacf527714a056d2ca601faa367953ec 100644 --- a/shared/workspaces/src/workspaces/schema.py +++ b/shared/workspaces/src/workspaces/schema.py @@ -38,7 +38,7 @@ class AbstractFile: (directory / self.filename).write_bytes(self.content) def __json__(self, request): - return {"filename": self.filename, "content": self.content} + return {"filename": self.filename, "content": self.content.decode("utf8")} @classmethod def from_path(cls, path: Path) -> "AbstractFile": @@ -50,14 +50,14 @@ class AbstractRequest: Abstract File is an abstract concept that is used to create requests """ - workflow_name: str + name: str argument: json - def __init__(self, workflow_name: str, argument: json): - self.workflow_name, self.argument = workflow_name, argument + def __init__(self, name: str, argument: json): + self.name, self.argument = name, argument def __json__(self, request): - return {"workflow_name": self.workflow_name, "argument": self.argument} + return {"workflow_name": self.name, "argument": self.argument} class WorkflowEventType(enum.Enum): @@ -299,8 +299,6 @@ class CapabilityExecution(Base, CapabilityExecutionIF): __tablename__ = "capability_executions" - __tablename__ = "capability_executions" - id = sa.Column("execution_id", sa.Integer, primary_key=True) state = sa.Column("state", sa.String) capability_request_id = sa.Column( @@ -354,8 +352,23 @@ class Workflow(Base): :param files: the files to be processed :return: a list of rendered templates """ + rendered_files = [] + + for f in files: + rendered_files.append(AbstractFile.from_path(f)) + + if self.workflow_name == 'null': + for template in self.templates: + filename = template.filename + contents = template.contents + if filename == 'null.condor': + contents = contents.decode("utf8") + contents.replace("{{arguments}}", argument) + file = AbstractFile(filename, contents) + rendered_files.append(file) + + return rendered_files - raise NotImplementedError class WorkflowTemplate(Base): diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index 9fdab0961a133588b965d7122a98d5876f8d4fc3..dfb53c99e5e2b9939ce8ce33614493b22db54f33 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -299,13 +299,14 @@ class WorkflowService(WorkflowServiceIF): """ # 1. look up workflow, returns workflow - info = WorkflowInfo(self).lookup_workflow_definition(workflow_name) + info = WorkflowInfo(self) + definition = info.lookup_workflow_definition(workflow_name) # 2. create and save request, return request id - record = WorkflowRequest().create_request(workflow_name, argument) + record = info.create_workflow_request(workflow_name, argument) # 3. render templates to files, returns list of rendered files - contents = info.render_templates(argument, files) + contents = definition.render_templates(argument, files) # 4. prepare files for condor execution temp_folder = self._prepare_files_for_condor(contents) @@ -331,6 +332,7 @@ class WorkflowService(WorkflowServiceIF): :return: a Path """ # 1. create a temporary directory + # TODO: where to put temp directory? temp_folder = Path(mkdtemp(dir=str(Path.home() / "tmp"))) # 2. spool each of the temp files to it @@ -373,8 +375,8 @@ class WorkflowService(WorkflowServiceIF): # in this class def __init__(self): # 1. Start listening for events from the wf_monitor stream - # self.channel = channels.workflow_events.listen(self.on_workflow_event) - raise NotImplementedError + # self.channel = workflow_events.listen(self.on_workflow_event) + pass def on_workflow_event( self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path @@ -434,8 +436,7 @@ class WorkflowService(WorkflowServiceIF): def __exit__(self, exc_type, exc_val, exc_tb): # this method lets us use "with WorkflowService as ws" type syntax # we should close our AMQP channel gracefully here - # self.channel.close() - pass + self.channel.close() class WorkflowInfo(WorkflowInfoIF): @@ -451,3 +452,30 @@ class WorkflowInfo(WorkflowInfoIF): def all_workflows(self) -> List[Workflow]: return self.session.query(Workflow).all() + + def create_workflow_request( + self, + workflow_name: str, + argument: Dict) -> WorkflowRequest: + """ + Create new workflow request and save to database + :param workflow_name: name of workflow to run + :param argument: workflow arguments + :return: new WorkflowRequest + """ + workflow = self.lookup_workflow_definition(workflow_name) + request = WorkflowRequest(workflow_name=workflow_name, argument=argument) + request.id = self.save_request(request) + return request + + def save_request( + self, request: WorkflowRequest + ) -> int: + """ + Save a given entity and return an integer identifier for it + :param entity: the entity to save + :return: the entity's identifier + """ + self.session.add(request) + self.session.flush() + return request.workflow_request_id