Skip to content
Snippets Groups Projects
Commit 22205216 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Add the sample implementations from the wksp0 project

parent fd49f5d0
No related branches found
No related tags found
No related merge requests found
import inspect
import stat
import subprocess
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from tempfile import mkdtemp
from typing import List, Dict
import sqlalchemy as sa
......@@ -43,7 +46,7 @@ To consider:
# ---------------------------------------------------------
class WorkflowService(ABC):
class WorkflowServiceIF(ABC):
"""
Executes workflows; should be a freestanding service.
"""
......@@ -60,7 +63,7 @@ class WorkflowService(ABC):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class WorkflowInfo(ABC):
class WorkflowInfoIF(ABC):
"""
Holds information about workflows.
"""
......@@ -152,10 +155,87 @@ class WorkflowRequestFile(Base):
return AbstractFile(self.filename, self.content)
WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'file.txt', 'content': 'Hello, world!'}}}]
# ---------------------------------------------------------
#
# I N N E R S E R V I C E L A Y E R
#
# ---------------------------------------------------------
class WorkflowService(WorkflowServiceIF):
def execute(self, workflow_name: str, argument: Dict, files: List[Path]):
"""
Here's the implementation from wksp0 that does this:
# 1. look up the workflow info for this workflow name
info = self.db.lookup_workflow_definition(workflow_name)
# 2. render the templates to files
contents = info.render_templates(argument, files)
# 3. serialize the templated files
temp_folder = self._prepare_files_for_condor(contents)
# 4. execute condor and get the log file
log_file = self._execute_prepared(temp_folder)
# 5. start reading the logs
return HTCondorWorkflowEventStream(log_file)
Of these, step 5 is going to have to be the most different, because
that's launching the new CLI wf_monitor program that Nathan wrote.
The other steps are more similar than different, though figuring out
where to put the temp files will be interesting.
"""
raise NotImplementedError
@staticmethod
def _prepare_files_for_condor(files: Dict[str, str]) -> Path:
"""
Place the files for Condor into a new temp directory and returns the directory.
:param files: a dictionary of filename -> content
:return: a Path
"""
# 1. create a temporary directory
temp_folder = Path(mkdtemp(dir=str(Path.home() / "tmp")))
# 2. spool each of the temp files to it
for name, content in files.items():
(temp_folder / name).write_text(content)
# 3. make any scripts in there executable
for file in temp_folder.glob('*.sh'):
file.chmod(file.stat().st_mode | stat.S_IEXEC)
# finished, return folder
return temp_folder
@staticmethod
def _execute_prepared(folder: Path) -> Path:
"""
Execute HTCondor using the named folder as the source of the files.
:param folder: the path to the folder to execute
:return: the path to the log file
"""
print(f'executing on folder {folder}')
# some file in here should end in .dag; that file is our dagman input
dagman = list(folder.glob('*.dag'))[0]
# ensure the log file exists
logfile = folder / 'condor.log'
logfile.touch()
# submit
subprocess.run(['condor_submit_dag', str(dagman)], cwd=str(folder.absolute()))
# return the logfile
return logfile
class DBWorkflowInfo(WorkflowInfo):
class WorkflowInfo(WorkflowInfoIF):
"""
Implements WorkflowInfo, backed by a relational database, using SQL Alchemy.
"""
......@@ -188,7 +268,7 @@ def lookup_file(request):
@view_defaults(route_name='workflows', renderer='json')
class WorkflowService:
class WorkflowRestService:
"""
Top-level service for workflow requests.
......@@ -215,8 +295,10 @@ class WorkflowService:
Audience: front-end and CLI
:return:
"""
WORKFLOWS.append(self.request.json_body)
return True
# all we should have to do here is take the WorkflowRequest from the context and
# hand it to WorkflowInfo to save it, but we're still conflating
# workflows and workflow requests right now
raise NotImplementedError
@view_config(request_method='GET', route_name='workflow')
def get_workflow(self):
......@@ -240,7 +322,7 @@ class WorkflowService:
@view_defaults(route_name='workflow_files', renderer='json')
class WorkflowFilesService:
class WorkflowFilesRestService:
"""
Services for the user-submitted files attached to workflows.
"""
......@@ -351,13 +433,15 @@ def main(global_config, **settings):
session_factory = get_session_factory(get_engine())
config.registry['dbsession_factory'] = session_factory
# make request.dbsession available for use in Pyramid
# make workflow_info available for use in Pyramid
config.add_request_method(
# r.tm is the transaction manager used by pyramid_tm
lambda request: DBWorkflowInfo(get_tm_session(session_factory, request.tm)),
'info',
lambda request: WorkflowInfo(get_tm_session(session_factory, request.tm)),
'workflow_info',
reify=True
)
# make workflow_service available for use in Pyramid
config.add_request_method(lambda r: WorkflowService(r.workflow_info), 'workflow_service', reify=True)
config.add_route('workflows', '/workflows')
config.add_route('workflow', '/workflows/{id}', factory=lookup_workflow)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment