Skip to content
Snippets Groups Projects
Commit 83166de2 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Vulture now creates a HTCondor-esque log file as it executes; logs are

readable by wf_monitor
parent b6bb4dec
No related branches found
No related tags found
No related merge requests found
000 (4242.000.000) 10/13 14:45:09 Job submitted from host: <0.0.0.0:7777>
Job node: null
...
001 (4242.000.000) 10/13 14:45:09 Job executing on host: <0.0.0.0:7777>
...
005 (4242.000.000) 10/13 14:45:09 Job terminated.
Normal termination (return value 0)
...
# TODO: Event types that need to be supported: submitted, executing, terminated
# TODO: Log to file
import logging
import pendulum
import subprocess
from pathlib import Path
from typing import NamedTuple, List, Dict
from collections import namedtuple
from workspaces.schema import WorkflowEventType
class MockLogger:
"""
Class representing a logger for Vulture that will simulate HTCondor logs
"""
def __init__(self):
log_dir = "logs"
log_filename = f"vulture_{pendulum.now().to_iso8601_string()}.log"
self.logger = logging.getLogger("vulture")
self.logger.setLevel(logging.INFO)
self.logger.addHandler(logging.FileHandler(f"{log_dir}/{log_filename}", mode="w+"))
def log_event(self, event_type: WorkflowEventType, event_body: str = None):
"""
Log a mock HTCondor event to the log file
:param event_type: Type of the event
:param event_body: Body of the event log (can be empty)
:return:
"""
header = self.create_header(event_type)
# Indent each line of the body
if event_body:
event_body_list = event_body.split("\n")
if len(event_body_list) > 1:
event_body = "\t".join(event_body_list)
else:
event_body = "\t" + event_body
footer = "..."
self.logger.info(msg="\n".join([header, event_body, footer])) if event_body else self.logger.info("\n".join([header, footer]))
@staticmethod
def create_header(event_type: WorkflowEventType) -> str:
"""
Create mock HTCondor event log header
:param event_type: Type of event being logged
:return: The header as a string
EXAMPLE:
000 (3983.000.000) 08/26 11:06:06 Job submitted from host: <10.64.1.178:9618?addrs=10.64.1.178-9618&noUDP&sock=4050180_7e37_3>
"""
mock_host = "<0.0.0.0:7777>"
event_descs = {
WorkflowEventType.SUBMITTED: f"Job submitted from host: {mock_host}",
WorkflowEventType.EXECUTING: f"Job executing on host: {mock_host}",
WorkflowEventType.TERMINATED: "Job terminated.",
WorkflowEventType.OTHER: "Unknown description.",
}
event_num = f"{event_type.value:03d}"
# TODO: Add calculable job number
job_num = f"({4242:04d}.000.000)"
timestamp = f"{pendulum.now().format('MM/DD HH:mm:ss')}"
event_desc = f"{event_descs[event_type]}"
return " ".join([event_num, job_num, timestamp, event_desc])
class Job:
"""
......@@ -12,7 +68,9 @@ class Job:
"""
def __init__(self, file: Path):
self.logger = MockLogger()
self.fields, self.command = self.parse(file)
self.logger.log_event(WorkflowEventType.SUBMITTED, f"Job node: {self.fields['executable']}")
def parse(self, file: Path) -> NamedTuple:
"""
......@@ -46,7 +104,15 @@ class Job:
return field_dict
def execute(self):
subprocess.run([self.fields['executable'], self.fields['arguments']])
result_strings = {
0: "Normal termination (return value 0)",
-1: "Error in execution (return value -1)",
}
self.logger.log_event(WorkflowEventType.EXECUTING)
retval = subprocess.call([self.fields["executable"], self.fields["arguments"]])
exec_results = f"{result_strings[retval]}"
self.logger.log_event(WorkflowEventType.TERMINATED, exec_results)
class Dag:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment