diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py
index 423d6bba9767ba1c566403075284742c8bbc4d5f..25bcb7e3e9e915c7caf726d95d70b781e64dd381 100644
--- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py
+++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py
@@ -1,41 +1,139 @@
 import os
-import json
+import re
+import sys
 import time
 import signal
+from types import FrameType
+from typing import List
 from pathlib import Path
+import pika
 
-class WorkflowEvent:
-    def __init__(self, logfile_path: str):
-        self.log = self.read_htcondor_log(Path(logfile_path))
-        self.job_metadata = self.parse_log()
-        self.job_json = {}
-        for job_metadatum in self.job_metadata:
-            self.job_json[job_metadatum['name']] = self.job_to_amqp_event(job_metadatum)
+from .wf_event import WorkflowEvent, EventType
 
-    def read_htcondor_log(self, logfile_path: Path, timeout: int = 60):
-        start_time = time.localtime()
-        if os.path.exists(logfile_path):
-            with open(logfile_path, 'r') as f:
-                return f.read()
-        else:
+WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
 
-    def parse_log(self) -> dict:
+
+def timeout_handler(signum: int, frame: FrameType):
+    """
+    Signal handler function for handling a SIGALRM that arrives when
+    WorkflowMonitor.read_htcondor_log() times out.
+    :param signum: Signal number (signal.SIGALRM in this case)
+    :param frame: Stack frame the signal was sent from
+    """
+    msg = 'Timeout elapsed when attempting to read HTCondor log file. The job probably did ' \
+          'not complete.'
+    raise TimeoutError(msg)
+
+
+# Timeout handling solution found here:
+# https://stackoverflow.com/questions/492519/timeout-on-a-function-call
+signal.signal(signal.SIGALRM, timeout_handler)
+
+
+def get_job_name(body: str) -> str:
+    """
+    Parses a log entry body string for the name of the job currently executing
+    :param body: Body of the log entry
+    :return: Job name
+    """
+    r_body_parse = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)'
+    body_match = re.search(r_body_parse, body)
+    return body_match.group('jobname')
+
+
+def fmt_timestamp(timestamp: str) -> str:
+    """
+    Format a timestamp string into ISO 8601 format,
+    e.g. 2020-08-27T21:45:09+07:00
+    :param timestamp: Unformatted timestamp string
+    :return: Formatted timestamp string
+    """
+    r_nums = r'[0-9]{2}'
+    r_timestamp = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \
+                  rf'(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})'
+    match = re.search(r_timestamp, timestamp)
+
+    return '{year}-{mon}-{day}T{h}:{m}:{s}+07:00'.format(
+        # FIXME: HTCondor logs don't record year...
+        year='2020',
+        mon=match.group('mon'),
+        day=match.group('d'),
+        h=match.group('h'),
+        m=match.group('m'),
+        s=match.group('s')
+    )
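+
+
+# A rough sketch of an equivalent conversion via datetime, kept as a comment since
+# the regex version above is what actually runs; the year still has to be assumed
+# (2020 here) because HTCondor omits it from log timestamps:
+#
+#     from datetime import datetime
+#
+#     def fmt_timestamp_dt(timestamp: str) -> str:
+#         # e.g. '08/26 11:06:06' -> '2020-08-26T11:06:06+07:00'
+#         dt = datetime.strptime(f'2020/{timestamp}', '%Y/%m/%d %H:%M:%S')
+#         return dt.strftime('%Y-%m-%dT%H:%M:%S+07:00')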
+
+
+class WorkflowMonitor:
+    def __init__(self, logfile_path: str, timeout: int = 60):
+        try:
+            self.log = self.read_htcondor_log(Path(logfile_path), timeout)
+        except TimeoutError as e:
+            # read_htcondor_log() timed out
+            print(e)
+            sys.exit(1)
+
+        self.events = self.parse_log()
+
+        for event in self.events:
+            self.send_event_data(event.json())
+
+    def read_htcondor_log(self, logfile_path: Path, timeout: int) -> str:
+        """
+        Waits for the HTCondor log file to be written and reads its contents
+        :param logfile_path: Path to log file
+        :param timeout: Time in seconds to wait for the log file before timing out
+        :return: String contents of the log file
+        """
+        signal.alarm(timeout)
+
+        while not os.path.exists(logfile_path):
+            time.sleep(3)
+
+        with open(logfile_path, 'r') as f:
+            log = f.read()
+
+        # Disarm the alarm so it cannot fire after a successful read
+        signal.alarm(0)
+        return log
+
+    def parse_log(self) -> List[WorkflowEvent]:
         """
         Parse log for relevant details:
-        - For each job:
+        - For each job, create valid WorkflowEvent objects based on:
         - Job name
         - If/when the job executed
         - If/when/why the job terminated
-        :return: Dictionary of important metadata for each job
+        :return: List of WorkflowEvent objects
        """
-        pass
+        # Regex that matches an event entry in the HTCondor log file, up to the
+        # literal '...' line that terminates each entry.
+        r_htc_event = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \
+                      r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \
+                      r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \
+                      r'(?P<body>[\t ]+[^\.]*\n)*' \
+                      r'\.\.\.'
+        jobs = {}
+        events = []
+        for match in re.finditer(r_htc_event, self.log):
+            job_num = match.group('jobnum')
+            if match.group('eventnum') == '000':
+                # Populate jobs dict with job name
+                jobs[job_num] = get_job_name(match.group('body'))
+
+            try:
+                # Event numbers appear as zero-padded strings ('000', '001', ...),
+                # so convert to int before looking up the EventType value
+                event_type = EventType(int(match.group('eventnum')))
+            except ValueError:
+                event_type = EventType.OTHER
+
+            # TODO: Add other regex groups as fields
+            events.append(WorkflowEvent(
+                job_name=jobs[job_num],
+                event_type=event_type,
+                timestamp=fmt_timestamp(match.group('timestamp')),
+                log=match.group(0)
+            ))
+
+        return events
 
-    def job_to_amqp_event(self, job_metadatum: dict):
+    def send_event_data(self, event_metadatum: str):
         """
-        Converts job metadata dictionary into a JSON string and ...
-        :param job_metadatum: Job metadata
-        :return: JSON string of job metadata
+        Takes in a JSON-formatted string and sends it off to the workflow status exchange
+        :param event_metadatum: JSON string of event metadata
         """
-        return json.dumps(job_metadatum)  # ?
\ No newline at end of file
+        # Send data to exchange
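+        # A minimal sketch of what the publish step might look like with pika's
+        # BlockingConnection; the broker host and routing key are assumptions,
+        # not settled configuration:
+        #
+        #     connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
+        #     channel = connection.channel()
+        #     channel.exchange_declare(exchange=WORKFLOW_STATUS_EXCH, exchange_type='topic')
+        #     channel.basic_publish(exchange=WORKFLOW_STATUS_EXCH,
+        #                           routing_key='workflow-status',
+        #                           body=event_metadatum)
+        #     connection.close()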
+        pass
\ No newline at end of file
diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py
new file mode 100644
index 0000000000000000000000000000000000000000..6deca554c3304b802fe2989eac89b3820fd7712b
--- /dev/null
+++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py
@@ -0,0 +1,30 @@
+import enum
+import json
+from typing import Optional
+
+class EventType(enum.Enum):
+    SUBMITTED = 0
+    EXECUTING = 1
+    TERMINATED = 5
+    OTHER = -1
+
+class WorkflowEvent:
+    def __init__(self, job_name: str,
+                 event_type: EventType,
+                 timestamp: str,
+                 log: str,
+                 retval: Optional[int] = None):
+        self.job_name = job_name
+        self.type = event_type
+        self.timestamp = timestamp
+        self.retval = retval
+        self.log = log
+
+    def json(self) -> str:
+        d = {
+            'job_name': self.job_name,
+            'type': str(self.type),
+            'timestamp': self.timestamp,
+            'return_value': self.retval,
+            'log': self.log
+        }
+        return json.dumps(d)
diff --git a/apps/cli/utilities/wf_monitor/test/__init__.py b/apps/cli/utilities/wf_monitor/test/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py
new file mode 100644
index 0000000000000000000000000000000000000000..96a55a6ad4d4b2ffc9ed4ce610b7d61b6657cd65
--- /dev/null
+++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py
@@ -0,0 +1,33 @@
+from wf_monitor.wf_event import EventType
+from wf_monitor.monitor import WorkflowMonitor
+
+log_path = 'apps/cli/utilities/wf_monitor/logs/condor.log'
+test_monitor = WorkflowMonitor(log_path)
+
+def test_read_log():
+    test_strs = [
+        'Image size of job updated: 432',
+        '432 - ResidentSetSize of job (KB)',
+        '(1) Normal termination (return value 0)',
+        '000 (3983.000.000) 08/26 11:06:06 Job submitted from host: '
+        '<10.64.1.178:9618?addrs=10.64.1.178-9618&noUDP&sock=4050180_7e37_3>'
+    ]
+    for s in test_strs:
+        assert s in test_monitor.log
+
+def test_parse_log():
+    test_event_types = [
+        EventType.SUBMITTED,
+        EventType.EXECUTING,
+        EventType.OTHER,
+        EventType.TERMINATED,
+        EventType.SUBMITTED,
+        EventType.EXECUTING,
+        EventType.OTHER,
+        EventType.TERMINATED
+    ]
+    for e, e_type in zip(test_monitor.events, test_event_types):
+        assert e.type == e_type
+
+def test_send_event_data():
+    pass
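+
+# A possible future shape for this test once send_event_data() actually publishes;
+# the patch target is an assumption based on monitor.py's `import pika`, not an
+# existing fixture:
+#
+#     from unittest.mock import patch
+#
+#     def test_send_event_data():
+#         with patch('wf_monitor.monitor.pika.BlockingConnection') as mock_connection:
+#             WorkflowMonitor(log_path)
+#             assert mock_connection.called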