From fd49f5d0651a3f5415c895d800926550be3dbbfb Mon Sep 17 00:00:00 2001
From: Daniel K Lyons <dlyons@nrao.edu>
Date: Thu, 3 Sep 2020 11:42:25 -0600
Subject: [PATCH] Many changes to table structure, ORM mapping and interface
 layer

 - Redefined the tables based on what we know currently
 - Brought over interfaces from wksp0
 - Fleshed out object model some
 - Replaced request object of SQL Alchemy session with WorkflowInfo instance
---
 .../versions/44d5bbbf2615_workspaces_init.py  |  24 ++-
 services/workflow/src/workflow/server.py      | 143 ++++++++++++++++--
 2 files changed, 144 insertions(+), 23 deletions(-)

diff --git a/schema/versions/44d5bbbf2615_workspaces_init.py b/schema/versions/44d5bbbf2615_workspaces_init.py
index c2a58a44c..0b3922aa3 100644
--- a/schema/versions/44d5bbbf2615_workspaces_init.py
+++ b/schema/versions/44d5bbbf2615_workspaces_init.py
@@ -16,20 +16,22 @@ depends_on = None
 
 def upgrade():
     op.create_table('workflows',
-                    sa.Column('workflow_id', sa.Integer, primary_key=True, autoincrement=True,
-                              comment='the unique id of the workflow. auto-generated.'),
-                    sa.Column('name', sa.String, unique=True, comment='a human-readable name for the workflow.'),
+                    sa.Column('workflow_name', sa.String, primary_key=True, comment='the human-readable name for the workflow.'),
                     comment='A listing of the available workflows in the system.')
 
+    op.create_table('workflow_templates',
+                    sa.Column('filename', sa.String, primary_key=True, comment='the filename of the template'),
+                    sa.Column('content', sa.String, nullable=False, comment='the content of the template'),
+                    sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True,
+                              comment='the name of the workflow this template belongs to'),
+                    comment='Templates associated with workflows')
+
     op.create_table('workflow_requests',
                     sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True,
                               comment='the unique id of the request. auto-generated'),
-                    sa.Column('job_id', sa.Integer,
-                              comment='the id of the job that this request generted in the HTCondor system.'),
-                    sa.Column('workflow_id', sa.Integer, sa.ForeignKey('workflows.workflow_id'),
-                              comment='the id of the workflow used in this request.'),
+                    sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name')),
                     sa.Column('argument', sa.JSON, comment='the argument(s) used for the workflow in this request.'),
-                    comment='A listing of requests for workflows and te resulting job ids.')
+                    comment='A listing of requests for workflow execution.')
 
     op.create_table('workflow_request_files',
                     sa.Column('workflow_request_id', sa.Integer, sa.ForeignKey('workflow_requests.workflow_request_id'),
@@ -37,11 +39,17 @@ def upgrade():
                     sa.Column('filename', sa.String, comment='the name of this file', nullable=False),
                     sa.Column('content', sa.String, comment='the contents of the file', nullable=False),
                     comment='A man-to-many mapping table tracking which files were used for workflow requests.')
+
     op.create_unique_constraint('workflow_request_filenames_uniq', 'workflow_request_files',
                                 ['workflow_request_id', 'filename'])
 
+    # populate with some initial data
+    op.execute("INSERT INTO workflows (workflow_name) VALUES ('null')")
+    op.execute("INSERT INTO workflow_templates (workflow_name, filename, content) VALUES ('null', 'null.sh', 'null {{arguments}}')")
+
 
 def downgrade():
     op.drop_table('workflow_request_files')
     op.drop_table('workflow_requests')
+    op.drop_table('workflow_templates')
     op.drop_table('workflows')
\ No newline at end of file
diff --git a/services/workflow/src/workflow/server.py b/services/workflow/src/workflow/server.py
index b5f8d0495..f4c8c0147 100644
--- a/services/workflow/src/workflow/server.py
+++ b/services/workflow/src/workflow/server.py
@@ -1,4 +1,10 @@
-import sqlalchemy
+import inspect
+from abc import ABC, abstractmethod
+from dataclasses import dataclass
+from pathlib import Path
+from typing import List, Dict
+
+import sqlalchemy as sa
 import zope.sqlalchemy
 from pycapo import CapoConfig
 from pyramid.view import view_config, view_defaults
@@ -7,20 +13,19 @@ from pyramid_beaker import session_factory_from_settings
 from pyramid.renderers import JSONP
 from sqlalchemy import create_engine
 from sqlalchemy.ext.declarative import declarative_base
-from sqlalchemy.orm import sessionmaker
-
+from sqlalchemy.orm import sessionmaker, relationship
 
 """
 Work done:
 
  ☑ Initial sketch of using SQL Alchemy with Pyramid
  ☑ Sketch of first routes we need
+ ☑ Bring over interfaces from wksp0 project
+ ☑ Need to flesh out the object model—requests have their own files, workflows have templates
+ ☑ Separate REST hierarchy for workflow definitions
 
 Work to do:
 
- ☐ Bring over interfaces from wksp0 project
- ☐ Need to flesh out the object model—requests have their own files, workflows have templates
- ☐ Separate REST hierarchy for workflow definitions
  ☐ Actually do the work, preferably in model classes, not REST API directly
  ☐ Separate this into separate modules, once it makes sense to people how it works
  ☐ Workflow initiation CLI
@@ -31,6 +36,52 @@ To consider:
 
 """
 
+# ---------------------------------------------------------
+#
+#    I N T E R F A C E S
+#
+# ---------------------------------------------------------
+
+
+class WorkflowService(ABC):
+    """
+    Executes workflows; should be a freestanding service.
+    """
+    @abstractmethod
+    def execute(self, workflow_name: str, argument: Dict, files: List[Path]):
+        """
+        Execute this workflow against these files.
+
+        :param workflow_name:  name of the workflow to run
+        :param argument:       extra argument (a JSON object)
+        :param files:          some extra files the workflow should consider
+        :return:               a stream of events from this workflow
+        """
+        raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
+
+
+class WorkflowInfo(ABC):
+    """
+    Holds information about workflows.
+    """
+    @abstractmethod
+    def lookup_workflow_definition(self, name: str) -> 'Workflow':
+        """
+        Look up the workflow with this name.
+
+        :param name:  Workflow name
+        :return:      Workflow instance
+        """
+        raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
+
+
+class AbstractFile:
+    def __init__(self, filename, content):
+        self.filename, self.content = filename, content
+
+    def write_to(self, directory: Path):
+        raise NotImplementedError('Actually save the file into the directory passed')
+
 
 # ---------------------------------------------------------
 #
@@ -38,24 +89,87 @@ To consider:
 #
 # ---------------------------------------------------------
 
+
 Base = declarative_base()
 
 
 class Workflow(Base):
     """
-    A Workflow is a suite of tasks defined by templates that must be executed together with some user-submitted files.
+    A Workflow is a suite of tasks defined by templates that must be
+    executed together with some user-submitted files.
     """
     __tablename__ = 'workflows'
-    id = sqlalchemy.Column('workflow_id', sqlalchemy.String, primary_key=True, autoincrement=True)
-    name = sqlalchemy.Column('name', sqlalchemy.String)
+    workflow_name = sa.Column('workflow_name', sa.String, primary_key=True)
+    templates = relationship('WorkflowTemplate', backref='workflow')
+    requests = relationship('WorkflowRequest', backref='workflow')
 
     def __json__(self, request):
-        return {'id': self.id, 'name': self.name}
+        return {'workflow_name': self.workflow_name}
+
+    def render_templates(self, argument: Dict, files: List[Path]) -> List[AbstractFile]:
+        """
+        Render the templates associated with this workflow
+        :param argument: the workflow argument JSON
+        :param files:    the files to be processed
+        :return:         a list of rendered templates
+        """
+        raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
+
+
+class WorkflowTemplate(Base):
+    """
+    Workflow Templates are Mustache-formatted files associated with a given Workflow
+    """
+    __tablename__ = 'workflow_templates'
+    filename = sa.Column('filename', sa.String, primary_key=True)
+    content = sa.Column('content', sa.String, nullable=False)
+    workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True)
+
+    def render(self, argument: Dict) -> AbstractFile:
+        raise NotImplementedError('Add mustache to render this template')
+
+
+class WorkflowRequest(Base):
+    """
+    Workflow Requests invoke a particular workflow with a particular argument and some user-supplied files.
+    """
+    __tablename__ = 'workflow_requests'
+    workflow_request_id = sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True)
+    workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'))
+    argument = sa.Column('argument', sa.JSON)
+    files = relationship('WorkflowRequestFile', backref='request')
+
+
+class WorkflowRequestFile(Base):
+    __tablename__ = 'workflow_request_files'
+    workflow_request_id = sa.Column('workflow_request_id', sa.Integer, sa.ForeignKey('workflow_requests.workflow_request_id'),
+                                    primary_key=True)
+    filename = sa.Column('filename', sa.String, primary_key=True)
+    content = sa.Column('content', sa.String, nullable=False)
+
+    @property
+    def file(self) -> AbstractFile:
+        return AbstractFile(self.filename, self.content)
 
 
 WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'file.txt', 'content': 'Hello, world!'}}}]
 
 
+class DBWorkflowInfo(WorkflowInfo):
+    """
+    Implements WorkflowInfo, backed by a relational database, using SQL Alchemy.
+    """
+
+    def __init__(self, session):
+        self.session = session
+
+    def lookup_workflow_definition(self, name: str) -> Workflow:
+        return self.session.query(Workflow).get(name)
+
+    def all_workflows(self) -> List[Workflow]:
+        return self.session.query(Workflow).all()
+
+
 # ---------------------------------------------------------
 #
 #    S E R V I C E S
@@ -65,7 +179,7 @@ WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'f
 
 def lookup_workflow(request):
     # this should change to use SQL Alchemy
-    return WORKFLOWS[int(request.matchdict['id'])]
+    return request.info.lookup_workflow_definition(request.matchdict['id'])
 
 
 def lookup_file(request):
@@ -91,7 +205,7 @@ class WorkflowService:
         Audience: front-end
         :return:
         """
-        return self.request.dbsession.query(Workflow).all()
+        return self.request.info.all_workflows()
 
     @view_config(request_method='POST')
     def create_workflow(self):
@@ -220,7 +334,6 @@ def get_tm_session(session_factory, transaction_manager):
 # ---------------------------------------------------------
 
 
-
 def main(global_config, **settings):
     with Configurator(settings=settings) as config:
         session_factory = session_factory_from_settings(settings)
@@ -241,8 +354,8 @@ def main(global_config, **settings):
         # make request.dbsession available for use in Pyramid
         config.add_request_method(
             # r.tm is the transaction manager used by pyramid_tm
-            lambda r: get_tm_session(session_factory, r.tm),
-            'dbsession',
+            lambda request: DBWorkflowInfo(get_tm_session(session_factory, request.tm)),
+            'info',
             reify=True
         )
 
-- 
GitLab