diff --git a/apps/cli/utilities/wf_monitor/setup.py b/apps/cli/utilities/wf_monitor/setup.py
index c8d405fce2f8e643a833936a2dc173723558715c..f9fbde3e9a553f5882c9e2f0350d82281e6a33fc 100644
--- a/apps/cli/utilities/wf_monitor/setup.py
+++ b/apps/cli/utilities/wf_monitor/setup.py
@@ -7,12 +7,14 @@ from setuptools import setup
 VERSION = open('src/wf_monitor/_version.py').readlines()[-1].split()[-1].strip("\"'")
 README = Path('README.md').read_text()
 
-# requires = [
-# ]
-
+requires = [
+    'pika',
+    'pendulum'
+]
 tests_require = [
     'pytest>=5.4,<6.0'
 ]
+
 setup(
     name=Path().absolute().name,
     version=VERSION,
diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py
index 25bcb7e3e9e915c7caf726d95d70b781e64dd381..e69de6d9b21b41b3a68db1b3bced2c1ce91f2014 100644
--- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py
+++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py
@@ -1,29 +1,38 @@
 import os
 import re
+import sys
 import time
 import signal
+import logging
+import subprocess
 import tracemalloc
-from typing import List
+from typing import List, Union, Dict
 from pathlib import Path
 
 import pika
+import pendulum
 
 from .wf_event import WorkflowEvent, EventType
 
-WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
+WORKFLOW_STATUS_EXCH: str = 'workspaces.workflow-service.workflow-status'
 
+logger: logging.Logger = logging.getLogger('wf_monitor')
+logger.setLevel(logging.INFO)
+logger.addHandler(logging.StreamHandler(sys.stdout))
 
-def timeout_handler(signum: int, frame: tracemalloc.Frame):
+
+def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None:
     """
     Signal handler function for handling a SIGALRM that arrives when WorkflowMonitor.read_log()
    times out.
     :param signum: Signal number (would be signal.SIGALRM in this case)
     :param frame: Stack frame the signal was sent from
     """
-    msg = 'Timeout elapsed when attempting to read HTCondor log file. The job probably did ' \
-          'not complete.'
+    msg: str = 'Timeout elapsed when attempting to read HTCondor log file. The job probably did ' \
+               'not complete.'
     raise TimeoutError(msg)
 
+
 # Timeout handling solution found here:
 # https://stackoverflow.com/questions/492519/timeout-on-a-function-call
 signal.signal(signal.SIGALRM, timeout_handler)
@@ -35,26 +44,37 @@ def get_job_name(body: str) -> str:
     :param body: Body of the log entry
     :return: Job name
     """
-    r_body_parse = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)'
-    body_match = re.search(r_body_parse, body)
+    r_body_parse: str = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)'
+    body_match: re.Match = re.search(r_body_parse, body)
     return body_match.group('jobname')
 
 
-def fmt_timestamp(timestamp: str) -> str:
+def get_year(logfile_path: Path) -> str:
+    """
+    Parses year from log file creation date
+    :param logfile_path: Path to log file
+    :return: Year log was created as a string
+    """
+    status: os.stat_result = os.stat(logfile_path)
+    date: pendulum.DateTime = pendulum.from_timestamp(status.st_ctime)
+    return str(date.year)
+
+
+def fmt_timestamp(timestamp: str, year: str) -> str:
     """
     Format timestamp string into ISO 8601 format
     2020-08-27T21:45:09+07:00
 
     :param timestamp: Unformatted timestamp string
+    :param year: Year HTCondor log was created as a string
     :return: Formatted timestamp string
     """
-    r_nums = r'[0-9]{2}'
-    r_timestamp = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \
-                  rf'(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})'
-    match = re.search(r_timestamp, timestamp)
+    r_nums: str = r'[0-9]{2}'
+    r_timestamp: str = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \
+                       rf'(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})'
+    match: re.Match = re.search(r_timestamp, timestamp)
     return '{year}-{mon}-{day}T{h}:{m}:{s}+07:00'.format(
-        # FIXME: HTCondor logs don't record year...
-        year='2020',
+        year=year,
         mon=match.group('mon'),
         day=match.group('d'),
         h=match.group('h'),
@@ -63,19 +83,42 @@
     )
 
 
+def get_retval(body: str) -> Union[int, None]:
+    """
+    Parse return value from job terminated event
+    :param body: Body of event log entry
+    :return: Return value as an int
+    """
+    r_retval: str = r'return value (?P<retval>[0-9]+)'
+
+    retval: re.Match = re.search(r_retval, body)
+    if retval:
+        return int(retval.group('retval'))
+    else:
+        return
+
+
 class WorkflowMonitor:
     def __init__(self, logfile_path: str, timeout: int = 60):
+        self.logfile_path: Path = Path(logfile_path)
+
+        logger.info("Reading log file...")
         try:
-            self.log = self.read_htcondor_log(Path(logfile_path), timeout)
+            self.log: str = self.read_htcondor_log(self.logfile_path, timeout)
         except TimeoutError as e:
             # read_htcondor_log() timed out
-            print(e)
+            logger.error(e)
             quit(-1)
+        logger.info('Log file read successfully.')
 
-        self.events = self.parse_log()
+        logger.info('Parsing log file...')
+        self.events: List[WorkflowEvent] = self.parse_log()
+        logger.info('Log file parsed.')
 
+        logger.info('Sending event data...')
         for event in self.events:
             self.send_event_data(event.json())
+        logger.info('Event data sent.')
 
     def read_htcondor_log(self, logfile_path: Path, timeout: int) -> str:
         """
@@ -87,45 +130,50 @@ class WorkflowMonitor:
         signal.alarm(timeout)
 
         while not os.path.exists(logfile_path):
+            logger.info('Attempting to read log file...')
             time.sleep(3)
+        logger.info('Log file exists.')
 
         with open(logfile_path, 'r') as f:
             return f.read()
 
     def parse_log(self) -> List[WorkflowEvent]:
         """
         Parse log for relevant details:
-        For each job, create valid WorkflowEvent objects based on:
-        - Job name
-        - If/when the job executed
-        - If/when/why the job terminated
+
         :return: List of WorkflowEvent objects
         """
         # Regex that matches an event entry in the HTCondor log file.
-        r_htc_event = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \
-                      r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \
-                      r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \
-                      r'(?P<body>[\t ]+[^\.]*\n)*' \
-                      r'...'
-        jobs = {}
-        events = []
+        r_htc_event: str = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \
+                           r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \
+                           r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \
+                           r'(?P<body>[\t ]+[^\.]*\n)*' \
+                           r'...'
+        jobs: Dict[str, str] = {}
+        events: List[WorkflowEvent] = []
         for match in re.finditer(r_htc_event, self.log):
-            job_num = match.group('jobnum')
+            job_num: str = match.group('jobnum')
             if match.group('eventnum') == '000':
                 # Populate jobs dict with job name
                 jobs[job_num] = get_job_name(match.group('body'))
 
             try:
-                event_type = EventType(match.group('eventnum'))
+                event_type: EventType = EventType(int(match.group('eventnum')))
             except ValueError:
-                event_type = EventType.OTHER
+                event_type: EventType = EventType.OTHER
+
+            if event_type == EventType.TERMINATED:
+                retval: Union[int, None] = get_retval(match.group('body'))
+            else:
+                retval: Union[int, None] = None
 
-            # TODO: Add other regex groups as fields
+            year: str = get_year(self.logfile_path)
             events.append(WorkflowEvent(
                 job_name=jobs[job_num],
                 event_type=event_type,
-                timestamp=fmt_timestamp(match.group('timestamp')),
-                log=match.group(0)
+                timestamp=fmt_timestamp(match.group('timestamp'), year),
+                log=match.group(0),
+                retval=retval
             ))
 
         return events
diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/utils.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/utils.py
deleted file mode 100644
index acfde62d0c2117d28c2453cd6af87ff9521f7ab8..0000000000000000000000000000000000000000
--- a/apps/cli/utilities/wf_monitor/src/wf_monitor/utils.py
+++ /dev/null
@@ -1,4 +0,0 @@
-import signal
-
-def timeout_handler(signum, frame):
-    print("Function {}")
\ No newline at end of file
diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py
index 6deca554c3304b802fe2989eac89b3820fd7712b..02387572103ae884d2ca1d64421be8f8367d6302 100644
--- a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py
+++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py
@@ -1,5 +1,7 @@
 import enum
 import json
+from typing import Dict
+
 
 class EventType(enum.Enum):
     SUBMITTED = 0
@@ -13,14 +15,14 @@ class WorkflowEvent:
                  timestamp: str,
                  log: str,
                  retval: int = None):
-        self.job_name = job_name
-        self.type = event_type
-        self.timestamp = timestamp
-        self.retval = retval
-        self.log = log
+        self.job_name: str = job_name
+        self.type: EventType = event_type
+        self.timestamp: str = timestamp
+        self.log: str = log
+        self.retval: int = retval
 
     def json(self) -> str:
-        d = {
+        d: Dict[str, str] = {
             'job_name': self.job_name,
             'type': str(self.type),
             'timestamp': self.timestamp,
@@ -28,3 +30,9 @@ class WorkflowEvent:
             'log': self.log
         }
         return json.dumps(d)
+
+    def __str__(self):
+        return self.json()
+
+    def __repr__(self):
+        return f'<WorkflowEvent {self.__dict__}>'
\ No newline at end of file
diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py
index 96a55a6ad4d4b2ffc9ed4ce610b7d61b6657cd65..4e381941f20772294aecfb38f5961bf166d788a1 100644
--- a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py
+++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py
@@ -1,9 +1,10 @@
 from wf_monitor.wf_event import WorkflowEvent, EventType
 from wf_monitor.monitor import WorkflowMonitor
 
-log_path = 'apps/cli/utilities/wf_monitor/logs/condor.log'
+log_path = 'logs/condor.log'
 test_monitor = WorkflowMonitor(log_path)
 
+
 def test_read_log():
     test_strs = [
         'Image size of job updated: 432',
@@ -15,6 +16,7 @@ def test_read_log():
     for s in test_strs:
         assert s in test_monitor.log
 
+
 def test_parse_log():
     test_event_types = [
         EventType.SUBMITTED,
@@ -29,5 +31,6 @@ def test_parse_log():
     for e, e_type in zip(test_monitor.events, test_event_types):
         assert e.type == e_type
 
+
 def test_send_event_data():
-    pass
\ No newline at end of file
+    pass