From 054b24cf0c23159659f45d44e38d0548e7d5fc66 Mon Sep 17 00:00:00 2001 From: Daniel K Lyons <dlyons@nrao.edu> Date: Tue, 13 Oct 2020 14:26:16 -0600 Subject: [PATCH] Add test_workflow_creation to tests in shared/workspaces --- shared/workspaces/src/workspaces/schema.py | 10 +-- shared/workspaces/src/workspaces/services.py | 14 ++- .../src/workspaces/workflow_interfaces.py | 88 +++++++++++++++---- .../workspaces/test/test_workflow_services.py | 47 ++++++++++ 4 files changed, 129 insertions(+), 30 deletions(-) create mode 100644 shared/workspaces/test/test_workflow_services.py diff --git a/shared/workspaces/src/workspaces/schema.py b/shared/workspaces/src/workspaces/schema.py index 700a60c13..e18f0a678 100644 --- a/shared/workspaces/src/workspaces/schema.py +++ b/shared/workspaces/src/workspaces/schema.py @@ -12,6 +12,7 @@ from pycapo import CapoConfig from sqlalchemy import create_engine from sqlalchemy.orm import relationship, sessionmaker from sqlalchemy.ext.declarative import declarative_base +from workspaces.workflow_interfaces import WorkflowIF, WorkflowRequestIF, WorkflowRequestFileIF from .capability_interfaces import ( CapabilityIF, @@ -333,7 +334,7 @@ class CapabilityExecution(Base, CapabilityExecutionIF): return self.version.capability -class Workflow(Base): +class Workflow(Base, WorkflowIF): """ A Workflow is a suite of tasks defined by templates that must be executed together with some user-submitted files. @@ -365,7 +366,7 @@ class Workflow(Base): if self.workflow_name == 'null': for template in self.templates: filename = template.filename - contents = template.contents + contents = template.content if filename == 'null.condor': contents = contents.decode("utf8") contents.replace("{{arguments}}", argument) @@ -375,7 +376,6 @@ class Workflow(Base): return rendered_files - class WorkflowTemplate(Base): """ Workflow Templates are Mustache-formatted files associated with a given Workflow @@ -410,7 +410,7 @@ class WorkflowTemplate(Base): return f"<WorkflowTemplate filename={self.filename} for workflow={self.workflow_name}>" -class WorkflowRequest(Base): +class WorkflowRequest(Base, WorkflowRequestIF): """ Workflow Requests invoke a particular workflow with a particular argument and some user-supplied files. """ @@ -453,7 +453,7 @@ class WorkflowRequest(Base): return f"<WorkflowRequest workflow_request_id= {self.workflow_request_id}>" -class WorkflowRequestFile(Base): +class WorkflowRequestFile(Base, WorkflowRequestFileIF): """ A Workflow Request File is a file supplied by the user and attached to the request they have submitted. """ diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index 1046bd8a3..de4ba2dae 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -8,11 +8,10 @@ from collections import namedtuple from pathlib import Path from queue import PriorityQueue, Queue, Empty from tempfile import mkdtemp -from typing import Dict, List, Union, Optional, Type, NamedTuple +from typing import Dict, List, Optional, NamedTuple from sqlalchemy.orm import Session from workflow.event_catcher import EventCatcher -from workspaces.capability_interfaces import CapabilityIF, CapabilityExecutionIF from .capability_interfaces import ( CapabilityServiceIF, @@ -416,7 +415,7 @@ class WorkflowService(WorkflowServiceIF): definition = self.info.lookup_workflow_definition(workflow_name) # 2. create and save request, return request id - request_id = self.info.create_workflow_request(workflow_name, argument) + request_id = self.info.create_workflow_request(definition, argument) # 3. render templates to files, returns list of rendered files workflow_files = definition.render_templates(argument, files) @@ -571,7 +570,7 @@ class WorkflowInfo(WorkflowInfoIF): argument: Dict) -> WorkflowRequest: """ Create new workflow request and save to database - :param workflow_name: name of workflow to run + :param workflow: workflow to run :param argument: workflow arguments :return: new WorkflowRequest """ @@ -583,20 +582,19 @@ class WorkflowInfo(WorkflowInfoIF): """ Save a given request and return an integer identifier for it :param request: request to save - :return: the request id """ self.session.add(request) self.session.flush() - def save_file(self, request_id: int, filename: str, content: bytes): + def save_file(self, request: WorkflowRequest, filename: str, content: bytes) -> WorkflowRequestFile: """ Save a given file for the specified request to the database - :param request_id: request id of request to update with new file + :param request: request to update with new file :param filename: filename of new file :param content: contents of new file in bytes :return: """ - wrf = WorkflowRequestFile(workflow_request_id=request_id, filename=filename, content=content) + wrf = WorkflowRequestFile(workflow_request_id=request.workflow_request_id, filename=filename, content=content) self.session.add(wrf) self.session.flush() return wrf diff --git a/shared/workspaces/src/workspaces/workflow_interfaces.py b/shared/workspaces/src/workspaces/workflow_interfaces.py index a0bdb2ddf..9263acb11 100644 --- a/shared/workspaces/src/workspaces/workflow_interfaces.py +++ b/shared/workspaces/src/workspaces/workflow_interfaces.py @@ -4,20 +4,38 @@ Interfaces used by the Workspaces' workflow system. import inspect from abc import ABC, abstractmethod -from enum import Enum from pathlib import Path -from queue import PriorityQueue -from threading import Thread -from typing import Dict, List, Optional, Type +from typing import Dict, List """ - Workflow System - """ +class WorkflowRequestFileIF: + filename: str + content: str + + +class WorkflowIF: + workflow_name: str + templates: List['WorkflowTemplate'] + requests: List['WorkflowRequestIF'] + + def render_templates(self, argument: Dict, files: List[Path]) -> List[WorkflowRequestFileIF]: + raise NotImplementedError + + +class WorkflowRequestIF: + workflow_request_id: int + workflow: WorkflowIF + name: str + argument: str # JSON-formatted + files: List[WorkflowRequestFileIF] + job_id: str + + class WorkflowServiceIF(ABC): """ Executes workflows; should be a freestanding service. @@ -39,11 +57,9 @@ class WorkflowInfoIF(ABC): """ Holds information about workflows. """ - definition: "WorkflowDefinitionIF" - request: "WorkflowRequestIF" @abstractmethod - def lookup_workflow_definition(self, name: str) -> 'Workflow': + def lookup_workflow_definition(self, name: str) -> WorkflowIF: """ Look up the workflow with this name. @@ -52,14 +68,52 @@ class WorkflowInfoIF(ABC): """ raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + @abstractmethod + def lookup_workflow_request(self, request_id: int) -> WorkflowIF: + """ + Look up the workflow request with this ID. -class WorkflowDefinitionIF(ABC): - pass + :param request_id: Request ID + :return Workflow instance + """ + raise NotImplementedError + @abstractmethod + def all_workflows(self) -> List[WorkflowIF]: + """ + Look up all the workflows that are defined by the system. -class WorkflowRequestIF(ABC): - workflow_definition: WorkflowDefinitionIF - name: str - argument: str # JSON-formatted - files: List[Path] - job_id: str + :return all the workflows + """ + raise NotImplementedError + + @abstractmethod + def create_workflow_request( + self, + workflow: WorkflowIF, + argument: Dict) -> WorkflowRequestIF: + """ + Create new workflow request and save to database + :param workflow: workflow to run + :param argument: workflow arguments + :return: new WorkflowRequest + """ + raise NotImplementedError + + def save_request(self, request: WorkflowRequestIF): + """ + Save a given request and return an integer identifier for it + :param request: request to save + :return: the request id + """ + raise NotImplementedError + + def save_file(self, request: WorkflowRequestIF, filename: str, content: bytes) -> WorkflowRequestFileIF: + """ + Save a given file for the specified request to the database + :param request: request to update with new file + :param filename: filename of new file + :param content: contents of new file in bytes + :return: + """ + raise NotImplementedError diff --git a/shared/workspaces/test/test_workflow_services.py b/shared/workspaces/test/test_workflow_services.py new file mode 100644 index 000000000..dfba65cc9 --- /dev/null +++ b/shared/workspaces/test/test_workflow_services.py @@ -0,0 +1,47 @@ +from typing import Dict, List + +from workspaces.schema import Workflow, WorkflowTemplate, WorkflowRequest, WorkflowRequestFile +from workspaces.services import WorkflowService +from workspaces.workflow_interfaces import WorkflowInfoIF, WorkflowIF, WorkflowRequestIF, WorkflowRequestFileIF + + +class FakeWorkflowInfo(WorkflowInfoIF): + def __init__(self): + self.requests = [] + + def lookup_workflow_request(self, request_id: int) -> WorkflowIF: + return self.requests[request_id-1] + + def all_workflows(self) -> List[WorkflowIF]: + return [self.lookup_workflow_definition('null')] + + def create_workflow_request(self, workflow: WorkflowIF, argument: Dict) -> WorkflowRequestIF: + request = WorkflowRequest(workflow_name=workflow.workflow_name, argument=argument) + self.save_request(request) + return request + + def save_request(self, request: WorkflowRequestIF): + request.workflow_request_id = len(self.requests)+1 + self.requests.append(request) + + def save_file(self, request: WorkflowRequestIF, filename: str, content: bytes) -> WorkflowRequestFileIF: + request.files.append(WorkflowRequestFile(filename=filename, content=content)) + + def lookup_workflow_definition(self, name: str) -> 'Workflow': + if name == 'null': + w = Workflow(workflow_name='null') + w.templates.append(WorkflowTemplate(filename='null.condor', + content=b'executable = null\narguments = "-n"\n\nqueue')) + return w + raise NotImplementedError + + +info = FakeWorkflowInfo() + + +def test_workflow_creation(): + global info + service = WorkflowService(info) + + # execute the null workflow + service.execute('null', '-n', []) -- GitLab