diff --git a/schema/versions/44d5bbbf2615_workspaces_init.py b/schema/versions/44d5bbbf2615_workspaces_init.py index c2a58a44c81c6220fbbfb45bcf48cdc1bffd4cef..0b3922aa3d64c1ac96c2eae9e82eef3c62c7cbea 100644 --- a/schema/versions/44d5bbbf2615_workspaces_init.py +++ b/schema/versions/44d5bbbf2615_workspaces_init.py @@ -16,20 +16,22 @@ depends_on = None def upgrade(): op.create_table('workflows', - sa.Column('workflow_id', sa.Integer, primary_key=True, autoincrement=True, - comment='the unique id of the workflow. auto-generated.'), - sa.Column('name', sa.String, unique=True, comment='a human-readable name for the workflow.'), + sa.Column('workflow_name', sa.String, primary_key=True, comment='the human-readable name for the workflow.'), comment='A listing of the available workflows in the system.') + op.create_table('workflow_templates', + sa.Column('filename', sa.String, primary_key=True, comment='the filename of the template'), + sa.Column('content', sa.String, nullable=False, comment='the content of the template'), + sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True, + comment='the name of the workflow this template belongs to'), + comment='Templates associated with workflows') + op.create_table('workflow_requests', sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True, comment='the unique id of the request. auto-generated'), - sa.Column('job_id', sa.Integer, - comment='the id of the job that this request generted in the HTCondor system.'), - sa.Column('workflow_id', sa.Integer, sa.ForeignKey('workflows.workflow_id'), - comment='the id of the workflow used in this request.'), + sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name')), sa.Column('argument', sa.JSON, comment='the argument(s) used for the workflow in this request.'), - comment='A listing of requests for workflows and te resulting job ids.') + comment='A listing of requests for workflow execution.') op.create_table('workflow_request_files', sa.Column('workflow_request_id', sa.Integer, sa.ForeignKey('workflow_requests.workflow_request_id'), @@ -37,11 +39,17 @@ def upgrade(): sa.Column('filename', sa.String, comment='the name of this file', nullable=False), sa.Column('content', sa.String, comment='the contents of the file', nullable=False), comment='A man-to-many mapping table tracking which files were used for workflow requests.') + op.create_unique_constraint('workflow_request_filenames_uniq', 'workflow_request_files', ['workflow_request_id', 'filename']) + # populate with some initial data + op.execute("INSERT INTO workflows (workflow_name) VALUES ('null')") + op.execute("INSERT INTO workflow_templates (workflow_name, filename, content) VALUES ('null', 'null.sh', 'null {{arguments}}')") + def downgrade(): op.drop_table('workflow_request_files') op.drop_table('workflow_requests') + op.drop_table('workflow_templates') op.drop_table('workflows') \ No newline at end of file diff --git a/services/workflow/src/workflow/server.py b/services/workflow/src/workflow/server.py index b5f8d0495e0ed6d15ff1ce869311cdaef0d944c5..f4c8c0147d0ecae33fc3cfb0abaf5cd2ebbc8b22 100644 --- a/services/workflow/src/workflow/server.py +++ b/services/workflow/src/workflow/server.py @@ -1,4 +1,10 @@ -import sqlalchemy +import inspect +from abc import ABC, abstractmethod +from dataclasses import dataclass +from pathlib import Path +from typing import List, Dict + +import sqlalchemy as sa import zope.sqlalchemy from pycapo import CapoConfig from pyramid.view import view_config, view_defaults @@ -7,20 +13,19 @@ from pyramid_beaker import session_factory_from_settings from pyramid.renderers import JSONP from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker - +from sqlalchemy.orm import sessionmaker, relationship """ Work done: ☑ Initial sketch of using SQL Alchemy with Pyramid ☑ Sketch of first routes we need + ☑ Bring over interfaces from wksp0 project + ☑ Need to flesh out the object model—requests have their own files, workflows have templates + ☑ Separate REST hierarchy for workflow definitions Work to do: - ☠Bring over interfaces from wksp0 project - ☠Need to flesh out the object model—requests have their own files, workflows have templates - ☠Separate REST hierarchy for workflow definitions ☠Actually do the work, preferably in model classes, not REST API directly ☠Separate this into separate modules, once it makes sense to people how it works ☠Workflow initiation CLI @@ -31,6 +36,52 @@ To consider: """ +# --------------------------------------------------------- +# +# I N T E R F A C E S +# +# --------------------------------------------------------- + + +class WorkflowService(ABC): + """ + Executes workflows; should be a freestanding service. + """ + @abstractmethod + def execute(self, workflow_name: str, argument: Dict, files: List[Path]): + """ + Execute this workflow against these files. + + :param workflow_name: name of the workflow to run + :param argument: extra argument (a JSON object) + :param files: some extra files the workflow should consider + :return: a stream of events from this workflow + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class WorkflowInfo(ABC): + """ + Holds information about workflows. + """ + @abstractmethod + def lookup_workflow_definition(self, name: str) -> 'Workflow': + """ + Look up the workflow with this name. + + :param name: Workflow name + :return: Workflow instance + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class AbstractFile: + def __init__(self, filename, content): + self.filename, self.content = filename, content + + def write_to(self, directory: Path): + raise NotImplementedError('Actually save the file into the directory passed') + # --------------------------------------------------------- # @@ -38,24 +89,87 @@ To consider: # # --------------------------------------------------------- + Base = declarative_base() class Workflow(Base): """ - A Workflow is a suite of tasks defined by templates that must be executed together with some user-submitted files. + A Workflow is a suite of tasks defined by templates that must be + executed together with some user-submitted files. """ __tablename__ = 'workflows' - id = sqlalchemy.Column('workflow_id', sqlalchemy.String, primary_key=True, autoincrement=True) - name = sqlalchemy.Column('name', sqlalchemy.String) + workflow_name = sa.Column('workflow_name', sa.String, primary_key=True) + templates = relationship('WorkflowTemplate', backref='workflow') + requests = relationship('WorkflowRequest', backref='workflow') def __json__(self, request): - return {'id': self.id, 'name': self.name} + return {'workflow_name': self.workflow_name} + + def render_templates(self, argument: Dict, files: List[Path]) -> List[AbstractFile]: + """ + Render the templates associated with this workflow + :param argument: the workflow argument JSON + :param files: the files to be processed + :return: a list of rendered templates + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class WorkflowTemplate(Base): + """ + Workflow Templates are Mustache-formatted files associated with a given Workflow + """ + __tablename__ = 'workflow_templates' + filename = sa.Column('filename', sa.String, primary_key=True) + content = sa.Column('content', sa.String, nullable=False) + workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True) + + def render(self, argument: Dict) -> AbstractFile: + raise NotImplementedError('Add mustache to render this template') + + +class WorkflowRequest(Base): + """ + Workflow Requests invoke a particular workflow with a particular argument and some user-supplied files. + """ + __tablename__ = 'workflow_requests' + workflow_request_id = sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True) + workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name')) + argument = sa.Column('argument', sa.JSON) + files = relationship('WorkflowRequestFile', backref='request') + + +class WorkflowRequestFile(Base): + __tablename__ = 'workflow_request_files' + workflow_request_id = sa.Column('workflow_request_id', sa.Integer, sa.ForeignKey('workflow_requests.workflow_request_id'), + primary_key=True) + filename = sa.Column('filename', sa.String, primary_key=True) + content = sa.Column('content', sa.String, nullable=False) + + @property + def file(self) -> AbstractFile: + return AbstractFile(self.filename, self.content) WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'file.txt', 'content': 'Hello, world!'}}}] +class DBWorkflowInfo(WorkflowInfo): + """ + Implements WorkflowInfo, backed by a relational database, using SQL Alchemy. + """ + + def __init__(self, session): + self.session = session + + def lookup_workflow_definition(self, name: str) -> Workflow: + return self.session.query(Workflow).get(name) + + def all_workflows(self) -> List[Workflow]: + return self.session.query(Workflow).all() + + # --------------------------------------------------------- # # S E R V I C E S @@ -65,7 +179,7 @@ WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'f def lookup_workflow(request): # this should change to use SQL Alchemy - return WORKFLOWS[int(request.matchdict['id'])] + return request.info.lookup_workflow_definition(request.matchdict['id']) def lookup_file(request): @@ -91,7 +205,7 @@ class WorkflowService: Audience: front-end :return: """ - return self.request.dbsession.query(Workflow).all() + return self.request.info.all_workflows() @view_config(request_method='POST') def create_workflow(self): @@ -220,7 +334,6 @@ def get_tm_session(session_factory, transaction_manager): # --------------------------------------------------------- - def main(global_config, **settings): with Configurator(settings=settings) as config: session_factory = session_factory_from_settings(settings) @@ -241,8 +354,8 @@ def main(global_config, **settings): # make request.dbsession available for use in Pyramid config.add_request_method( # r.tm is the transaction manager used by pyramid_tm - lambda r: get_tm_session(session_factory, r.tm), - 'dbsession', + lambda request: DBWorkflowInfo(get_tm_session(session_factory, request.tm)), + 'info', reify=True )