diff --git a/apps/cli/utilities/wf_monitor/README.md b/apps/cli/utilities/wf_monitor/README.md index 324afa805a9694447d2b0e57e67a20cf9c498e77..bd9b976d48cb6617aca92a50f5c6b01df196db2b 100644 --- a/apps/cli/utilities/wf_monitor/README.md +++ b/apps/cli/utilities/wf_monitor/README.md @@ -1,4 +1,21 @@ -# Workflow Monitor +# Workspaces Workflow Monitor + +Reads in an HTCondor log file and parses the relevant data from it, splitting it into events and formatting those into messages that are sent to the workspaces.workflow-service.workflow-status RabbitMQ exchange. + +``` +usage: wf_monitor [-h] [-P PROFILE] log_path + +Workspaces workflow monitor, version 4.0.0a1.dev1. Monitor execution of a workflow from the command line. + +positional arguments: + log_path path to the workflow's HTCondor log + +optional arguments: + -h, --help show this help message and exit + -P PROFILE, --profile PROFILE + profile name to use; e.g. test, production; DEFAULT: nmtest +``` + ## Design - Reads in HTCondor logs - HTCondor executes a job diff --git a/apps/cli/utilities/wf_monitor/logs/test.log b/apps/cli/utilities/wf_monitor/logs/test.log new file mode 100644 index 0000000000000000000000000000000000000000..9a98d74001825a698878a9f23cfd7a5961e8d1e4 --- /dev/null +++ b/apps/cli/utilities/wf_monitor/logs/test.log @@ -0,0 +1 @@ +this will trigger an exception!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! \ No newline at end of file diff --git a/apps/cli/utilities/wf_monitor/setup.py b/apps/cli/utilities/wf_monitor/setup.py index f9fbde3e9a553f5882c9e2f0350d82281e6a33fc..86da440095ebee75ad9003a8c85cc1f586583160 100644 --- a/apps/cli/utilities/wf_monitor/setup.py +++ b/apps/cli/utilities/wf_monitor/setup.py @@ -8,11 +8,12 @@ VERSION = open('src/wf_monitor/_version.py').readlines()[-1].split()[-1].strip(" README = Path('README.md').read_text() requires = [ - 'pika', - 'pendulum' + 'pika==1.1', + 'pendulum==2.1.2', ] tests_require = [ - 'pytest>=5.4,<6.0' + 'pytest>=5.4,<6.0', + 'pytest-mock==3.3.1', ] setup( @@ -24,7 +25,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - # install_requires=requires, + install_requires=requires, tests_require=tests_require, keywords=[], packages=['wf_monitor'], @@ -33,6 +34,6 @@ setup( 'Programming Language :: Python :: 3.8' ], entry_points={ - 'console_scripts': [''] + 'console_scripts': ['wf_monitor = wf_monitor.monitor:main'] }, ) diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index 00efb6191a930324896ade988031923707c0d7e4..d7a9cfd94231341de44adb90406c92565281a425 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -5,9 +5,9 @@ import time import signal import logging import argparse -import subprocess +import functools import tracemalloc -from typing import List, Union, Dict +from typing import List, Union, Dict, Callable, Tuple, Any from pathlib import Path import pika @@ -17,13 +17,41 @@ from pycapo import CapoConfig from .wf_event import WorkflowEvent, EventType from ._version import ___version___ as VERSION -WORKFLOW_STATUS_EXCH: str = 'workspaces.workflow-service.workflow-status' +WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' -logger: logging.Logger = logging.getLogger('wf_monitor') +logger = logging.getLogger('wf_monitor') logger.setLevel(logging.INFO) logger.addHandler(logging.StreamHandler(sys.stdout)) +def log_decorator_factory(msg: str): + """ + Factory that produces log decorators with a given message + :param msg: Message to be logged + :return: Log decorator + """ + def log_decorator(f: Callable) -> Callable: + """ + Decorator that logs a message before and after a function is called + :param f: Function to be decorated + :return: Wrapper function + """ + @functools.wraps(f) + def wrapper(*args: Tuple, **kwargs: Dict) -> Any: + """ + Decorator boilerplate: wrapper function for log decorator + :param args: args for function f + :param kwargs: kwargs for function f + :return: Return value of the function + """ + logger.info(msg) + result = f(*args, **kwargs) + logger.info("Done.") + return result + return wrapper + return log_decorator + + def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None: """ Signal handler function for handling a SIGALRM that arrives when WorkflowMonitor.read_log() @@ -31,8 +59,8 @@ def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None: :param signum: Signal number (would be signal.SIGALRM in this case) :param frame: Stack frame the signal was sent from """ - msg: str = 'Timeout elapsed when attempting to read HTCondor log file. The job probably did ' \ - 'not complete.' + msg = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \ + 'not complete or the path to the log was not entered correctly.' raise TimeoutError(msg) @@ -47,8 +75,8 @@ def get_job_name(body: str) -> str: :param body: Body of the log entry :return: Job name """ - r_body_parse: str = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)' - body_match: re.Match = re.search(r_body_parse, body) + r_body_parse = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)' + body_match = re.search(r_body_parse, body) return body_match.group('jobname') @@ -58,8 +86,8 @@ def get_year(logfile_path: Path) -> str: :param logfile_path: Path to log file :return: Year log was created as a string """ - status: os.stat_result = os.stat(logfile_path) - date: pendulum.DateTime = pendulum.from_timestamp(status.st_ctime) + status = os.stat(logfile_path) + date = pendulum.from_timestamp(status.st_ctime) return str(date.year) @@ -71,10 +99,10 @@ def fmt_timestamp(timestamp: str, year: str) -> str: :param year: Year HTCondor log was created as a string :return: Formatted timestamp string """ - r_nums: str = r'[0-9]{2}' - r_timestamp: str = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \ + r_nums = r'[0-9]{2}' + r_timestamp = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \ rf'(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})' - match: re.Match = re.search(r_timestamp, timestamp) + match = re.search(r_timestamp, timestamp) return '{year}-{mon}-{day}T{h}:{m}:{s}+07:00'.format( year=year, @@ -92,9 +120,9 @@ def get_retval(body: str) -> Union[int, None]: :param body: Body of event log entry :return: Return value as an int """ - r_retval: str = r'return value (?P<retval>[0-9]+)' + r_retval = r'return value (?P<retval>[0-9]+)' - retval: re.Match = re.search(r_retval, body) + retval = re.search(r_retval, body) if retval: return int(retval.group('retval')) else: @@ -102,28 +130,23 @@ def get_retval(body: str) -> Union[int, None]: class WorkflowMonitor: + """ + Class that monitors the events in a given workflow via the log file produced by it + """ def __init__(self, logfile_path: str, timeout: int = 60): - self.logfile_path: Path = Path(logfile_path) + self.logfile_path = Path(logfile_path) - logger.info("Reading log file...") try: - self.log: str = self.read_htcondor_log(self.logfile_path, timeout) + self.log = self.read_htcondor_log(timeout) except TimeoutError as e: # read_htcondor_log() timed out logger.error(e) quit(-1) - logger.info('Log file read successfully.') - logger.info('Parsing log file...') - self.events: List[WorkflowEvent] = self.parse_log() - logger.info('Log file parsed.') + self.events = self.parse_log() - logger.info('Sending event data...') - for event in self.events: - self.send_event_data(event.json()) - logger.info('Event data sent.') - - def read_htcondor_log(self, logfile_path: Path, timeout: int) -> str: + @log_decorator_factory('Reading HTCondor log...') + def read_htcondor_log(self, timeout: int) -> str: """ Waits for HTCondor job to terminate and reads the resulting log file :param logfile_path: Path to log file @@ -132,14 +155,15 @@ class WorkflowMonitor: """ signal.alarm(timeout) - while not os.path.exists(logfile_path): + while not os.path.exists(self.logfile_path): logger.info('Attempting to read log file...') time.sleep(3) - logger.info('Log file exists.') - with open(logfile_path, 'r') as f: + logger.info('Log file found.') + with open(self.logfile_path, 'r') as f: return f.read() + @log_decorator_factory('Parsing HTCondor log...') def parse_log(self) -> List[WorkflowEvent]: """ Parse log for relevant details: @@ -147,32 +171,36 @@ class WorkflowMonitor: :return: List of WorkflowEvent objects """ # Regex that matches an event entry in the HTCondor log file. - r_htc_event: str = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \ - r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \ - r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \ - r'(?P<body>[\t ]+[^\.]*\n)*' \ - r'...' - jobs: Dict[str, str] = {} - events: List[WorkflowEvent] = [] + r_htc_event = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \ + r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \ + r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \ + r'(?P<body>[\t ]+[^\.]*\n)*' \ + r'...' + jobs = {} + events = [] + if sum(1 for _ in re.finditer(r_htc_event, self.log)) == 0: + # Regex didn't return any matches + raise ValueError('HTCondor log file not well-formatted. Was the file edited?') for match in re.finditer(r_htc_event, self.log): - job_num: str = match.group('jobnum') + job_id = match.group('jobnum') if match.group('eventnum') == '000': # Populate jobs dict with job name - jobs[job_num] = get_job_name(match.group('body')) + jobs[job_id] = get_job_name(match.group('body')) try: - event_type: EventType = EventType(int(match.group('eventnum'))) + event_type = EventType(int(match.group('eventnum'))) except ValueError: - event_type: EventType = EventType.OTHER + event_type = EventType.OTHER if event_type == EventType.TERMINATED: - retval: Union[int, None] = get_retval(match.group('body')) + retval = get_retval(match.group('body')) else: - retval: Union[int, None] = None + retval = None - year: str = get_year(self.logfile_path) + year = get_year(self.logfile_path) events.append(WorkflowEvent( - job_name=jobs[job_num], + job_name=jobs[job_id], + job_id=job_id, event_type=event_type, timestamp=fmt_timestamp(match.group('timestamp'), year), log=match.group(0), @@ -181,18 +209,11 @@ class WorkflowMonitor: return events - def send_event_data(self, connection: pika.BlockingConnection, event_metadatum: str): - """ - Takes in a JSON-formatted string and sends it off to the workflow status exchange - :param event_metadatum: JSON string of event metadata - """ - # Send data to exchange - channel: pika.channel.Channel = connection.channel() - channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) - channel.basic_publish( - exchange=WORKFLOW_STATUS_EXCH, - # routing_key=? - ) + def __str__(self): + return f'WorkflowMonitor, monitoring {self.logfile_path} that has events {self.events}' + + def __repr__(self): + return f'{self.__class__.__name__} ({self.logfile_path}, {self.events})' _DESCRIPTION = 'Workspaces workflow monitor, version {}. Monitor execution of a workflow from ' \ @@ -204,18 +225,18 @@ def make_arg_parser() -> argparse.ArgumentParser: Initialize argument parser for command-line arguments :return: Argument parser """ - parser: argparse.ArgumentParser = argparse.ArgumentParser( + parser = argparse.ArgumentParser( description=_DESCRIPTION.format(VERSION), formatter_class=argparse.RawTextHelpFormatter ) - parser.add_argument('-P', '--profile', action='store', required=True, - help='profile name to use; e.g. test, production') - parser.add_argument('log_path', action='store', required=True, - help='path to the workflow\'s HTCondor log') + parser.add_argument('-P', '--profile', action='store', required=False, default='nmtest', + help='profile name to use; e.g. test, production; DEFAULT: nmtest') + parser.add_argument('log_path', action='store', help='path to the workflow\'s HTCondor log') return parser +@log_decorator_factory('Establishing connection to the AMQP server...') def make_amqp_connection(profile: str) -> pika.BlockingConnection: """ Initialize connection to AMQP server @@ -224,9 +245,9 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection: """ capo_cfg = CapoConfig(profile=profile) - hostname: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') - username: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') - password: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') + hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') + username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') + password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') return pika.BlockingConnection( pika.ConnectionParameters( @@ -238,7 +259,28 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection: ) ) + +@log_decorator_factory('Sending event data to the channel...') +def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): + """ + Takes in a JSON-formatted string and sends it off to the workflow status exchange + :param event: JSON string of event metadata + """ + # Send data to exchange + channel = connection.channel() + channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) + return channel.basic_publish( + exchange=WORKFLOW_STATUS_EXCH, + routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', + body=event.json() + ) + + def main(): # Parse command-line args - args: argparse.Namespace = make_arg_parser().parse_args() - connection: pika.BlockingConnection = make_amqp_connection(args.profile) + args = make_arg_parser().parse_args() + connection = make_amqp_connection(args.profile) + monitor = WorkflowMonitor(args.log_path) + + for event in monitor.events: + send_event_data(connection, event) diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py index 02387572103ae884d2ca1d64421be8f8367d6302..cb255f28d800a3cbb79aa99b36dd708a6a5bf85c 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py @@ -10,19 +10,24 @@ class EventType(enum.Enum): OTHER = -1 class WorkflowEvent: - def __init__(self, job_name: str, - event_type: EventType, - timestamp: str, - log: str, - retval: int = None): - self.job_name: str = job_name - self.type: EventType = event_type - self.timestamp: str = timestamp - self.log: str = log - self.retval: int = retval + def __init__( + self, + job_name: str, + job_id: str, + event_type: EventType, + timestamp: str, + log: str, + retval: int = None + ): + self.job_name = job_name + self.job_id = str(job_id) + self.type = event_type + self.timestamp = timestamp + self.log = log + self.retval = retval def json(self) -> str: - d: Dict[str, str] = { + d = { 'job_name': self.job_name, 'type': str(self.type), 'timestamp': self.timestamp, @@ -32,7 +37,7 @@ class WorkflowEvent: return json.dumps(d) def __str__(self): - return self.json() + return f'WorkflowEvent with data {self.json()}' def __repr__(self): - return f'<WorkflowEvent {self.__dict__}>' \ No newline at end of file + return f'<wf_monitor.WorkflowEvent {self.__dict__}>' diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py index 4e381941f20772294aecfb38f5961bf166d788a1..2fe39cb990466c8f513be600644001a4ac829754 100644 --- a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py +++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py @@ -1,5 +1,6 @@ -from wf_monitor.wf_event import WorkflowEvent, EventType -from wf_monitor.monitor import WorkflowMonitor +import pytest +from wf_monitor.wf_event import EventType +from wf_monitor.monitor import WorkflowMonitor, send_event_data, make_amqp_connection log_path = 'logs/condor.log' test_monitor = WorkflowMonitor(log_path) @@ -17,6 +18,13 @@ def test_read_log(): assert s in test_monitor.log +def test_read_log_timeout(capsys): + with pytest.raises(SystemExit) as sys_ex: + WorkflowMonitor('logs/file-that-does-not-exist.txt', 1) + assert sys_ex.type == SystemExit + assert sys_ex.value.code == -1 + + def test_parse_log(): test_event_types = [ EventType.SUBMITTED, @@ -32,5 +40,22 @@ def test_parse_log(): assert e.type == e_type -def test_send_event_data(): - pass +def test_parse_log_error(): + with pytest.raises(ValueError) as val_err: + WorkflowMonitor('logs/test.log') + assert val_err.type == ValueError + + +def test_send_event_data(mocker): + mock_list = [ + 'pika.adapters.blocking_connection.BlockingChannel.basic_publish', + 'wf_monitor.monitor.make_amqp_connection', + 'wf_monitor.monitor.send_event_data', + ] + [mocker.patch(mock) for mock in mock_list] + + connection = make_amqp_connection('nmtest') + + for event in WorkflowMonitor(log_path).events: + send_event_data(connection, event) + assert connection.channel().basic_publish.call_count == 8