From 2373267d621c5e4c7375d2b68df3b77444bbf3b6 Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Thu, 3 Sep 2020 13:07:20 -0600 Subject: [PATCH] Removed variable type annotations by @dlyons' request --- .../wf_monitor/src/wf_monitor/monitor.py | 96 +++++++++++-------- .../wf_monitor/src/wf_monitor/wf_event.py | 14 +-- .../wf_monitor/test/test_wf_monitor.py | 10 +- 3 files changed, 68 insertions(+), 52 deletions(-) diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index 011bb34e6..282def29c 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -8,7 +8,7 @@ import argparse import functools import subprocess import tracemalloc -from typing import List, Union, Dict, Callable, Tuple +from typing import List, Union, Dict, Callable, Tuple, Any from pathlib import Path import pika @@ -18,23 +18,39 @@ from pycapo import CapoConfig from .wf_event import WorkflowEvent, EventType from ._version import ___version___ as VERSION -WORKFLOW_STATUS_EXCH: str = 'workspaces.workflow-service.workflow-status' +WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' -logger: logging.Logger = logging.getLogger('wf_monitor') +logger = logging.getLogger('wf_monitor') logger.setLevel(logging.INFO) logger.addHandler(logging.StreamHandler(sys.stdout)) -def logdec_factory(msg: str): - def logdec(f: Callable) -> Callable: +def log_decorator_factory(msg: str): + """ + Factory that produces log decorators with a given message + :param msg: Message to be logged + :return: Log decorator + """ + def log_decorator(f: Callable) -> Callable: + """ + Decorator that logs a message before and after a function is called + :param f: Function to be decorated + :return: Wrapper function + """ @functools.wraps(f) - def wrapper(*args: Tuple, **kwargs: Dict) -> Callable: + def wrapper(*args: Tuple, **kwargs: Dict) -> Any: + """ + Decorator boilerplate: wrapper function for log decorator + :param args: args for function f + :param kwargs: kwargs for function f + :return: Return value of the function + """ logger.info(msg) result = f(*args, **kwargs) logger.info("Done.") return result return wrapper - return logdec + return log_decorator def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None: @@ -44,7 +60,7 @@ def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None: :param signum: Signal number (would be signal.SIGALRM in this case) :param frame: Stack frame the signal was sent from """ - msg: str = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \ + msg = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \ 'not complete or the path to the log was not entered correctly.' raise TimeoutError(msg) @@ -60,8 +76,8 @@ def get_job_name(body: str) -> str: :param body: Body of the log entry :return: Job name """ - r_body_parse: str = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)' - body_match: re.Match = re.search(r_body_parse, body) + r_body_parse = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)' + body_match = re.search(r_body_parse, body) return body_match.group('jobname') @@ -71,8 +87,8 @@ def get_year(logfile_path: Path) -> str: :param logfile_path: Path to log file :return: Year log was created as a string """ - status: os.stat_result = os.stat(logfile_path) - date: pendulum.DateTime = pendulum.from_timestamp(status.st_ctime) + status = os.stat(logfile_path) + date = pendulum.from_timestamp(status.st_ctime) return str(date.year) @@ -84,10 +100,10 @@ def fmt_timestamp(timestamp: str, year: str) -> str: :param year: Year HTCondor log was created as a string :return: Formatted timestamp string """ - r_nums: str = r'[0-9]{2}' - r_timestamp: str = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \ + r_nums = r'[0-9]{2}' + r_timestamp = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \ rf'(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})' - match: re.Match = re.search(r_timestamp, timestamp) + match = re.search(r_timestamp, timestamp) return '{year}-{mon}-{day}T{h}:{m}:{s}+07:00'.format( year=year, @@ -105,9 +121,9 @@ def get_retval(body: str) -> Union[int, None]: :param body: Body of event log entry :return: Return value as an int """ - r_retval: str = r'return value (?P<retval>[0-9]+)' + r_retval = r'return value (?P<retval>[0-9]+)' - retval: re.Match = re.search(r_retval, body) + retval = re.search(r_retval, body) if retval: return int(retval.group('retval')) else: @@ -116,18 +132,18 @@ def get_retval(body: str) -> Union[int, None]: class WorkflowMonitor: def __init__(self, logfile_path: str, timeout: int = 60): - self.logfile_path: Path = Path(logfile_path) + self.logfile_path = Path(logfile_path) try: - self.log: str = self.read_htcondor_log(timeout) + self.log = self.read_htcondor_log(timeout) except TimeoutError as e: # read_htcondor_log() timed out logger.error(e) quit(-1) - self.events: List[WorkflowEvent] = self.parse_log() + self.events = self.parse_log() - @logdec_factory('Reading HTCondor log...') + @log_decorator_factory('Reading HTCondor log...') def read_htcondor_log(self, timeout: int) -> str: """ Waits for HTCondor job to terminate and reads the resulting log file @@ -145,7 +161,7 @@ class WorkflowMonitor: with open(self.logfile_path, 'r') as f: return f.read() - @logdec_factory('Parsing HTCondor log...') + @log_decorator_factory('Parsing HTCondor log...') def parse_log(self) -> List[WorkflowEvent]: """ Parse log for relevant details: @@ -153,32 +169,32 @@ class WorkflowMonitor: :return: List of WorkflowEvent objects """ # Regex that matches an event entry in the HTCondor log file. - r_htc_event: str = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \ + r_htc_event = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \ r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \ r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \ r'(?P<body>[\t ]+[^\.]*\n)*' \ r'...' - jobs: Dict[str, str] = {} - events: List[WorkflowEvent] = [] + jobs = {} + events = [] if sum(1 for _ in re.finditer(r_htc_event, self.log)) == 0: raise ValueError('HTCondor log file not well-formatted. Was the file edited?') for match in re.finditer(r_htc_event, self.log): - job_id: str = match.group('jobnum') + job_id = match.group('jobnum') if match.group('eventnum') == '000': # Populate jobs dict with job name jobs[job_id] = get_job_name(match.group('body')) try: - event_type: EventType = EventType(int(match.group('eventnum'))) + event_type = EventType(int(match.group('eventnum'))) except ValueError: - event_type: EventType = EventType.OTHER + event_type = EventType.OTHER if event_type == EventType.TERMINATED: - retval: Union[int, None] = get_retval(match.group('body')) + retval = get_retval(match.group('body')) else: - retval: Union[int, None] = None + retval = None - year: str = get_year(self.logfile_path) + year = get_year(self.logfile_path) events.append(WorkflowEvent( job_name=jobs[job_id], job_id=job_id, @@ -206,7 +222,7 @@ def make_arg_parser() -> argparse.ArgumentParser: Initialize argument parser for command-line arguments :return: Argument parser """ - parser: argparse.ArgumentParser = argparse.ArgumentParser( + parser = argparse.ArgumentParser( description=_DESCRIPTION.format(VERSION), formatter_class=argparse.RawTextHelpFormatter ) @@ -217,7 +233,7 @@ def make_arg_parser() -> argparse.ArgumentParser: return parser -@logdec_factory('Establishing connection to the AMQP server...') +@log_decorator_factory('Establishing connection to the AMQP server...') def make_amqp_connection(profile: str) -> pika.BlockingConnection: """ Initialize connection to AMQP server @@ -226,9 +242,9 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection: """ capo_cfg = CapoConfig(profile=profile) - hostname: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') - username: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') - password: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') + hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') + username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') + password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') return pika.BlockingConnection( pika.ConnectionParameters( @@ -241,14 +257,14 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection: ) -@logdec_factory('Sending event data to the channel...') +@log_decorator_factory('Sending event data to the channel...') def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): """ Takes in a JSON-formatted string and sends it off to the workflow status exchange :param event: JSON string of event metadata """ # Send data to exchange - channel: pika.channel.Channel = connection.channel() + channel = connection.channel() channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) return channel.basic_publish( exchange=WORKFLOW_STATUS_EXCH, @@ -259,8 +275,8 @@ def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): def main(): # Parse command-line args - args: argparse.Namespace = make_arg_parser().parse_args() - connection: pika.BlockingConnection = make_amqp_connection(args.profile) + args = make_arg_parser().parse_args() + connection = make_amqp_connection(args.profile) monitor = WorkflowMonitor(args.log_path) for event in monitor.events: diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py index ec105af95..cb255f28d 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py @@ -19,15 +19,15 @@ class WorkflowEvent: log: str, retval: int = None ): - self.job_name: str = job_name - self.job_id: str = str(job_id) - self.type: EventType = event_type - self.timestamp: str = timestamp - self.log: str = log - self.retval: int = retval + self.job_name = job_name + self.job_id = str(job_id) + self.type = event_type + self.timestamp = timestamp + self.log = log + self.retval = retval def json(self) -> str: - d: Dict[str, str] = { + d = { 'job_name': self.job_name, 'type': str(self.type), 'timestamp': self.timestamp, diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py index a908f46cd..1519e4548 100644 --- a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py +++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py @@ -6,12 +6,12 @@ import pytest_mock from wf_monitor.wf_event import WorkflowEvent, EventType from wf_monitor.monitor import WorkflowMonitor, send_event_data, make_amqp_connection -log_path: str = 'logs/condor.log' -test_monitor: WorkflowMonitor = WorkflowMonitor(log_path) +log_path = 'logs/condor.log' +test_monitor = WorkflowMonitor(log_path) def test_read_log(): - test_strs: List[str] = [ + test_strs = [ 'Image size of job updated: 432', '432 - ResidentSetSize of job (KB)', '(1) Normal termination (return value 0)', @@ -30,7 +30,7 @@ def test_read_log_timeout(capsys): def test_parse_log(): - test_event_types: List[EventType] = [ + test_event_types = [ EventType.SUBMITTED, EventType.EXECUTING, EventType.OTHER, @@ -58,7 +58,7 @@ def test_send_event_data(mocker): ] [mocker.patch(mock) for mock in mock_list] - connection: pika.BlockingConnection = make_amqp_connection('nmtest') + connection = make_amqp_connection('nmtest') for event in WorkflowMonitor(log_path).events: send_event_data(connection, event) -- GitLab