From 222052161d21d32699f8b13b61bfad63155a4091 Mon Sep 17 00:00:00 2001 From: Daniel K Lyons <dlyons@nrao.edu> Date: Thu, 3 Sep 2020 11:56:38 -0600 Subject: [PATCH] Add the sample implementations from the wksp0 project --- services/workflow/src/workflow/server.py | 106 ++++++++++++++++++++--- 1 file changed, 95 insertions(+), 11 deletions(-) diff --git a/services/workflow/src/workflow/server.py b/services/workflow/src/workflow/server.py index f4c8c0147..8ffb9ff5d 100644 --- a/services/workflow/src/workflow/server.py +++ b/services/workflow/src/workflow/server.py @@ -1,7 +1,10 @@ import inspect +import stat +import subprocess from abc import ABC, abstractmethod from dataclasses import dataclass from pathlib import Path +from tempfile import mkdtemp from typing import List, Dict import sqlalchemy as sa @@ -43,7 +46,7 @@ To consider: # --------------------------------------------------------- -class WorkflowService(ABC): +class WorkflowServiceIF(ABC): """ Executes workflows; should be a freestanding service. """ @@ -60,7 +63,7 @@ class WorkflowService(ABC): raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}') -class WorkflowInfo(ABC): +class WorkflowInfoIF(ABC): """ Holds information about workflows. """ @@ -152,10 +155,87 @@ class WorkflowRequestFile(Base): return AbstractFile(self.filename, self.content) -WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'file.txt', 'content': 'Hello, world!'}}}] +# --------------------------------------------------------- +# +# I N N E R S E R V I C E L A Y E R +# +# --------------------------------------------------------- + + +class WorkflowService(WorkflowServiceIF): + def execute(self, workflow_name: str, argument: Dict, files: List[Path]): + """ + Here's the implementation from wksp0 that does this: + + # 1. look up the workflow info for this workflow name + info = self.db.lookup_workflow_definition(workflow_name) + + # 2. 
render the templates to files + contents = info.render_templates(argument, files) + + # 3. serialize the templated files + temp_folder = self._prepare_files_for_condor(contents) + + # 4. execute condor and get the log file + log_file = self._execute_prepared(temp_folder) + + # 5. start reading the logs + return HTCondorWorkflowEventStream(log_file) + + Of these, step 5 is going to have to be the most different, because + that's launching the new CLI wf_monitor program that Nathan wrote. + The other steps are more similar than different, though figuring out + where to put the temp files will be interesting. + """ + raise NotImplementedError + + @staticmethod + def _prepare_files_for_condor(files: Dict[str, str]) -> Path: + """ + Place the files for Condor into a new temp directory and return the directory. + + :param files: a dictionary of filename -> content + :return: a Path + """ + # 1. create a temporary directory + temp_folder = Path(mkdtemp(dir=str(Path.home() / "tmp"))) + + # 2. spool each of the temp files to it + for name, content in files.items(): + (temp_folder / name).write_text(content) + + # 3. make any scripts in there executable + for file in temp_folder.glob('*.sh'): + file.chmod(file.stat().st_mode | stat.S_IEXEC) + + # finished, return folder + return temp_folder + + @staticmethod + def _execute_prepared(folder: Path) -> Path: + """ + Execute HTCondor using the named folder as the source of the files. 
+ + :param folder: the path to the folder to execute + :return: the path to the log file + """ + print(f'executing on folder {folder}') + + # some file in here should end in .dag; that file is our dagman input + dagman = list(folder.glob('*.dag'))[0] + + # ensure the log file exists + logfile = folder / 'condor.log' + logfile.touch() + + # submit + subprocess.run(['condor_submit_dag', str(dagman)], cwd=str(folder.absolute())) + + # return the logfile + return logfile -class DBWorkflowInfo(WorkflowInfo): +class WorkflowInfo(WorkflowInfoIF): """ Implements WorkflowInfo, backed by a relational database, using SQL Alchemy. """ @@ -188,7 +268,7 @@ def lookup_file(request): @view_defaults(route_name='workflows', renderer='json') -class WorkflowService: +class WorkflowRestService: """ Top-level service for workflow requests. @@ -215,8 +295,10 @@ class WorkflowService: Audience: front-end and CLI :return: """ - WORKFLOWS.append(self.request.json_body) - return True + # all we should have to do here is take the WorkflowRequest from the context and + # hand it to WorkflowInfo to save it, but we're still conflating + # workflows and workflow requests right now + raise NotImplementedError @view_config(request_method='GET', route_name='workflow') def get_workflow(self): @@ -240,7 +322,7 @@ class WorkflowService: @view_defaults(route_name='workflow_files', renderer='json') -class WorkflowFilesService: +class WorkflowFilesRestService: """ Services for the user-submitted files attached to workflows. 
""" @@ -351,13 +433,15 @@ def main(global_config, **settings): session_factory = get_session_factory(get_engine()) config.registry['dbsession_factory'] = session_factory - # make request.dbsession available for use in Pyramid + # make workflow_info available for use in Pyramid config.add_request_method( # r.tm is the transaction manager used by pyramid_tm - lambda request: DBWorkflowInfo(get_tm_session(session_factory, request.tm)), - 'info', + lambda request: WorkflowInfo(get_tm_session(session_factory, request.tm)), + 'workflow_info', reify=True ) + # make workflow_service available for use in Pyramid + config.add_request_method(lambda r: WorkflowService(r.workflow_info), 'workflow_service', reify=True) config.add_route('workflows', '/workflows') config.add_route('workflow', '/workflows/{id}', factory=lookup_workflow) -- GitLab