Skip to content
Snippets Groups Projects
Commit fd49f5d0 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Many changes to table structure, ORM mapping and interface layer

 - Redefined the tables based on what we know currently
 - Brought over interfaces from wksp0
 - Fleshed out object model some
 - Replaced request object of SQL Alchemy session with WorkflowInfo instance
parent 83724984
No related branches found
No related tags found
No related merge requests found
......@@ -16,20 +16,22 @@ depends_on = None
def upgrade():
op.create_table('workflows',
sa.Column('workflow_id', sa.Integer, primary_key=True, autoincrement=True,
comment='the unique id of the workflow. auto-generated.'),
sa.Column('name', sa.String, unique=True, comment='a human-readable name for the workflow.'),
sa.Column('workflow_name', sa.String, primary_key=True, comment='the human-readable name for the workflow.'),
comment='A listing of the available workflows in the system.')
op.create_table('workflow_templates',
sa.Column('filename', sa.String, primary_key=True, comment='the filename of the template'),
sa.Column('content', sa.String, nullable=False, comment='the content of the template'),
sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True,
comment='the name of the workflow this template belongs to'),
comment='Templates associated with workflows')
op.create_table('workflow_requests',
sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True,
comment='the unique id of the request. auto-generated'),
sa.Column('job_id', sa.Integer,
comment='the id of the job that this request generted in the HTCondor system.'),
sa.Column('workflow_id', sa.Integer, sa.ForeignKey('workflows.workflow_id'),
comment='the id of the workflow used in this request.'),
sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name')),
sa.Column('argument', sa.JSON, comment='the argument(s) used for the workflow in this request.'),
comment='A listing of requests for workflows and te resulting job ids.')
comment='A listing of requests for workflow execution.')
op.create_table('workflow_request_files',
sa.Column('workflow_request_id', sa.Integer, sa.ForeignKey('workflow_requests.workflow_request_id'),
......@@ -37,11 +39,17 @@ def upgrade():
sa.Column('filename', sa.String, comment='the name of this file', nullable=False),
sa.Column('content', sa.String, comment='the contents of the file', nullable=False),
comment='A man-to-many mapping table tracking which files were used for workflow requests.')
op.create_unique_constraint('workflow_request_filenames_uniq', 'workflow_request_files',
['workflow_request_id', 'filename'])
# populate with some initial data
op.execute("INSERT INTO workflows (workflow_name) VALUES ('null')")
op.execute("INSERT INTO workflow_templates (workflow_name, filename, content) VALUES ('null', 'null.sh', 'null {{arguments}}')")
def downgrade():
op.drop_table('workflow_request_files')
op.drop_table('workflow_requests')
op.drop_table('workflow_templates')
op.drop_table('workflows')
\ No newline at end of file
import sqlalchemy
import inspect
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from typing import List, Dict
import sqlalchemy as sa
import zope.sqlalchemy
from pycapo import CapoConfig
from pyramid.view import view_config, view_defaults
......@@ -7,20 +13,19 @@ from pyramid_beaker import session_factory_from_settings
from pyramid.renderers import JSONP
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import sessionmaker, relationship
"""
Work done:
☑ Initial sketch of using SQL Alchemy with Pyramid
☑ Sketch of first routes we need
☑ Bring over interfaces from wksp0 project
☑ Need to flesh out the object model—requests have their own files, workflows have templates
☑ Separate REST hierarchy for workflow definitions
Work to do:
☐ Bring over interfaces from wksp0 project
☐ Need to flesh out the object model—requests have their own files, workflows have templates
☐ Separate REST hierarchy for workflow definitions
☐ Actually do the work, preferably in model classes, not REST API directly
☐ Separate this into separate modules, once it makes sense to people how it works
☐ Workflow initiation CLI
......@@ -31,6 +36,52 @@ To consider:
"""
# ---------------------------------------------------------
#
# I N T E R F A C E S
#
# ---------------------------------------------------------
class WorkflowService(ABC):
"""
Executes workflows; should be a freestanding service.
"""
@abstractmethod
def execute(self, workflow_name: str, argument: Dict, files: List[Path]):
"""
Execute this workflow against these files.
:param workflow_name: name of the workflow to run
:param argument: extra argument (a JSON object)
:param files: some extra files the workflow should consider
:return: a stream of events from this workflow
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class WorkflowInfo(ABC):
"""
Holds information about workflows.
"""
@abstractmethod
def lookup_workflow_definition(self, name: str) -> 'Workflow':
"""
Look up the workflow with this name.
:param name: Workflow name
:return: Workflow instance
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class AbstractFile:
def __init__(self, filename, content):
self.filename, self.content = filename, content
def write_to(self, directory: Path):
raise NotImplementedError('Actually save the file into the directory passed')
# ---------------------------------------------------------
#
......@@ -38,24 +89,87 @@ To consider:
#
# ---------------------------------------------------------
Base = declarative_base()
class Workflow(Base):
"""
A Workflow is a suite of tasks defined by templates that must be executed together with some user-submitted files.
A Workflow is a suite of tasks defined by templates that must be
executed together with some user-submitted files.
"""
__tablename__ = 'workflows'
id = sqlalchemy.Column('workflow_id', sqlalchemy.String, primary_key=True, autoincrement=True)
name = sqlalchemy.Column('name', sqlalchemy.String)
workflow_name = sa.Column('workflow_name', sa.String, primary_key=True)
templates = relationship('WorkflowTemplate', backref='workflow')
requests = relationship('WorkflowRequest', backref='workflow')
def __json__(self, request):
return {'id': self.id, 'name': self.name}
return {'workflow_name': self.workflow_name}
def render_templates(self, argument: Dict, files: List[Path]) -> List[AbstractFile]:
"""
Render the templates associated with this workflow
:param argument: the workflow argument JSON
:param files: the files to be processed
:return: a list of rendered templates
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class WorkflowTemplate(Base):
"""
Workflow Templates are Mustache-formatted files associated with a given Workflow
"""
__tablename__ = 'workflow_templates'
filename = sa.Column('filename', sa.String, primary_key=True)
content = sa.Column('content', sa.String, nullable=False)
workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True)
def render(self, argument: Dict) -> AbstractFile:
raise NotImplementedError('Add mustache to render this template')
class WorkflowRequest(Base):
"""
Workflow Requests invoke a particular workflow with a particular argument and some user-supplied files.
"""
__tablename__ = 'workflow_requests'
workflow_request_id = sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True)
workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'))
argument = sa.Column('argument', sa.JSON)
files = relationship('WorkflowRequestFile', backref='request')
class WorkflowRequestFile(Base):
__tablename__ = 'workflow_request_files'
workflow_request_id = sa.Column('workflow_request_id', sa.Integer, sa.ForeignKey('workflow_requests.workflow_request_id'),
primary_key=True)
filename = sa.Column('filename', sa.String, primary_key=True)
content = sa.Column('content', sa.String, nullable=False)
@property
def file(self) -> AbstractFile:
return AbstractFile(self.filename, self.content)
WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'file.txt', 'content': 'Hello, world!'}}}]
class DBWorkflowInfo(WorkflowInfo):
"""
Implements WorkflowInfo, backed by a relational database, using SQL Alchemy.
"""
def __init__(self, session):
self.session = session
def lookup_workflow_definition(self, name: str) -> Workflow:
return self.session.query(Workflow).get(name)
def all_workflows(self) -> List[Workflow]:
return self.session.query(Workflow).all()
# ---------------------------------------------------------
#
# S E R V I C E S
......@@ -65,7 +179,7 @@ WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'f
def lookup_workflow(request):
# this should change to use SQL Alchemy
return WORKFLOWS[int(request.matchdict['id'])]
return request.info.lookup_workflow_definition(request.matchdict['id'])
def lookup_file(request):
......@@ -91,7 +205,7 @@ class WorkflowService:
Audience: front-end
:return:
"""
return self.request.dbsession.query(Workflow).all()
return self.request.info.all_workflows()
@view_config(request_method='POST')
def create_workflow(self):
......@@ -220,7 +334,6 @@ def get_tm_session(session_factory, transaction_manager):
# ---------------------------------------------------------
def main(global_config, **settings):
with Configurator(settings=settings) as config:
session_factory = session_factory_from_settings(settings)
......@@ -241,8 +354,8 @@ def main(global_config, **settings):
# make request.dbsession available for use in Pyramid
config.add_request_method(
# r.tm is the transaction manager used by pyramid_tm
lambda r: get_tm_session(session_factory, r.tm),
'dbsession',
lambda request: DBWorkflowInfo(get_tm_session(session_factory, request.tm)),
'info',
reify=True
)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment