diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index 2a9848e610c5f0c0e99cee7f6e96cf1a623f0725..d84aace07488b6df6e1e3ae3ff354a38410fb341 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -5,9 +5,10 @@ import time import signal import logging import argparse +import functools import subprocess import tracemalloc -from typing import List, Union, Dict +from typing import List, Union, Dict, Callable, Tuple from pathlib import Path import pika @@ -24,6 +25,18 @@ logger.setLevel(logging.INFO) logger.addHandler(logging.StreamHandler(sys.stdout)) +def logdec_factory(msg: str): + def logdec(f: Callable) -> Callable: + @functools.wraps(f) + def wrapper(*args: Tuple, **kwargs: Dict) -> Callable: + logger.info(msg) + result = f(*args, **kwargs) + logger.info("Done.") + return result + return wrapper + return logdec + + def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None: """ Signal handler function for handling a SIGALRM that arrives when WorkflowMonitor.read_log() @@ -105,19 +118,16 @@ class WorkflowMonitor: def __init__(self, logfile_path: str, timeout: int = 60): self.logfile_path: Path = Path(logfile_path) - logger.info("Reading log file...") try: self.log: str = self.read_htcondor_log(timeout) except TimeoutError as e: # read_htcondor_log() timed out logger.error(e) quit(-1) - logger.info('Log file read successfully.') - logger.info('Parsing log file...') self.events: List[WorkflowEvent] = self.parse_log() - logger.info('Log file parsed.') + @logdec_factory('Reading HTCondor log...') def read_htcondor_log(self, timeout: int) -> str: """ Waits for HTCondor job to terminate and reads the resulting log file @@ -131,10 +141,11 @@ class WorkflowMonitor: logger.info('Attempting to read log file...') time.sleep(3) - logger.info('Log file exists.') + logger.info('Log file found.') with open(self.logfile_path, 'r') as f: return f.read() + @logdec_factory('Parsing HTCondor log...') def parse_log(self) -> List[WorkflowEvent]: """ Parse log for relevant details: @@ -200,6 +211,7 @@ def make_arg_parser() -> argparse.ArgumentParser: return parser +@logdec_factory('Establishing connection to the AMQP server...') def make_amqp_connection(profile: str) -> pika.BlockingConnection: """ Initialize connection to AMQP server @@ -223,16 +235,15 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection: ) +@logdec_factory('Sending event data to the channel...') def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): """ Takes in a JSON-formatted string and sends it off to the workflow status exchange :param event: JSON string of event metadata """ # Send data to exchange - logger.info('Sending event data...') channel: pika.channel.Channel = connection.channel() channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) - logger.info('Event data sent.') return channel.basic_publish( exchange=WORKFLOW_STATUS_EXCH, routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',