From 83166de22780eef438915d398f9c21e7a4d3822d Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Tue, 13 Oct 2020 14:47:54 -0600 Subject: [PATCH] Vulture now creates a HTCondor-esque log file as it executes; logs are readable by wf_monitor --- apps/cli/executables/vulture/logs/test.log | 8 +++ .../vulture/src/vulture/condorlite.py | 72 ++++++++++++++++++- 2 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 apps/cli/executables/vulture/logs/test.log diff --git a/apps/cli/executables/vulture/logs/test.log b/apps/cli/executables/vulture/logs/test.log new file mode 100644 index 000000000..bb39fb6c5 --- /dev/null +++ b/apps/cli/executables/vulture/logs/test.log @@ -0,0 +1,8 @@ +000 (4242.000.000) 10/13 14:45:09 Job submitted from host: <0.0.0.0:7777> + Job node: null +... +001 (4242.000.000) 10/13 14:45:09 Job executing on host: <0.0.0.0:7777> +... +005 (4242.000.000) 10/13 14:45:09 Job terminated. + Normal termination (return value 0) +... diff --git a/apps/cli/executables/vulture/src/vulture/condorlite.py b/apps/cli/executables/vulture/src/vulture/condorlite.py index 4491aee36..984e23b31 100644 --- a/apps/cli/executables/vulture/src/vulture/condorlite.py +++ b/apps/cli/executables/vulture/src/vulture/condorlite.py @@ -1,10 +1,66 @@ -# TODO: Event types that need to be supported: submitted, executing, terminated -# TODO: Log to file +import logging +import pendulum import subprocess from pathlib import Path from typing import NamedTuple, List, Dict from collections import namedtuple +from workspaces.schema import WorkflowEventType + + +class MockLogger: + """ + Class representing a logger for Vulture that will simulate HTCondor logs + """ + + def __init__(self): + log_dir = "logs" + log_filename = f"vulture_{pendulum.now().to_iso8601_string()}.log" + self.logger = logging.getLogger("vulture") + self.logger.setLevel(logging.INFO) + self.logger.addHandler(logging.FileHandler(f"{log_dir}/{log_filename}", mode="w+")) + + def log_event(self, event_type: WorkflowEventType, event_body: str = None): + """ + Log a mock HTCondor event to the log file + :param event_type: Type of the event + :param event_body: Body of the event log (can be empty) + :return: + """ + header = self.create_header(event_type) + # Indent each line of the body + if event_body: + event_body_list = event_body.split("\n") + if len(event_body_list) > 1: + event_body = "\t".join(event_body_list) + else: + event_body = "\t" + event_body + footer = "..." + self.logger.info(msg="\n".join([header, event_body, footer])) if event_body else self.logger.info("\n".join([header, footer])) + + @staticmethod + def create_header(event_type: WorkflowEventType) -> str: + """ + Create mock HTCondor event log header + :param event_type: Type of event being logged + :return: The header as a string + EXAMPLE: + 000 (3983.000.000) 08/26 11:06:06 Job submitted from host: <10.64.1.178:9618?addrs=10.64.1.178-9618&noUDP&sock=4050180_7e37_3> + """ + mock_host = "<0.0.0.0:7777>" + event_descs = { + WorkflowEventType.SUBMITTED: f"Job submitted from host: {mock_host}", + WorkflowEventType.EXECUTING: f"Job executing on host: {mock_host}", + WorkflowEventType.TERMINATED: "Job terminated.", + WorkflowEventType.OTHER: "Unknown description.", + } + event_num = f"{event_type.value:03d}" + # TODO: Add calculable job number + job_num = f"({4242:04d}.000.000)" + timestamp = f"{pendulum.now().format('MM/DD HH:mm:ss')}" + event_desc = f"{event_descs[event_type]}" + return " ".join([event_num, job_num, timestamp, event_desc]) + class Job: """ @@ -12,7 +68,9 @@ class Job: """ def __init__(self, file: Path): + self.logger = MockLogger() self.fields, self.command = self.parse(file) + self.logger.log_event(WorkflowEventType.SUBMITTED, f"Job node: {self.fields['executable']}") def parse(self, file: Path) -> NamedTuple: """ @@ -46,7 +104,15 @@ class Job: return field_dict def execute(self): - subprocess.run([self.fields['executable'], self.fields['arguments']]) + result_strings = { + 0: "Normal termination (return value 0)", + -1: "Error in execution (return value -1)", + } + self.logger.log_event(WorkflowEventType.EXECUTING) + retval = subprocess.call([self.fields["executable"], self.fields["arguments"]]) + exec_results = f"{result_strings[retval]}" + + self.logger.log_event(WorkflowEventType.TERMINATED, exec_results) class Dag: -- GitLab