From 08815f3eecd2beb71c02863bff3046eff0cd9189 Mon Sep 17 00:00:00 2001 From: Daniel K Lyons <dlyons@nrao.edu> Date: Wed, 2 Sep 2020 14:49:52 -0600 Subject: [PATCH] Add sqlalchemy to the Workflow service. Show a basic query. --- environment.yml | 1 + services/workflow/src/workflow/server.py | 152 +++++++++++++++++++++-- support/conda/pyramid_retry/meta.yaml | 43 +++++++ 3 files changed, 189 insertions(+), 7 deletions(-) create mode 100644 support/conda/pyramid_retry/meta.yaml diff --git a/environment.yml b/environment.yml index 0b38e3f5d..7db446c17 100644 --- a/environment.yml +++ b/environment.yml @@ -25,6 +25,7 @@ dependencies: - pyramid=1.10 - pyramid_beaker=0.8 - pyramid_debugtoolbar=4.5 + - pyramid_retry=2.1.1 - pyramid_tm=2.2.1 - pysftp=0.2.9 - pytest=5.4 diff --git a/services/workflow/src/workflow/server.py b/services/workflow/src/workflow/server.py index 160fc288f..6dec8ebea 100644 --- a/services/workflow/src/workflow/server.py +++ b/services/workflow/src/workflow/server.py @@ -1,11 +1,46 @@ +import sqlalchemy +import zope.sqlalchemy +from pycapo import CapoConfig from pyramid.view import view_config, view_defaults from pyramid.config import Configurator from pyramid_beaker import session_factory_from_settings from pyramid.renderers import JSONP +from sqlalchemy import create_engine +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import sessionmaker + + +# --------------------------------------------------------- +# +# M O D E L +# +# --------------------------------------------------------- + +Base = declarative_base() + + +class Workflow(Base): + """ + A Workflow is a suite of tasks defined by templates that must be executed together with some user-submitted files. + """ + __tablename__ = 'workflows' + id = sqlalchemy.Column('workflow_id', sqlalchemy.String, primary_key=True, autoincrement=True) + name = sqlalchemy.Column('name', sqlalchemy.String) + + def __json__(self, request): + return {'id': self.id, 'name': self.name} + WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'file.txt', 'content': 'Hello, world!'}}}] +# --------------------------------------------------------- +# +# S E R V I C E S +# +# --------------------------------------------------------- + + def lookup_workflow(request): return WORKFLOWS[int(request.matchdict['id'])] @@ -21,48 +56,151 @@ class WorkflowService: @view_config(request_method='GET') def list_workflows(self): - return WORKFLOWS + """ + List the workflow requests that we know about + :return: + """ + return self.request.dbsession.query(Workflow).all() @view_config(request_method='POST') def create_workflow(self): + """ + Create a new workflow request + :return: + """ WORKFLOWS.append(self.request.json_body) return True @view_config(request_method='GET', route_name='workflow') def get_workflow(self): + """ + Look up a workflow request + :return: + """ return self.request.context @view_config(request_method='POST', route_name='submit_workflow') def submit_workflow(self): - # submit the workflow for processing + """ + Submit this workflow request for processing. + :return: + """ print(f"Submitting workflow {self.request.context['id']}") - @view_config(request_method='POST', route_name='workflow_files') + +@view_defaults(route_name='workflow_files', renderer='json') +class WorkflowFilesService: + """ + Services for the user-submitted files attached to workflows. + """ + def __init__(self, request): + self.request = request + + @view_config(request_method='POST') def add_file(self): - # add a file to this workflow request + """ + Add a file to this workflow request + """ print('Adding a file') - @view_config(request_method='GET', route_name='workflow_files') + @view_config(request_method='GET') def get_files(self): + """ + Get the files associated with this workflow request + :return: + """ return self.request.context['files'] @view_config(request_method='GET', route_name='workflow_file', accept='text/plain', renderer='string') def get_file_text(self): + """ + Get the text contents of this file + :return: + """ return self.request.context['content'] @view_config(request_method='GET', route_name='workflow_file', accept='application/json') def get_file_json(self): + """ + Get a JSON representation of this file + :return: + """ return self.request.context -def main(global_config, **settings): - # DB['SDM'] = scoped_session(sessionmaker(extension=ZopeTransactionExtension())) +# --------------------------------------------------------- +# +# F R A M E W O R K S E T U P +# +# --------------------------------------------------------- + +def get_engine(): + """ + Generate the SQL Alchemy engine for us, using Capo. + :return: + """ + capo = CapoConfig().settings('metadataDatabase') + url = capo.jdbcUrl.replace('jdbc:', '').replace('://', f'://{capo.jdbcUsername}:{capo.jdbcPassword}@') + return create_engine(url) + + +def get_session_factory(engine): + """ + Generate the SQL Alchemy session factory for us, using the supplied engine + :param engine: + :return: + """ + factory = sessionmaker() + factory.configure(bind=engine) + return factory + + +def get_tm_session(session_factory, transaction_manager): + """ + Enable Zope's transaction manager on our session + :param session_factory: + :param transaction_manager: + :return: + """ + dbsession = session_factory() + zope.sqlalchemy.register(dbsession, transaction_manager=transaction_manager) + return dbsession + + +# --------------------------------------------------------- +# +# M A I N E N T R Y P O I N T +# +# --------------------------------------------------------- + + + +def main(global_config, **settings): with Configurator(settings=settings) as config: session_factory = session_factory_from_settings(settings) config.set_session_factory(session_factory) config.add_renderer('jsonp', JSONP(param_name='callback')) + settings['tm.manager_hook'] = 'pyramid_tm.explicit_manager' + + # use pyramid_tm to hook the transaction lifecycle to the request + config.include('pyramid_tm') + + # use pyramid_retry to retry a request when transient exceptions occur + config.include('pyramid_retry') + + session_factory = get_session_factory(get_engine()) + config.registry['dbsession_factory'] = session_factory + + # make request.dbsession available for use in Pyramid + config.add_request_method( + # r.tm is the transaction manager used by pyramid_tm + lambda r: get_tm_session(session_factory, r.tm), + 'dbsession', + reify=True + ) + config.add_route('workflows', '/workflows') config.add_route('workflow', '/workflows/{id}', factory=lookup_workflow) config.add_route('submit_workflow', '/workflows/{id}/submit', factory=lookup_workflow) diff --git a/support/conda/pyramid_retry/meta.yaml b/support/conda/pyramid_retry/meta.yaml new file mode 100644 index 000000000..d8bb956fd --- /dev/null +++ b/support/conda/pyramid_retry/meta.yaml @@ -0,0 +1,43 @@ +{% set name = "pyramid_retry" %} +{% set version = "2.1.1" %} + +package: + name: "{{ name|lower }}" + version: "{{ version }}" + +source: + url: "https://pypi.io/packages/source/{{ name[0] }}/{{ name }}/{{ name }}-{{ version }}.tar.gz" + sha256: baa8276ae68babad09e5f2f94efc4f7421f3b8fb526151df522052f8cd3ec0c9 + +build: + noarch: python + number: 0 + script: "{{ PYTHON }} -m pip install . -vv" + +requirements: + host: + - pip + - pyramid >=1.9 + - python + - zope.interface + run: + - pyramid >=1.9 + - python + - zope.interface + +test: + imports: + - pyramid_retry + +about: + home: "https://github.com/Pylons/pyramid_retry" + license: MIT + license_family: MIT + license_file: + summary: "An execution policy for Pyramid that supports retrying requests after certain failure exceptions." + doc_url: + dev_url: + +extra: + recipe-maintainers: + - your-github-id-here -- GitLab