Skip to content
Snippets Groups Projects
Commit 57447a87 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Adding decorator for logging function execution progress.

parent 6050c3f1
No related branches found
No related tags found
No related merge requests found
......@@ -5,9 +5,10 @@ import time
import signal
import logging
import argparse
import functools
import subprocess
import tracemalloc
from typing import List, Union, Dict
from typing import List, Union, Dict, Callable, Tuple
from pathlib import Path
import pika
......@@ -24,6 +25,18 @@ logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
def logdec_factory(msg: str):
def logdec(f: Callable) -> Callable:
@functools.wraps(f)
def wrapper(*args: Tuple, **kwargs: Dict) -> Callable:
logger.info(msg)
result = f(*args, **kwargs)
logger.info("Done.")
return result
return wrapper
return logdec
def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None:
"""
Signal handler function for handling a SIGALRM that arrives when WorkflowMonitor.read_log()
......@@ -105,19 +118,16 @@ class WorkflowMonitor:
def __init__(self, logfile_path: str, timeout: int = 60):
self.logfile_path: Path = Path(logfile_path)
logger.info("Reading log file...")
try:
self.log: str = self.read_htcondor_log(timeout)
except TimeoutError as e:
# read_htcondor_log() timed out
logger.error(e)
quit(-1)
logger.info('Log file read successfully.')
logger.info('Parsing log file...')
self.events: List[WorkflowEvent] = self.parse_log()
logger.info('Log file parsed.')
@logdec_factory('Reading HTCondor log...')
def read_htcondor_log(self, timeout: int) -> str:
"""
Waits for HTCondor job to terminate and reads the resulting log file
......@@ -131,10 +141,11 @@ class WorkflowMonitor:
logger.info('Attempting to read log file...')
time.sleep(3)
logger.info('Log file exists.')
logger.info('Log file found.')
with open(self.logfile_path, 'r') as f:
return f.read()
@logdec_factory('Parsing HTCondor log...')
def parse_log(self) -> List[WorkflowEvent]:
"""
Parse log for relevant details:
......@@ -200,6 +211,7 @@ def make_arg_parser() -> argparse.ArgumentParser:
return parser
@logdec_factory('Establishing connection to the AMQP server...')
def make_amqp_connection(profile: str) -> pika.BlockingConnection:
"""
Initialize connection to AMQP server
......@@ -223,16 +235,15 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection:
)
@logdec_factory('Sending event data to the channel...')
def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent):
"""
Takes in a JSON-formatted string and sends it off to the workflow status exchange
:param event: JSON string of event metadata
"""
# Send data to exchange
logger.info('Sending event data...')
channel: pika.channel.Channel = connection.channel()
channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
logger.info('Event data sent.')
return channel.basic_publish(
exchange=WORKFLOW_STATUS_EXCH,
routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment