diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index 6898b1a4133bfa1437bd3b526c3aaf7e185c994b..9a28fd00be93091549fdbd734d147fd2fe8c1af8 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -1,3 +1,5 @@ +import logging + import zope.sqlalchemy from pyramid.config import Configurator from pyramid.renderers import JSONP @@ -9,6 +11,9 @@ from workspaces.system.schema import get_engine, get_session_factory from workspaces.workflow.services.workflow_info import WorkflowInfo from workspaces.workflow.services.workflow_service import WorkflowService +logger = logging.getLogger(__name__) + + """ To consider: ☠Do we want to use Marshmallow to validate inputs/generate API documentation? @@ -128,8 +133,10 @@ class WorkflowFilesRestService: Audience: front-end and CLI """ - print( - f"Adding file {self.request.matchdict['filename']} to {self.request.context}" + logger.info( + "Adding file %s to %s", + self.request.matchdict["filename"], + self.request.context, ) file = self.request.info.save_file( request=self.request.context, diff --git a/shared/workspaces/workspaces/capability/services/capability_service.py b/shared/workspaces/workspaces/capability/services/capability_service.py index 9519ee4c3d65663e1ec05e8aff671fb47fb8a1f5..dcdb681d744ed149637fb1fc690d986c6785d48b 100644 --- a/shared/workspaces/workspaces/capability/services/capability_service.py +++ b/shared/workspaces/workspaces/capability/services/capability_service.py @@ -1,3 +1,4 @@ +import logging import threading from typing import List @@ -20,13 +21,17 @@ from workspaces.workflow.enum import WorkflowEventType from workspaces.workflow.schema import WorkflowEvent from workspaces.workflow.services.interfaces import WorkflowServiceIF +logger = logging.getLogger(__name__) + class CapabilityService(CapabilityServiceIF): """ The capability service: clients access this to request capability runs """ - def __init__(self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF): + def __init__( + self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF + ): self.execution_manager = ExecutionManager(capability_info, workflow_service) self.capability_info = capability_info # self.workflow_service = workflow_service @@ -54,7 +59,10 @@ class CapabilityService(CapabilityServiceIF): :param products: Products the request will create upon success :return: Capability request """ - return self.capability_info.create_capability_request(capability_name, parameters, products) + logger.info("in create_request") + return self.capability_info.create_capability_request( + capability_name, parameters, products + ) def run_capability(self, request: CapabilityRequest) -> CapabilityExecutionIF: """ @@ -87,7 +95,9 @@ class CapabilityService(CapabilityServiceIF): event.workflow_request_id ) - return CapabilityEvent(event_type, execution.capability_request.id, execution.id) + return CapabilityEvent( + event_type, execution.capability_request.id, execution.id + ) def process_workflow_event(self, event: WorkflowEvent): """ diff --git a/shared/workspaces/workspaces/workflow/schema.py b/shared/workspaces/workspaces/workflow/schema.py index 1eeb5eb59f18c9f9edd26fbc765b13c5dfb78a36..8ab0a539b2eb11d7175666754e4860e786bf7bfb 100644 --- a/shared/workspaces/workspaces/workflow/schema.py +++ b/shared/workspaces/workspaces/workflow/schema.py @@ -1,4 +1,5 @@ import json +import logging from pathlib import Path from typing import Dict, List @@ -15,6 +16,9 @@ from workspaces.workflow.schema_interfaces import ( WorkflowRequestIF, ) + +logger = logging.getLogger(__name__) + Base = declarative_base() @@ -139,14 +143,13 @@ class WorkflowTemplate(Base): :param argument: argument(s) needed for template render :return: an AbstractFile rendered from mustache template """ - - # TODO: if template has more than one tag, figure out how pass argument parameter - # in as JSON and replace 'data' section accordingly - args = { - "template": self.content.decode("ascii"), - "data": {"arguments": "".join(map(str, argument))}, - } - contents = chevron.render(**args) + contents = chevron.render(self.content.decode("ascii"), argument) + logger.info( + "rendering %s with argument %s to %s", + self.content.decode("ascii"), + argument, + contents, + ) return AbstractFile(self.filename, contents.encode("ascii")) @property diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 2edc5c8c03202a4e29b56cfeda775bea7715adbe..ee4a1127d0fa7ed6b2116a138a2651ba55b9e707 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -1,3 +1,5 @@ +import json +import logging import os import stat import subprocess @@ -25,6 +27,8 @@ import logging logger = logging.getLogger(__name__) +logger = logging.getLogger(__name__) + class WorkflowServiceRESTClient(WorkflowServiceIF): def __init__(self): @@ -111,9 +115,12 @@ class WorkflowService(WorkflowServiceIF): :return: """ # 1. look up workflow, returns workflow + logger.info("Looking up workflow %s", request.workflow_name) definition = self.info.lookup_workflow_definition(request.workflow_name) # 2. render templates to files, returns list of rendered files + logger.info("Rendering files") + logger.info("Type of request.argument is %s", type(request.argument)) templated_files = definition.render_templates(request.argument) for file in templated_files: self.info.save_file( @@ -123,7 +130,7 @@ class WorkflowService(WorkflowServiceIF): # 3. Combine the workflow_files = templated_files + request.files - # 4. prepare files for condor execution + # 3. prepare files for condor execution if not request.results_dir: # Create temp results directory if the request doesn't already have one (from a previous execution attempt) request.results_dir = str( @@ -131,10 +138,10 @@ class WorkflowService(WorkflowServiceIF): ) self.info.save_request(request) - # 5. execute condor and retrieve log file + # 4. execute condor and retrieve log file log_file = self._execute_prepared(Path(request.results_dir)) - # 6. start reading the log file (background it) + # 5. start reading the log file (background it) subprocess.run( ["wf_monitor", log_file, str(request.workflow_request_id)], cwd=request.results_dir, @@ -155,13 +162,17 @@ class WorkflowService(WorkflowServiceIF): if not os.path.isdir(str(Path.home()) + "/workspaces_tmp"): os.mkdir(str(Path.home()) + "/workspaces_tmp") temp_folder = Path(mkdtemp(dir=str(Path.home()) + "/workspaces_tmp")) + logger.info("Settled on temp folder %s", temp_folder) # 2. spool each of the temp files to it for file in files: - (temp_folder / file.filename).write_bytes(file.content) + path = temp_folder / file.filename + logger.info("Writing %s", path) + path.write_bytes(file.content) # 3. make any scripts in there executable for file in temp_folder.glob("*.sh"): + logger.info("Making %s executable", file) file.chmod(file.stat().st_mode | stat.S_IEXEC) # finished, return folder @@ -175,20 +186,20 @@ class WorkflowService(WorkflowServiceIF): :param folder: the path to the folder to execute :return: the path to the log file """ - print(f"executing on folder {folder}") + logger.info("executing on folder %s", folder) # some file in here should end in .dag; that file is our dagman input # TODO: not currently using DAG files for vulture # dagman = list(folder.glob("*.dag"))[0] - # print(f"dagman file {dagman} exists.") + # logger.info("dagman file %s exists.", dagman) # need .condor file for vulture submission condor = list(folder.glob("*.condor"))[0] - print(f"condor file {condor} exists.") + logger.info("condor file %s exists.", condor) # ensure the log file exists logfile = folder / "condor.log" - print(f"log file {logfile} exists.") + logger.info("log file %s exists.", logfile) logfile.touch() # submit @@ -196,7 +207,7 @@ class WorkflowService(WorkflowServiceIF): # subprocess.run(["condor_submit_dag", str(dagman)], cwd=str(folder.absolute())) # vulture is a workaround for testing locally without submitting to condor - print("submitting to vulture...") + logger.info("submitting to vulture...") subprocess.run( ["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute()) ) @@ -219,7 +230,9 @@ class WorkflowService(WorkflowServiceIF): else: status = WorkflowRequestState.Running.name - print( - f"Updating state on workflow request {request.workflow_request_id} to {status}..." + logger.info( + "Updating state on workflow request %s to %s...", + request.workflow_request_id, + status, ) request.update_status(status)