diff --git a/services/workflow/src/workflow/server.py b/services/workflow/src/workflow/server.py index a3a066f0d26353e51f0168e441112ba32957e65c..4f5f82903f1ae1e3a28c24025b2e37d299e2c95a 100644 --- a/services/workflow/src/workflow/server.py +++ b/services/workflow/src/workflow/server.py @@ -24,12 +24,12 @@ def lookup_workflow(request): return request.info.lookup_workflow_definition(request.matchdict['name']) -# def lookup_request(request): -# return request.info.lookup_workflow_request(request.matchdict['requests']) +def lookup_request(request): + return request.info.lookup_workflow_request(request.matchdict['request_id']) def lookup_file(request): - return lookup_workflow(request)['files'][request.matchdict['filename']] + return next(file for file in lookup_request(request).files if file.filename == request.matchdict['filename']) @view_defaults(route_name='workflows', renderer='json') @@ -52,8 +52,31 @@ class WorkflowRestService: """ return self.request.info.all_workflows() - @view_config(request_method='POST', route_name='create_workflow') - def create_workflow(self): + @view_config(request_method='GET', route_name='workflow') + def get_workflow(self): + """ + Look up a workflow request + + Audience: front-end + :return: + """ + return self.request.context + + +@view_defaults(route_name='workflow_request', renderer='json') +class WorkflowRequestRestService: + """ + Services for the user-submitted files attached to workflows. + """ + def __init__(self, request): + self.request = request + + @view_config(request_method='GET') + def get_workflow_request(self): + return self.request.context + + @view_config(request_method='POST', route_name='create_workflow_request') + def create_workflow_request(self): """ Create a new workflow request from the name/arguments supplied. @@ -63,21 +86,10 @@ class WorkflowRestService: # all we should have to do here is take the WorkflowRequest from the context and # hand it to WorkflowInfo to save it, but we're still conflating # workflows and workflow requests right now - request = self.request.info.create_workflow_request( - workflow_name=self.request.context.workflow_name, argument=self.request.GET.getall('args')) + request = self.request.info.create_workflow_request(self.request.context, self.request.GET.getall('args')) return request - @view_config(request_method='GET', route_name='workflow') - def get_workflow(self): - """ - Look up a workflow request - - Audience: front-end - :return: - """ - return self.request.context - - @view_config(request_method='POST', route_name='submit_workflow') + @view_config(request_method='POST', route_name='submit_workflow_request') def submit_workflow(self): """ Submit this workflow request for processing. @@ -89,7 +101,7 @@ class WorkflowRestService: return self.request.workflows.execute(self.request.context) -@view_defaults(route_name='workflow_files', renderer='json') +@view_defaults(route_name='workflow_request_files', renderer='json') class WorkflowFilesRestService: """ Services for the user-submitted files attached to workflows. @@ -97,7 +109,7 @@ class WorkflowFilesRestService: def __init__(self, request): self.request = request - @view_config(request_method='POST', route_name='add_file') + @view_config(request_method='PUT', route_name='workflow_request_file') def add_file(self): """ Add a file to this workflow request. @@ -105,9 +117,9 @@ class WorkflowFilesRestService: Audience: front-end and CLI """ print('Adding a file') - file = self.request.info.save_file(request_id=self.request.GET.get("request"), - filename=self.request.GET.get("filename"), - content=self.request.GET.get("content")) + file = self.request.info.save_file(request=self.request.context, + filename=self.request.matchdict['filename'], + content=self.request.body) return file @view_config(request_method='GET') @@ -120,7 +132,7 @@ class WorkflowFilesRestService: """ return self.request.context['files'] - @view_config(request_method='GET', route_name='workflow_file', accept='json', renderer='string') + @view_config(request_method='GET', accept='text/plain', route_name='workflow_request_file') def get_file_text(self): """ Get the text contents of this file @@ -128,9 +140,10 @@ class WorkflowFilesRestService: Audience: ??? :return: """ - return self.request.context['content'] + self.request.response.body = lookup_file(self.request).content + return self.request.response - @view_config(request_method='GET', route_name='workflow_file', accept='application/json') + @view_config(request_method='GET', accept='application/json', route_name='workflow_request_file') def get_file_json(self): """ Get a JSON representation of this file @@ -194,13 +207,19 @@ def main(global_config, **settings): # make workflow_service available for use in Pyramid config.add_request_method(lambda r: WorkflowService(r.info), 'workflows', reify=True) + # GET /workflows <- list of workflows + # GET /workflows/null <- info about the null workflow + # POST /workflows/null/requests/create <- create a request for the null workflow + # PUT /workflows/requests/23/files/foo.txt <- attach foo.txt to request #23 on workflow null + # POST /workflows/requests/23/submit <- launch request #23 + config.add_route('workflows', '/workflows') config.add_route('workflow', '/workflows/{name}', factory=lookup_workflow) - config.add_route('create_workflow', '/workflows/{name}/create', factory=lookup_workflow) - config.add_route('submit_workflow', '/workflows/{name}/submit', factory=lookup_workflow) - config.add_route('add_file', '/workflows/file') - config.add_route('workflow_files', '/workflows/{name}/files', factory=lookup_workflow) - config.add_route('workflow_file', '/workflows/{name}/files/{filename}', factory=lookup_file) + config.add_route('create_workflow_request', '/workflows/{name}/requests/create', factory=lookup_workflow) + config.add_route('workflow_request', '/workflows/requests/{request_id}', factory=lookup_request) + config.add_route('workflow_request_files', '/workflows/requests/{request_id}/files', factory=lookup_request) + config.add_route('workflow_request_file', '/workflows/requests/{request_id}/files/{filename}', factory=lookup_request) + config.add_route('submit_workflow_request', '/workflows/requests/{request_id}/submit', factory=lookup_request) config.include('pyramid_beaker') config.scan('.') diff --git a/shared/workspaces/src/workspaces/schema.py b/shared/workspaces/src/workspaces/schema.py index 5ccca9afdf4ff6803e8198374c0890ddcc1be91e..e18f0a678c9b38ce461a10f67f3631d893506fee 100644 --- a/shared/workspaces/src/workspaces/schema.py +++ b/shared/workspaces/src/workspaces/schema.py @@ -12,6 +12,7 @@ from pycapo import CapoConfig from sqlalchemy import create_engine from sqlalchemy.orm import relationship, sessionmaker from sqlalchemy.ext.declarative import declarative_base +from workspaces.workflow_interfaces import WorkflowIF, WorkflowRequestIF, WorkflowRequestFileIF from .capability_interfaces import ( CapabilityIF, @@ -333,7 +334,7 @@ class CapabilityExecution(Base, CapabilityExecutionIF): return self.version.capability -class Workflow(Base): +class Workflow(Base, WorkflowIF): """ A Workflow is a suite of tasks defined by templates that must be executed together with some user-submitted files. @@ -365,7 +366,7 @@ class Workflow(Base): if self.workflow_name == 'null': for template in self.templates: filename = template.filename - contents = template.contents + contents = template.content if filename == 'null.condor': contents = contents.decode("utf8") contents.replace("{{arguments}}", argument) @@ -375,7 +376,6 @@ class Workflow(Base): return rendered_files - class WorkflowTemplate(Base): """ Workflow Templates are Mustache-formatted files associated with a given Workflow @@ -410,7 +410,7 @@ class WorkflowTemplate(Base): return f"<WorkflowTemplate filename={self.filename} for workflow={self.workflow_name}>" -class WorkflowRequest(Base): +class WorkflowRequest(Base, WorkflowRequestIF): """ Workflow Requests invoke a particular workflow with a particular argument and some user-supplied files. """ @@ -453,7 +453,7 @@ class WorkflowRequest(Base): return f"<WorkflowRequest workflow_request_id= {self.workflow_request_id}>" -class WorkflowRequestFile(Base): +class WorkflowRequestFile(Base, WorkflowRequestFileIF): """ A Workflow Request File is a file supplied by the user and attached to the request they have submitted. """ @@ -479,6 +479,9 @@ class WorkflowRequestFile(Base): def __repr__(self): return f"<WorkflowRequestFile filename={self.filename}>" + def __json__(self, request): + return {"filename": self.filename, "content": self.content.decode("utf8")} + def get_engine(): """ diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index d3b3959f5fcf651b493d9250c61ae39c570a6a47..de4ba2dae49838505d118b524d89bb18705e0473 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -8,11 +8,10 @@ from collections import namedtuple from pathlib import Path from queue import PriorityQueue, Queue, Empty from tempfile import mkdtemp -from typing import Dict, List, Union, Optional, Type, NamedTuple +from typing import Dict, List, Optional, NamedTuple from sqlalchemy.orm import Session from workflow.event_catcher import EventCatcher -from workspaces.capability_interfaces import CapabilityIF, CapabilityExecutionIF from .capability_interfaces import ( CapabilityServiceIF, @@ -398,6 +397,14 @@ class WorkflowService(WorkflowServiceIF): """ Executes workflows; should be a freestanding service. """ + # The next few things that need to happen here are in response to + # WorkflowEvents that we receive from wf_monitor as the workflow + # execution evolves. So we have to set up listening at some point + # in this class + def __init__(self, info: WorkflowInfoIF): + # 1. Start listening for events from the wf_monitor stream + # self.channel = workflow_events.listen(self.on_workflow_event) + self.info = info def execute(self, workflow_name: str, argument: Dict, files: List[Path]): """ @@ -405,17 +412,18 @@ class WorkflowService(WorkflowServiceIF): """ # 1. look up workflow, returns workflow - info = WorkflowInfo(self) - definition = info.lookup_workflow_definition(workflow_name) + definition = self.info.lookup_workflow_definition(workflow_name) # 2. create and save request, return request id - record = info.create_workflow_request(workflow_name, argument) + request_id = self.info.create_workflow_request(definition, argument) # 3. render templates to files, returns list of rendered files - contents = definition.render_templates(argument, files) + workflow_files = definition.render_templates(argument, files) + for file in workflow_files: + self.info.save_file(request_id, file.filename, file.content) # 4. prepare files for condor execution - temp_folder = self._prepare_files_for_condor(contents) + temp_folder = self._prepare_files_for_condor(workflow_files) # 5. execute condor and retrieve log file log_file = self._execute_prepared(temp_folder) @@ -427,7 +435,7 @@ class WorkflowService(WorkflowServiceIF): with workflow_events: for e in events: # send amqp event and update database - self.on_workflow_event(e, record, temp_folder) + self.on_workflow_event(e, request_id, temp_folder) @staticmethod def _prepare_files_for_condor(files: List[AbstractFile]) -> Path: @@ -464,26 +472,20 @@ class WorkflowService(WorkflowServiceIF): # some file in here should end in .dag; that file is our dagman input dagman = list(folder.glob("*.dag"))[0] + print(f"dagman file {dagman} exists.") # ensure the log file exists logfile = folder / "condor.log" + print("condor.log exists.") logfile.touch() # submit + print("submitting to condor...") subprocess.run(["condor_submit_dag", str(dagman)], cwd=str(folder.absolute())) # return the logfile return logfile - # The next few things that need to happen here are in response to - # WorkflowEvents that we receive from wf_monitor as the workflow - # execution evolves. So we have to set up listening at some point - # in this class - def __init__(self): - # 1. Start listening for events from the wf_monitor stream - # self.channel = workflow_events.listen(self.on_workflow_event) - pass - def on_workflow_event( self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path ): @@ -564,36 +566,36 @@ class WorkflowInfo(WorkflowInfoIF): def create_workflow_request( self, - workflow_name: str, + workflow: Workflow, argument: Dict) -> WorkflowRequest: """ Create new workflow request and save to database - :param workflow_name: name of workflow to run + :param workflow: workflow to run :param argument: workflow arguments :return: new WorkflowRequest """ - request = WorkflowRequest(workflow_name=workflow_name, argument=argument) + request = WorkflowRequest(workflow_name=workflow.workflow_name, argument=argument) self.save_request(request) return request def save_request(self, request: WorkflowRequest): """ - Save a given entity and return an integer identifier for it + Save a given request and return an integer identifier for it :param request: request to save - :return: the entity's identifier """ self.session.add(request) self.session.flush() - def save_file(self, request_id: int, filename: str, content: str): + def save_file(self, request: WorkflowRequest, filename: str, content: bytes) -> WorkflowRequestFile: """ - Save a given List of request files - :param request_id: - :param content: - :param filename: + Save a given file for the specified request to the database + :param request: request to update with new file + :param filename: filename of new file + :param content: contents of new file in bytes :return: """ - wrf = WorkflowRequestFile(workflow_request_id=request_id, filename=filename, content=content.encode()) + wrf = WorkflowRequestFile(workflow_request_id=request.workflow_request_id, filename=filename, content=content) self.session.add(wrf) self.session.flush() + return wrf diff --git a/shared/workspaces/src/workspaces/workflow_interfaces.py b/shared/workspaces/src/workspaces/workflow_interfaces.py index a0bdb2ddffc573118f2c17bc0cebf683b1308965..9263acb11d804f09c2d8c15ff83318b2d9d5f739 100644 --- a/shared/workspaces/src/workspaces/workflow_interfaces.py +++ b/shared/workspaces/src/workspaces/workflow_interfaces.py @@ -4,20 +4,38 @@ Interfaces used by the Workspaces' workflow system. import inspect from abc import ABC, abstractmethod -from enum import Enum from pathlib import Path -from queue import PriorityQueue -from threading import Thread -from typing import Dict, List, Optional, Type +from typing import Dict, List """ - Workflow System - """ +class WorkflowRequestFileIF: + filename: str + content: str + + +class WorkflowIF: + workflow_name: str + templates: List['WorkflowTemplate'] + requests: List['WorkflowRequestIF'] + + def render_templates(self, argument: Dict, files: List[Path]) -> List[WorkflowRequestFileIF]: + raise NotImplementedError + + +class WorkflowRequestIF: + workflow_request_id: int + workflow: WorkflowIF + name: str + argument: str # JSON-formatted + files: List[WorkflowRequestFileIF] + job_id: str + + class WorkflowServiceIF(ABC): """ Executes workflows; should be a freestanding service. @@ -39,11 +57,9 @@ class WorkflowInfoIF(ABC): """ Holds information about workflows. """ - definition: "WorkflowDefinitionIF" - request: "WorkflowRequestIF" @abstractmethod - def lookup_workflow_definition(self, name: str) -> 'Workflow': + def lookup_workflow_definition(self, name: str) -> WorkflowIF: """ Look up the workflow with this name. @@ -52,14 +68,52 @@ class WorkflowInfoIF(ABC): """ raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + @abstractmethod + def lookup_workflow_request(self, request_id: int) -> WorkflowIF: + """ + Look up the workflow request with this ID. -class WorkflowDefinitionIF(ABC): - pass + :param request_id: Request ID + :return Workflow instance + """ + raise NotImplementedError + @abstractmethod + def all_workflows(self) -> List[WorkflowIF]: + """ + Look up all the workflows that are defined by the system. -class WorkflowRequestIF(ABC): - workflow_definition: WorkflowDefinitionIF - name: str - argument: str # JSON-formatted - files: List[Path] - job_id: str + :return all the workflows + """ + raise NotImplementedError + + @abstractmethod + def create_workflow_request( + self, + workflow: WorkflowIF, + argument: Dict) -> WorkflowRequestIF: + """ + Create new workflow request and save to database + :param workflow: workflow to run + :param argument: workflow arguments + :return: new WorkflowRequest + """ + raise NotImplementedError + + def save_request(self, request: WorkflowRequestIF): + """ + Save a given request and return an integer identifier for it + :param request: request to save + :return: the request id + """ + raise NotImplementedError + + def save_file(self, request: WorkflowRequestIF, filename: str, content: bytes) -> WorkflowRequestFileIF: + """ + Save a given file for the specified request to the database + :param request: request to update with new file + :param filename: filename of new file + :param content: contents of new file in bytes + :return: + """ + raise NotImplementedError diff --git a/shared/workspaces/test/test_workflow_services.py b/shared/workspaces/test/test_workflow_services.py new file mode 100644 index 0000000000000000000000000000000000000000..dfba65cc945dfb68a34fec3ae7fe889ac002b394 --- /dev/null +++ b/shared/workspaces/test/test_workflow_services.py @@ -0,0 +1,47 @@ +from typing import Dict, List + +from workspaces.schema import Workflow, WorkflowTemplate, WorkflowRequest, WorkflowRequestFile +from workspaces.services import WorkflowService +from workspaces.workflow_interfaces import WorkflowInfoIF, WorkflowIF, WorkflowRequestIF, WorkflowRequestFileIF + + +class FakeWorkflowInfo(WorkflowInfoIF): + def __init__(self): + self.requests = [] + + def lookup_workflow_request(self, request_id: int) -> WorkflowIF: + return self.requests[request_id-1] + + def all_workflows(self) -> List[WorkflowIF]: + return [self.lookup_workflow_definition('null')] + + def create_workflow_request(self, workflow: WorkflowIF, argument: Dict) -> WorkflowRequestIF: + request = WorkflowRequest(workflow_name=workflow.workflow_name, argument=argument) + self.save_request(request) + return request + + def save_request(self, request: WorkflowRequestIF): + request.workflow_request_id = len(self.requests)+1 + self.requests.append(request) + + def save_file(self, request: WorkflowRequestIF, filename: str, content: bytes) -> WorkflowRequestFileIF: + request.files.append(WorkflowRequestFile(filename=filename, content=content)) + + def lookup_workflow_definition(self, name: str) -> 'Workflow': + if name == 'null': + w = Workflow(workflow_name='null') + w.templates.append(WorkflowTemplate(filename='null.condor', + content=b'executable = null\narguments = "-n"\n\nqueue')) + return w + raise NotImplementedError + + +info = FakeWorkflowInfo() + + +def test_workflow_creation(): + global info + service = WorkflowService(info) + + # execute the null workflow + service.execute('null', '-n', [])