Skip to content
Snippets Groups Projects
Commit a654a7ec authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Add loggers to various services

parent 261c8a61
No related branches found
No related tags found
1 merge request!120Add logging
Pipeline #926 failed
import logging
import zope.sqlalchemy
from pyramid.config import Configurator
from pyramid.renderers import JSONP
......@@ -9,6 +11,9 @@ from workspaces.system.schema import get_engine, get_session_factory
from workspaces.workflow.services.workflow_info import WorkflowInfo
from workspaces.workflow.services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
"""
To consider:
☐ Do we want to use Marshmallow to validate inputs/generate API documentation?
......@@ -128,8 +133,10 @@ class WorkflowFilesRestService:
Audience: front-end and CLI
"""
print(
f"Adding file {self.request.matchdict['filename']} to {self.request.context}"
logger.info(
"Adding file %s to %s",
self.request.matchdict["filename"],
self.request.context,
)
file = self.request.info.save_file(
request=self.request.context,
......
import logging
import threading
from typing import List
......@@ -20,13 +21,17 @@ from workspaces.workflow.enum import WorkflowEventType
from workspaces.workflow.schema import WorkflowEvent
from workspaces.workflow.services.interfaces import WorkflowServiceIF
logger = logging.getLogger(__name__)
class CapabilityService(CapabilityServiceIF):
"""
The capability service: clients access this to request capability runs
"""
def __init__(self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF):
def __init__(
self, capability_info: CapabilityInfoIF, workflow_service: WorkflowServiceIF
):
self.execution_manager = ExecutionManager(capability_info, workflow_service)
self.capability_info = capability_info
# self.workflow_service = workflow_service
......@@ -54,7 +59,10 @@ class CapabilityService(CapabilityServiceIF):
:param products: Products the request will create upon success
:return: Capability request
"""
return self.capability_info.create_capability_request(capability_name, parameters, products)
logger.info("in create_request")
return self.capability_info.create_capability_request(
capability_name, parameters, products
)
def run_capability(self, request: CapabilityRequest) -> CapabilityExecutionIF:
"""
......@@ -87,7 +95,9 @@ class CapabilityService(CapabilityServiceIF):
event.workflow_request_id
)
return CapabilityEvent(event_type, execution.capability_request.id, execution.id)
return CapabilityEvent(
event_type, execution.capability_request.id, execution.id
)
def process_workflow_event(self, event: WorkflowEvent):
"""
......
import json
import logging
from pathlib import Path
from typing import Dict, List
......@@ -15,6 +16,9 @@ from workspaces.workflow.schema_interfaces import (
WorkflowRequestIF,
)
logger = logging.getLogger(__name__)
Base = declarative_base()
......@@ -139,14 +143,13 @@ class WorkflowTemplate(Base):
:param argument: argument(s) needed for template render
:return: an AbstractFile rendered from mustache template
"""
# TODO: if template has more than one tag, figure out how pass argument parameter
# in as JSON and replace 'data' section accordingly
args = {
"template": self.content.decode("ascii"),
"data": {"arguments": "".join(map(str, argument))},
}
contents = chevron.render(**args)
contents = chevron.render(self.content.decode("ascii"), argument)
logger.info(
"rendering %s with argument %s to %s",
self.content.decode("ascii"),
argument,
contents,
)
return AbstractFile(self.filename, contents.encode("ascii"))
@property
......
import json
import logging
import os
import stat
import subprocess
......@@ -25,6 +27,8 @@ import logging
logger = logging.getLogger(__name__)
logger = logging.getLogger(__name__)
class WorkflowServiceRESTClient(WorkflowServiceIF):
def __init__(self):
......@@ -111,9 +115,12 @@ class WorkflowService(WorkflowServiceIF):
:return:
"""
# 1. look up workflow, returns workflow
logger.info("Looking up workflow %s", request.workflow_name)
definition = self.info.lookup_workflow_definition(request.workflow_name)
# 2. render templates to files, returns list of rendered files
logger.info("Rendering files")
logger.info("Type of request.argument is %s", type(request.argument))
templated_files = definition.render_templates(request.argument)
for file in templated_files:
self.info.save_file(
......@@ -123,7 +130,7 @@ class WorkflowService(WorkflowServiceIF):
# 3. Combine the
workflow_files = templated_files + request.files
# 4. prepare files for condor execution
# 3. prepare files for condor execution
if not request.results_dir:
# Create temp results directory if the request doesn't already have one (from a previous execution attempt)
request.results_dir = str(
......@@ -131,10 +138,10 @@ class WorkflowService(WorkflowServiceIF):
)
self.info.save_request(request)
# 5. execute condor and retrieve log file
# 4. execute condor and retrieve log file
log_file = self._execute_prepared(Path(request.results_dir))
# 6. start reading the log file (background it)
# 5. start reading the log file (background it)
subprocess.run(
["wf_monitor", log_file, str(request.workflow_request_id)],
cwd=request.results_dir,
......@@ -155,13 +162,17 @@ class WorkflowService(WorkflowServiceIF):
if not os.path.isdir(str(Path.home()) + "/workspaces_tmp"):
os.mkdir(str(Path.home()) + "/workspaces_tmp")
temp_folder = Path(mkdtemp(dir=str(Path.home()) + "/workspaces_tmp"))
logger.info("Settled on temp folder %s", temp_folder)
# 2. spool each of the temp files to it
for file in files:
(temp_folder / file.filename).write_bytes(file.content)
path = temp_folder / file.filename
logger.info("Writing %s", path)
path.write_bytes(file.content)
# 3. make any scripts in there executable
for file in temp_folder.glob("*.sh"):
logger.info("Making %s executable", file)
file.chmod(file.stat().st_mode | stat.S_IEXEC)
# finished, return folder
......@@ -175,20 +186,20 @@ class WorkflowService(WorkflowServiceIF):
:param folder: the path to the folder to execute
:return: the path to the log file
"""
print(f"executing on folder {folder}")
logger.info("executing on folder %s", folder)
# some file in here should end in .dag; that file is our dagman input
# TODO: not currently using DAG files for vulture
# dagman = list(folder.glob("*.dag"))[0]
# print(f"dagman file {dagman} exists.")
# logger.info("dagman file %s exists.", dagman)
# need .condor file for vulture submission
condor = list(folder.glob("*.condor"))[0]
print(f"condor file {condor} exists.")
logger.info("condor file %s exists.", condor)
# ensure the log file exists
logfile = folder / "condor.log"
print(f"log file {logfile} exists.")
logger.info("log file %s exists.", logfile)
logfile.touch()
# submit
......@@ -196,7 +207,7 @@ class WorkflowService(WorkflowServiceIF):
# subprocess.run(["condor_submit_dag", str(dagman)], cwd=str(folder.absolute()))
# vulture is a workaround for testing locally without submitting to condor
print("submitting to vulture...")
logger.info("submitting to vulture...")
subprocess.run(
["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute())
)
......@@ -219,7 +230,9 @@ class WorkflowService(WorkflowServiceIF):
else:
status = WorkflowRequestState.Running.name
print(
f"Updating state on workflow request {request.workflow_request_id} to {status}..."
logger.info(
"Updating state on workflow request %s to %s...",
request.workflow_request_id,
status,
)
request.update_status(status)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment