Skip to content
Snippets Groups Projects
Commit 2373267d authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Removed variable type annotations by @dlyons' request

parent 7d1d61d2
No related branches found
No related tags found
No related merge requests found
......@@ -8,7 +8,7 @@ import argparse
import functools
import subprocess
import tracemalloc
from typing import List, Union, Dict, Callable, Tuple
from typing import List, Union, Dict, Callable, Tuple, Any
from pathlib import Path
import pika
......@@ -18,23 +18,39 @@ from pycapo import CapoConfig
from .wf_event import WorkflowEvent, EventType
from ._version import ___version___ as VERSION
WORKFLOW_STATUS_EXCH: str = 'workspaces.workflow-service.workflow-status'
WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
logger: logging.Logger = logging.getLogger('wf_monitor')
logger = logging.getLogger('wf_monitor')
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
def logdec_factory(msg: str):
def logdec(f: Callable) -> Callable:
def log_decorator_factory(msg: str):
"""
Factory that produces log decorators with a given message
:param msg: Message to be logged
:return: Log decorator
"""
def log_decorator(f: Callable) -> Callable:
"""
Decorator that logs a message before and after a function is called
:param f: Function to be decorated
:return: Wrapper function
"""
@functools.wraps(f)
def wrapper(*args: Tuple, **kwargs: Dict) -> Callable:
def wrapper(*args: Tuple, **kwargs: Dict) -> Any:
"""
Decorator boilerplate: wrapper function for log decorator
:param args: args for function f
:param kwargs: kwargs for function f
:return: Return value of the function
"""
logger.info(msg)
result = f(*args, **kwargs)
logger.info("Done.")
return result
return wrapper
return logdec
return log_decorator
def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None:
......@@ -44,7 +60,7 @@ def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None:
:param signum: Signal number (would be signal.SIGALRM in this case)
:param frame: Stack frame the signal was sent from
"""
msg: str = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \
msg = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \
'not complete or the path to the log was not entered correctly.'
raise TimeoutError(msg)
......@@ -60,8 +76,8 @@ def get_job_name(body: str) -> str:
:param body: Body of the log entry
:return: Job name
"""
r_body_parse: str = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)'
body_match: re.Match = re.search(r_body_parse, body)
r_body_parse = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)'
body_match = re.search(r_body_parse, body)
return body_match.group('jobname')
......@@ -71,8 +87,8 @@ def get_year(logfile_path: Path) -> str:
:param logfile_path: Path to log file
:return: Year log was created as a string
"""
status: os.stat_result = os.stat(logfile_path)
date: pendulum.DateTime = pendulum.from_timestamp(status.st_ctime)
status = os.stat(logfile_path)
date = pendulum.from_timestamp(status.st_ctime)
return str(date.year)
......@@ -84,10 +100,10 @@ def fmt_timestamp(timestamp: str, year: str) -> str:
:param year: Year HTCondor log was created as a string
:return: Formatted timestamp string
"""
r_nums: str = r'[0-9]{2}'
r_timestamp: str = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \
r_nums = r'[0-9]{2}'
r_timestamp = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \
rf'(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})'
match: re.Match = re.search(r_timestamp, timestamp)
match = re.search(r_timestamp, timestamp)
return '{year}-{mon}-{day}T{h}:{m}:{s}+07:00'.format(
year=year,
......@@ -105,9 +121,9 @@ def get_retval(body: str) -> Union[int, None]:
:param body: Body of event log entry
:return: Return value as an int
"""
r_retval: str = r'return value (?P<retval>[0-9]+)'
r_retval = r'return value (?P<retval>[0-9]+)'
retval: re.Match = re.search(r_retval, body)
retval = re.search(r_retval, body)
if retval:
return int(retval.group('retval'))
else:
......@@ -116,18 +132,18 @@ def get_retval(body: str) -> Union[int, None]:
class WorkflowMonitor:
def __init__(self, logfile_path: str, timeout: int = 60):
self.logfile_path: Path = Path(logfile_path)
self.logfile_path = Path(logfile_path)
try:
self.log: str = self.read_htcondor_log(timeout)
self.log = self.read_htcondor_log(timeout)
except TimeoutError as e:
# read_htcondor_log() timed out
logger.error(e)
quit(-1)
self.events: List[WorkflowEvent] = self.parse_log()
self.events = self.parse_log()
@logdec_factory('Reading HTCondor log...')
@log_decorator_factory('Reading HTCondor log...')
def read_htcondor_log(self, timeout: int) -> str:
"""
Waits for HTCondor job to terminate and reads the resulting log file
......@@ -145,7 +161,7 @@ class WorkflowMonitor:
with open(self.logfile_path, 'r') as f:
return f.read()
@logdec_factory('Parsing HTCondor log...')
@log_decorator_factory('Parsing HTCondor log...')
def parse_log(self) -> List[WorkflowEvent]:
"""
Parse log for relevant details:
......@@ -153,32 +169,32 @@ class WorkflowMonitor:
:return: List of WorkflowEvent objects
"""
# Regex that matches an event entry in the HTCondor log file.
r_htc_event: str = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \
r_htc_event = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \
r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \
r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \
r'(?P<body>[\t ]+[^\.]*\n)*' \
r'...'
jobs: Dict[str, str] = {}
events: List[WorkflowEvent] = []
jobs = {}
events = []
if sum(1 for _ in re.finditer(r_htc_event, self.log)) == 0:
raise ValueError('HTCondor log file not well-formatted. Was the file edited?')
for match in re.finditer(r_htc_event, self.log):
job_id: str = match.group('jobnum')
job_id = match.group('jobnum')
if match.group('eventnum') == '000':
# Populate jobs dict with job name
jobs[job_id] = get_job_name(match.group('body'))
try:
event_type: EventType = EventType(int(match.group('eventnum')))
event_type = EventType(int(match.group('eventnum')))
except ValueError:
event_type: EventType = EventType.OTHER
event_type = EventType.OTHER
if event_type == EventType.TERMINATED:
retval: Union[int, None] = get_retval(match.group('body'))
retval = get_retval(match.group('body'))
else:
retval: Union[int, None] = None
retval = None
year: str = get_year(self.logfile_path)
year = get_year(self.logfile_path)
events.append(WorkflowEvent(
job_name=jobs[job_id],
job_id=job_id,
......@@ -206,7 +222,7 @@ def make_arg_parser() -> argparse.ArgumentParser:
Initialize argument parser for command-line arguments
:return: Argument parser
"""
parser: argparse.ArgumentParser = argparse.ArgumentParser(
parser = argparse.ArgumentParser(
description=_DESCRIPTION.format(VERSION),
formatter_class=argparse.RawTextHelpFormatter
)
......@@ -217,7 +233,7 @@ def make_arg_parser() -> argparse.ArgumentParser:
return parser
@logdec_factory('Establishing connection to the AMQP server...')
@log_decorator_factory('Establishing connection to the AMQP server...')
def make_amqp_connection(profile: str) -> pika.BlockingConnection:
"""
Initialize connection to AMQP server
......@@ -226,9 +242,9 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection:
"""
capo_cfg = CapoConfig(profile=profile)
hostname: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname')
username: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username')
password: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password')
hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname')
username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username')
password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password')
return pika.BlockingConnection(
pika.ConnectionParameters(
......@@ -241,14 +257,14 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection:
)
@logdec_factory('Sending event data to the channel...')
@log_decorator_factory('Sending event data to the channel...')
def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent):
"""
Takes in a JSON-formatted string and sends it off to the workflow status exchange
:param event: JSON string of event metadata
"""
# Send data to exchange
channel: pika.channel.Channel = connection.channel()
channel = connection.channel()
channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
return channel.basic_publish(
exchange=WORKFLOW_STATUS_EXCH,
......@@ -259,8 +275,8 @@ def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent):
def main():
# Parse command-line args
args: argparse.Namespace = make_arg_parser().parse_args()
connection: pika.BlockingConnection = make_amqp_connection(args.profile)
args = make_arg_parser().parse_args()
connection = make_amqp_connection(args.profile)
monitor = WorkflowMonitor(args.log_path)
for event in monitor.events:
......
......@@ -19,15 +19,15 @@ class WorkflowEvent:
log: str,
retval: int = None
):
self.job_name: str = job_name
self.job_id: str = str(job_id)
self.type: EventType = event_type
self.timestamp: str = timestamp
self.log: str = log
self.retval: int = retval
self.job_name = job_name
self.job_id = str(job_id)
self.type = event_type
self.timestamp = timestamp
self.log = log
self.retval = retval
def json(self) -> str:
d: Dict[str, str] = {
d = {
'job_name': self.job_name,
'type': str(self.type),
'timestamp': self.timestamp,
......
......@@ -6,12 +6,12 @@ import pytest_mock
from wf_monitor.wf_event import WorkflowEvent, EventType
from wf_monitor.monitor import WorkflowMonitor, send_event_data, make_amqp_connection
log_path: str = 'logs/condor.log'
test_monitor: WorkflowMonitor = WorkflowMonitor(log_path)
log_path = 'logs/condor.log'
test_monitor = WorkflowMonitor(log_path)
def test_read_log():
test_strs: List[str] = [
test_strs = [
'Image size of job updated: 432',
'432 - ResidentSetSize of job (KB)',
'(1) Normal termination (return value 0)',
......@@ -30,7 +30,7 @@ def test_read_log_timeout(capsys):
def test_parse_log():
test_event_types: List[EventType] = [
test_event_types = [
EventType.SUBMITTED,
EventType.EXECUTING,
EventType.OTHER,
......@@ -58,7 +58,7 @@ def test_send_event_data(mocker):
]
[mocker.patch(mock) for mock in mock_list]
connection: pika.BlockingConnection = make_amqp_connection('nmtest')
connection = make_amqp_connection('nmtest')
for event in WorkflowMonitor(log_path).events:
send_event_data(connection, event)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment