diff --git a/services/workflow/setup.py b/services/workflow/setup.py index 1d6eedbc60e703a0dd80d045c1937bf0a7c96e25..35e161cca2b241e72b0550cf09245d10a4c09b96 100644 --- a/services/workflow/setup.py +++ b/services/workflow/setup.py @@ -46,6 +46,7 @@ requires = [ 'schema', 'sqlalchemy', 'waitress', + 'workspaces', 'zope.sqlalchemy' ] diff --git a/services/workflow/src/workflow/server.py b/services/workflow/src/workflow/server.py index 8ffb9ff5de2e0143334ff241b4d1b55bec6e1e62..72c1ea70c4c6fba3190960e3aece1f49a78e094d 100644 --- a/services/workflow/src/workflow/server.py +++ b/services/workflow/src/workflow/server.py @@ -1,13 +1,3 @@ -import inspect -import stat -import subprocess -from abc import ABC, abstractmethod -from dataclasses import dataclass -from pathlib import Path -from tempfile import mkdtemp -from typing import List, Dict - -import sqlalchemy as sa import zope.sqlalchemy from pycapo import CapoConfig from pyramid.view import view_config, view_defaults @@ -15,8 +5,8 @@ from pyramid.config import Configurator from pyramid_beaker import session_factory_from_settings from pyramid.renderers import JSONP from sqlalchemy import create_engine -from sqlalchemy.ext.declarative import declarative_base -from sqlalchemy.orm import sessionmaker, relationship +from sqlalchemy.orm import sessionmaker +from workspaces.services import WorkflowInfo, WorkflowService """ Work done: @@ -39,216 +29,6 @@ To consider: """ -# --------------------------------------------------------- -# -# I N T E R F A C E S -# -# --------------------------------------------------------- - - -class WorkflowServiceIF(ABC): - """ - Executes workflows; should be a freestanding service. - """ - @abstractmethod - def execute(self, workflow_name: str, argument: Dict, files: List[Path]): - """ - Execute this workflow against these files. - - :param workflow_name: name of the workflow to run - :param argument: extra argument (a JSON object) - :param files: some extra files the workflow should consider - :return: a stream of events from this workflow - """ - raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') - - -class WorkflowInfoIF(ABC): - """ - Holds information about workflows. - """ - @abstractmethod - def lookup_workflow_definition(self, name: str) -> 'Workflow': - """ - Look up the workflow with this name. - - :param name: Workflow name - :return: Workflow instance - """ - raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') - - -class AbstractFile: - def __init__(self, filename, content): - self.filename, self.content = filename, content - - def write_to(self, directory: Path): - raise NotImplementedError('Actually save the file into the directory passed') - - -# --------------------------------------------------------- -# -# M O D E L -# -# --------------------------------------------------------- - - -Base = declarative_base() - - -class Workflow(Base): - """ - A Workflow is a suite of tasks defined by templates that must be - executed together with some user-submitted files. - """ - __tablename__ = 'workflows' - workflow_name = sa.Column('workflow_name', sa.String, primary_key=True) - templates = relationship('WorkflowTemplate', backref='workflow') - requests = relationship('WorkflowRequest', backref='workflow') - - def __json__(self, request): - return {'workflow_name': self.workflow_name} - - def render_templates(self, argument: Dict, files: List[Path]) -> List[AbstractFile]: - """ - Render the templates associated with this workflow - :param argument: the workflow argument JSON - :param files: the files to be processed - :return: a list of rendered templates - """ - raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') - - -class WorkflowTemplate(Base): - """ - Workflow Templates are Mustache-formatted files associated with a given Workflow - """ - __tablename__ = 'workflow_templates' - filename = sa.Column('filename', sa.String, primary_key=True) - content = sa.Column('content', sa.String, nullable=False) - workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True) - - def render(self, argument: Dict) -> AbstractFile: - raise NotImplementedError('Add mustache to render this template') - - -class WorkflowRequest(Base): - """ - Workflow Requests invoke a particular workflow with a particular argument and some user-supplied files. - """ - __tablename__ = 'workflow_requests' - workflow_request_id = sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True) - workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name')) - argument = sa.Column('argument', sa.JSON) - files = relationship('WorkflowRequestFile', backref='request') - - -class WorkflowRequestFile(Base): - __tablename__ = 'workflow_request_files' - workflow_request_id = sa.Column('workflow_request_id', sa.Integer, sa.ForeignKey('workflow_requests.workflow_request_id'), - primary_key=True) - filename = sa.Column('filename', sa.String, primary_key=True) - content = sa.Column('content', sa.String, nullable=False) - - @property - def file(self) -> AbstractFile: - return AbstractFile(self.filename, self.content) - - -# --------------------------------------------------------- -# -# I N N E R S E R V I C E L A Y E R -# -# --------------------------------------------------------- - - -class WorkflowService(WorkflowServiceIF): - def execute(self, workflow_name: str, argument: Dict, files: List[Path]): - """ - Here's the implementation from wksp0 that does this: - - # 1. look up the workflow info for this workflow name - info = self.db.lookup_workflow_definition(workflow_name) - - # 2. render the templates to files - contents = info.render_templates(argument, files) - - # 3. serialize the templated files - temp_folder = self._prepare_files_for_condor(contents) - - # 4. execute condor and get the log file - log_file = self._execute_prepared(temp_folder) - - # 5. start reading the logs - return HTCondorWorkflowEventStream(log_file) - - Of these, step 5 is going to have to be the most different, because - that's launching the new CLI wf_monitor program that Nathan wrote. - The other steps are more similar than different, though figuring out - where to put the temp files will be interesting. - """ - raise NotImplementedError - - @staticmethod - def _prepare_files_for_condor(files: Dict[str, str]) -> Path: - """ - Place the files for Condor into a new temp directory and returns the directory. - - :param files: a dictionary of filename -> content - :return: a Path - """ - # 1. create a temporary directory - temp_folder = Path(mkdtemp(dir=str(Path.home() / "tmp"))) - - # 2. spool each of the temp files to it - for name, content in files.items(): - (temp_folder / name).write_text(content) - - # 3. make any scripts in there executable - for file in temp_folder.glob('*.sh'): - file.chmod(file.stat().st_mode | stat.S_IEXEC) - - # finished, return folder - return temp_folder - - @staticmethod - def _execute_prepared(folder: Path) -> Path: - """ - Execute HTCondor using the named folder as the source of the files. - - :param folder: the path to the folder to execute - :return: the path to the log file - """ - print(f'executing on folder {folder}') - - # some file in here should end in .dag; that file is our dagman input - dagman = list(folder.glob('*.dag'))[0] - - # ensure the log file exists - logfile = folder / 'condor.log' - logfile.touch() - - # submit - subprocess.run(['condor_submit_dag', str(dagman)], cwd=str(folder.absolute())) - - # return the logfile - return logfile - - -class WorkflowInfo(WorkflowInfoIF): - """ - Implements WorkflowInfo, backed by a relational database, using SQL Alchemy. - """ - - def __init__(self, session): - self.session = session - - def lookup_workflow_definition(self, name: str) -> Workflow: - return self.session.query(Workflow).get(name) - - def all_workflows(self) -> List[Workflow]: - return self.session.query(Workflow).all() - # --------------------------------------------------------- # @@ -258,12 +38,10 @@ class WorkflowInfo(WorkflowInfoIF): def lookup_workflow(request): - # this should change to use SQL Alchemy - return request.info.lookup_workflow_definition(request.matchdict['id']) + return request.info.lookup_workflow_definition(request.matchdict['name']) def lookup_file(request): - # this should change to use SQL Alchemy return lookup_workflow(request)['files'][request.matchdict['filename']] @@ -318,7 +96,7 @@ class WorkflowRestService: Audience: front-end and CLI :return: """ - print(f"Submitting workflow {self.request.context['id']}") + print(f"Submitting workflow {self.request.context.workflow_name}") @view_defaults(route_name='workflow_files', renderer='json') @@ -437,17 +215,17 @@ def main(global_config, **settings): config.add_request_method( # r.tm is the transaction manager used by pyramid_tm lambda request: WorkflowInfo(get_tm_session(session_factory, request.tm)), - 'workflow_info', + 'info', reify=True ) # make workflow_service available for use in Pyramid - config.add_request_method(lambda r: WorkflowService(r.workflow_info), 'workflow_service', reify=True) + config.add_request_method(lambda r: WorkflowService(r.info), 'workflows', reify=True) config.add_route('workflows', '/workflows') - config.add_route('workflow', '/workflows/{id}', factory=lookup_workflow) - config.add_route('submit_workflow', '/workflows/{id}/submit', factory=lookup_workflow) - config.add_route('workflow_files', '/workflows/{id}/files', factory=lookup_workflow) - config.add_route('workflow_file', '/workflows/{id}/files/{filename}', factory=lookup_file) + config.add_route('workflow', '/workflows/{name}', factory=lookup_workflow) + config.add_route('submit_workflow', '/workflows/{name}/submit', factory=lookup_workflow) + config.add_route('workflow_files', '/workflows/{name}/files', factory=lookup_workflow) + config.add_route('workflow_file', '/workflows/{name}/files/{filename}', factory=lookup_file) config.include('pyramid_beaker') config.scan('.') diff --git a/shared/workspaces/README.md b/shared/workspaces/README.md new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/shared/workspaces/setup.py b/shared/workspaces/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..533470dd88fd7de67f7b1bf6810170cc11b0848c --- /dev/null +++ b/shared/workspaces/setup.py @@ -0,0 +1,39 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +from pathlib import Path +from setuptools import setup + +VERSION = open('src/workspaces/_version.py').readlines()[-1].split()[-1].strip("\"'") +README = Path('README.md').read_text() + +requires = [ + 'pycapo', + 'schema', + 'sqlalchemy', +] +tests_require = [ + 'pytest>=5.4,<6.0' +] + +setup( + name=Path().absolute().name, + version=VERSION, + description='Workspaces support library', + long_description=README, + author='NRAO SSA Team', + author_email='dms-ssa@nrao.edu', + url='TBD', + license="GPL", + install_requires=requires, + tests_require=tests_require, + keywords=[], + packages=['workspaces'], + package_dir={'':'src'}, + classifiers=[ + 'Programming Language :: Python :: 3.8' + ], + entry_points={ + 'console_scripts': [''] + }, +) diff --git a/shared/workspaces/src/workspaces/__init__.py b/shared/workspaces/src/workspaces/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/shared/workspaces/src/workspaces/_version.py b/shared/workspaces/src/workspaces/_version.py new file mode 100644 index 0000000000000000000000000000000000000000..f27d146a3f39885ce269bacf9ab4510254147c8d --- /dev/null +++ b/shared/workspaces/src/workspaces/_version.py @@ -0,0 +1,2 @@ +""" Version information for this package, don't put anything else here. """ +___version___ = '4.0.0a1.dev1' diff --git a/shared/workspaces/src/workspaces/interfaces.py b/shared/workspaces/src/workspaces/interfaces.py new file mode 100644 index 0000000000000000000000000000000000000000..4108d8717d6d9b96ae6c0a495ba513197c23da1e --- /dev/null +++ b/shared/workspaces/src/workspaces/interfaces.py @@ -0,0 +1,40 @@ +""" +Interfaces used by the Workspace system. +""" + +import inspect +from abc import ABC, abstractmethod +from pathlib import Path +from typing import Dict, List + + +class WorkflowServiceIF(ABC): + """ + Executes workflows; should be a freestanding service. + """ + @abstractmethod + def execute(self, workflow_name: str, argument: Dict, files: List[Path]): + """ + Execute this workflow against these files. + + :param workflow_name: name of the workflow to run + :param argument: extra argument (a JSON object) + :param files: some extra files the workflow should consider + :return: a stream of events from this workflow + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class WorkflowInfoIF(ABC): + """ + Holds information about workflows. + """ + @abstractmethod + def lookup_workflow_definition(self, name: str) -> 'Workflow': + """ + Look up the workflow with this name. + + :param name: Workflow name + :return: Workflow instance + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') diff --git a/shared/workspaces/src/workspaces/schema.py b/shared/workspaces/src/workspaces/schema.py new file mode 100644 index 0000000000000000000000000000000000000000..41e2f8199f2af22ea30cfe104fbf5833dc3e0843 --- /dev/null +++ b/shared/workspaces/src/workspaces/schema.py @@ -0,0 +1,99 @@ +""" +Model classes used by the Workspaces system. +""" + +import inspect +from pathlib import Path +from typing import Dict, List + +import sqlalchemy as sa +from sqlalchemy.orm import relationship +from sqlalchemy.ext.declarative import declarative_base + + +class AbstractFile: + """ + Abstract file is exactly that, an abstract concept of what a file is, to be + returned from various non-filesystem places. + """ + def __init__(self, filename, content): + self.filename, self.content = filename, content + + def write_to(self, directory: Path): + raise NotImplementedError('Actually save the file into the directory passed') + + def __json__(self, request): + return {'filename': self.filename, 'content': self.content} + + +Base = declarative_base() + + +class Workflow(Base): + """ + A Workflow is a suite of tasks defined by templates that must be + executed together with some user-submitted files. + """ + __tablename__ = 'workflows' + workflow_name = sa.Column('workflow_name', sa.String, primary_key=True) + templates = relationship('WorkflowTemplate', backref='workflow') + requests = relationship('WorkflowRequest', backref='workflow') + + def __json__(self, request): + return {'workflow_name': self.workflow_name, 'templates': self.templates} + + def render_templates(self, argument: Dict, files: List[Path]) -> List[AbstractFile]: + """ + Render the templates associated with this workflow + :param argument: the workflow argument JSON + :param files: the files to be processed + :return: a list of rendered templates + """ + raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') + + +class WorkflowTemplate(Base): + """ + Workflow Templates are Mustache-formatted files associated with a given Workflow + """ + __tablename__ = 'workflow_templates' + filename = sa.Column('filename', sa.String, primary_key=True) + content = sa.Column('content', sa.String, nullable=False) + workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True) + + def render(self, argument: Dict) -> AbstractFile: + raise NotImplementedError('Add mustache to render this template') + + @property + def template_file(self) -> AbstractFile: + return AbstractFile(self.filename, self.content) + + def __json__(self, request): + # Defer to the actual template file's contents for JSON conversion + return self.template_file.__json__(request) + + +class WorkflowRequest(Base): + """ + Workflow Requests invoke a particular workflow with a particular argument and some user-supplied files. + """ + __tablename__ = 'workflow_requests' + workflow_request_id = sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True) + workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name')) + argument = sa.Column('argument', sa.JSON) + files = relationship('WorkflowRequestFile', backref='request') + + +class WorkflowRequestFile(Base): + """ + A Workflow Request File is a file supplied by the user and attached to the request they have submitted. + """ + __tablename__ = 'workflow_request_files' + workflow_request_id = sa.Column('workflow_request_id', sa.Integer, sa.ForeignKey('workflow_requests.workflow_request_id'), + primary_key=True) + filename = sa.Column('filename', sa.String, primary_key=True) + content = sa.Column('content', sa.String, nullable=False) + + @property + def file(self) -> AbstractFile: + return AbstractFile(self.filename, self.content) diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py new file mode 100644 index 0000000000000000000000000000000000000000..3f6d37f31567400776499c7141eef44970a4523b --- /dev/null +++ b/shared/workspaces/src/workspaces/services.py @@ -0,0 +1,97 @@ +""" +Services defined by the Workspaces system, to be used by our APIs and client programs. +""" +import stat +import subprocess +from tempfile import mkdtemp + +from .interfaces import * +from .schema import * + + +class WorkflowService(WorkflowServiceIF): + def execute(self, workflow_name: str, argument: Dict, files: List[Path]): + """ + Here's the implementation from wksp0 that does this: + + # 1. look up the workflow info for this workflow name + info = self.db.lookup_workflow_definition(workflow_name) + + # 2. render the templates to files + contents = info.render_templates(argument, files) + + # 3. serialize the templated files + temp_folder = self._prepare_files_for_condor(contents) + + # 4. execute condor and get the log file + log_file = self._execute_prepared(temp_folder) + + # 5. start reading the logs + return HTCondorWorkflowEventStream(log_file) + + Of these, step 5 is going to have to be the most different, because + that's launching the new CLI wf_monitor program that Nathan wrote. + The other steps are more similar than different, though figuring out + where to put the temp files will be interesting. + """ + raise NotImplementedError + + @staticmethod + def _prepare_files_for_condor(files: Dict[str, str]) -> Path: + """ + Place the files for Condor into a new temp directory and returns the directory. + + :param files: a dictionary of filename -> content + :return: a Path + """ + # 1. create a temporary directory + temp_folder = Path(mkdtemp(dir=str(Path.home() / "tmp"))) + + # 2. spool each of the temp files to it + for name, content in files.items(): + (temp_folder / name).write_text(content) + + # 3. make any scripts in there executable + for file in temp_folder.glob('*.sh'): + file.chmod(file.stat().st_mode | stat.S_IEXEC) + + # finished, return folder + return temp_folder + + @staticmethod + def _execute_prepared(folder: Path) -> Path: + """ + Execute HTCondor using the named folder as the source of the files. + + :param folder: the path to the folder to execute + :return: the path to the log file + """ + print(f'executing on folder {folder}') + + # some file in here should end in .dag; that file is our dagman input + dagman = list(folder.glob('*.dag'))[0] + + # ensure the log file exists + logfile = folder / 'condor.log' + logfile.touch() + + # submit + subprocess.run(['condor_submit_dag', str(dagman)], cwd=str(folder.absolute())) + + # return the logfile + return logfile + + +class WorkflowInfo(WorkflowInfoIF): + """ + Implements WorkflowInfo, backed by a relational database, using SQL Alchemy. + """ + + def __init__(self, session): + self.session = session + + def lookup_workflow_definition(self, name: str) -> Workflow: + return self.session.query(Workflow).get(name) + + def all_workflows(self) -> List[Workflow]: + return self.session.query(Workflow).all()