Commit 2b55cff6 authored by Nathan Hertz

Merge branch 'SSA-6323-workflow-status-monitoring' into SSA-6320-workflow-service

parents 3392bcec e91a0f3c
# Workspaces Workflow Monitor
Reads an HTCondor log file, parses the relevant data from it into events, and formats those events into messages that are published to the workspaces.workflow-service.workflow-status RabbitMQ exchange.
```
usage: wf_monitor [-h] [-P PROFILE] log_path

Workspaces workflow monitor, version 4.0.0a1.dev1. Monitor execution of a
workflow from the command line.

positional arguments:
  log_path              path to the workflow's HTCondor log

optional arguments:
  -h, --help            show this help message and exit
  -P PROFILE, --profile PROFILE
                        profile name to use; e.g. test, production; DEFAULT: nmtest
```
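Each event parsed from the log becomes one message on that exchange. As a rough sketch of the message format (the field names follow `WorkflowEvent.json()` and the routing key follows `send_event_data()`; the concrete values here are invented for illustration):
```
routing key:  split.0001.terminated    (<job_name>.<job_id>.<event type>)
payload:      {"job_name": "split", "type": "EventType.TERMINATED",
               "timestamp": "2020-08-06T11:02:45+07:00", ...}
```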
## Design
- Reads in HTCondor logs
- HTCondor executes a job
......
this will trigger an exception!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
\ No newline at end of file
@@ -8,11 +8,12 @@ VERSION = open('src/wf_monitor/_version.py').readlines()[-1].split()[-1].strip("
 README = Path('README.md').read_text()
 
 requires = [
-    'pika',
-    'pendulum'
+    'pika==1.1',
+    'pendulum==2.1.2',
 ]
 tests_require = [
-    'pytest>=5.4,<6.0'
+    'pytest>=5.4,<6.0',
+    'pytest-mock==3.3.1',
 ]
 setup(
@@ -24,7 +25,7 @@ setup(
     author_email='dms-ssa@nrao.edu',
     url='TBD',
     license="GPL",
-    # install_requires=requires,
+    install_requires=requires,
     tests_require=tests_require,
     keywords=[],
     packages=['wf_monitor'],
@@ -33,6 +34,6 @@ setup(
         'Programming Language :: Python :: 3.8'
     ],
     entry_points={
-        'console_scripts': ['']
+        'console_scripts': ['wf_monitor = wf_monitor.monitor:main']
     },
 )
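With the `console_scripts` entry point now pointing at `wf_monitor.monitor:main`, installing the package (e.g. `pip install .`) puts the `wf_monitor` command from the README usage text on the PATH.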
@@ -5,9 +5,9 @@ import time
 import signal
 import logging
 import argparse
-import subprocess
+import functools
 import tracemalloc
-from typing import List, Union, Dict
+from typing import List, Union, Dict, Callable, Tuple, Any
 from pathlib import Path
 
 import pika
@@ -17,13 +17,41 @@ from pycapo import CapoConfig
 from .wf_event import WorkflowEvent, EventType
 from ._version import ___version___ as VERSION
 
-WORKFLOW_STATUS_EXCH: str = 'workspaces.workflow-service.workflow-status'
+WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
 
-logger: logging.Logger = logging.getLogger('wf_monitor')
+logger = logging.getLogger('wf_monitor')
 logger.setLevel(logging.INFO)
 logger.addHandler(logging.StreamHandler(sys.stdout))
 
+
+def log_decorator_factory(msg: str):
+    """
+    Factory that produces log decorators with a given message
+    :param msg: Message to be logged
+    :return: Log decorator
+    """
+    def log_decorator(f: Callable) -> Callable:
+        """
+        Decorator that logs a message before and after a function is called
+        :param f: Function to be decorated
+        :return: Wrapper function
+        """
+        @functools.wraps(f)
+        def wrapper(*args: Tuple, **kwargs: Dict) -> Any:
+            """
+            Decorator boilerplate: wrapper function for log decorator
+            :param args: args for function f
+            :param kwargs: kwargs for function f
+            :return: Return value of the function
+            """
+            logger.info(msg)
+            result = f(*args, **kwargs)
+            logger.info("Done.")
+            return result
+        return wrapper
+    return log_decorator
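+# Example: a function decorated with @log_decorator_factory('Fetching data...')
+# logs 'Fetching data...' when it is called and 'Done.' when it returns.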
 def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None:
     """
     Signal handler function for handling a SIGALRM that arrives when WorkflowMonitor.read_log()
@@ -31,8 +59,8 @@ def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None:
     :param signum: Signal number (would be signal.SIGALRM in this case)
     :param frame: Stack frame the signal was sent from
     """
-    msg: str = 'Timeout elapsed when attempting to read HTCondor log file. The job probably did ' \
-               'not complete.'
+    msg = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \
+          'not complete or the path to the log was not entered correctly.'
     raise TimeoutError(msg)
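+# NOTE: this handler only fires if it has been registered in advance, e.g. via
+# signal.signal(signal.SIGALRM, timeout_handler), before signal.alarm() is set.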
@@ -47,8 +75,8 @@ def get_job_name(body: str) -> str:
     :param body: Body of the log entry
     :return: Job name
     """
-    r_body_parse: str = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)'
-    body_match: re.Match = re.search(r_body_parse, body)
+    r_body_parse = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)'
+    body_match = re.search(r_body_parse, body)
     return body_match.group('jobname')
@@ -58,8 +86,8 @@ def get_year(logfile_path: Path) -> str:
     :param logfile_path: Path to log file
     :return: Year log was created as a string
     """
-    status: os.stat_result = os.stat(logfile_path)
-    date: pendulum.DateTime = pendulum.from_timestamp(status.st_ctime)
+    status = os.stat(logfile_path)
+    date = pendulum.from_timestamp(status.st_ctime)
     return str(date.year)
@@ -71,10 +99,10 @@ def fmt_timestamp(timestamp: str, year: str) -> str:
     :param year: Year HTCondor log was created as a string
     :return: Formatted timestamp string
     """
-    r_nums: str = r'[0-9]{2}'
-    r_timestamp: str = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \
+    r_nums = r'[0-9]{2}'
+    r_timestamp = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \
                   rf'(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})'
-    match: re.Match = re.search(r_timestamp, timestamp)
+    match = re.search(r_timestamp, timestamp)
 
     return '{year}-{mon}-{day}T{h}:{m}:{s}+07:00'.format(
         year=year,
@@ -92,9 +120,9 @@ def get_retval(body: str) -> Union[int, None]:
     :param body: Body of event log entry
     :return: Return value as an int
     """
-    r_retval: str = r'return value (?P<retval>[0-9]+)'
-    retval: re.Match = re.search(r_retval, body)
+    r_retval = r'return value (?P<retval>[0-9]+)'
+    retval = re.search(r_retval, body)
     if retval:
         return int(retval.group('retval'))
     else:
@@ -102,28 +130,23 @@ def get_retval(body: str) -> Union[int, None]:
 class WorkflowMonitor:
     """
     Class that monitors the events in a given workflow via the log file produced by it
     """
     def __init__(self, logfile_path: str, timeout: int = 60):
-        self.logfile_path: Path = Path(logfile_path)
+        self.logfile_path = Path(logfile_path)
-        logger.info("Reading log file...")
         try:
-            self.log: str = self.read_htcondor_log(self.logfile_path, timeout)
+            self.log = self.read_htcondor_log(timeout)
         except TimeoutError as e:
             # read_htcondor_log() timed out
             logger.error(e)
             quit(-1)
-        logger.info('Log file read successfully.')
-        logger.info('Parsing log file...')
-        self.events: List[WorkflowEvent] = self.parse_log()
-        logger.info('Log file parsed.')
+        self.events = self.parse_log()
-        logger.info('Sending event data...')
-        for event in self.events:
-            self.send_event_data(event.json())
-        logger.info('Event data sent.')
 
-    def read_htcondor_log(self, logfile_path: Path, timeout: int) -> str:
+    @log_decorator_factory('Reading HTCondor log...')
+    def read_htcondor_log(self, timeout: int) -> str:
"""
Waits for HTCondor job to terminate and reads the resulting log file
:param logfile_path: Path to log file
......@@ -132,14 +155,15 @@ class WorkflowMonitor:
"""
signal.alarm(timeout)
while not os.path.exists(logfile_path):
while not os.path.exists(self.logfile_path):
logger.info('Attempting to read log file...')
time.sleep(3)
logger.info('Log file exists.')
with open(logfile_path, 'r') as f:
logger.info('Log file found.')
with open(self.logfile_path, 'r') as f:
return f.read()
@log_decorator_factory('Parsing HTCondor log...')
     def parse_log(self) -> List[WorkflowEvent]:
         """
         Parse log for relevant details:
@@ -147,32 +171,36 @@ class WorkflowMonitor:
         :return: List of WorkflowEvent objects
         """
         # Regex that matches an event entry in the HTCondor log file.
-        r_htc_event: str = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \
-                           r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \
-                           r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \
-                           r'(?P<body>[\t ]+[^\.]*\n)*' \
-                           r'...'
-        jobs: Dict[str, str] = {}
-        events: List[WorkflowEvent] = []
+        r_htc_event = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \
+                      r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \
+                      r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \
+                      r'(?P<body>[\t ]+[^\.]*\n)*' \
+                      r'...'
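+        # Example of a log entry this regex matches (values are hypothetical):
+        #   000 (0001.000.000) 08/06 11:02:45 Job submitted from host: <host>
+        #       DAG Node: split
+        #   ...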
+        jobs = {}
+        events = []
 
         if sum(1 for _ in re.finditer(r_htc_event, self.log)) == 0:
             # Regex didn't return any matches
             raise ValueError('HTCondor log file not well-formatted. Was the file edited?')
 
         for match in re.finditer(r_htc_event, self.log):
-            job_num: str = match.group('jobnum')
+            job_id = match.group('jobnum')
             if match.group('eventnum') == '000':
                 # Populate jobs dict with job name
-                jobs[job_num] = get_job_name(match.group('body'))
+                jobs[job_id] = get_job_name(match.group('body'))
             try:
-                event_type: EventType = EventType(int(match.group('eventnum')))
+                event_type = EventType(int(match.group('eventnum')))
             except ValueError:
-                event_type: EventType = EventType.OTHER
+                event_type = EventType.OTHER
             if event_type == EventType.TERMINATED:
-                retval: Union[int, None] = get_retval(match.group('body'))
+                retval = get_retval(match.group('body'))
             else:
-                retval: Union[int, None] = None
+                retval = None
-            year: str = get_year(self.logfile_path)
+            year = get_year(self.logfile_path)
             events.append(WorkflowEvent(
-                job_name=jobs[job_num],
+                job_name=jobs[job_id],
+                job_id=job_id,
                 event_type=event_type,
                 timestamp=fmt_timestamp(match.group('timestamp'), year),
                 log=match.group(0),
@@ -181,18 +209,11 @@ class WorkflowMonitor:
         return events
-    def send_event_data(self, connection: pika.BlockingConnection, event_metadatum: str):
-        """
-        Takes in a JSON-formatted string and sends it off to the workflow status exchange
-        :param event_metadatum: JSON string of event metadata
-        """
-        # Send data to exchange
-        channel: pika.channel.Channel = connection.channel()
-        channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
-        channel.basic_publish(
-            exchange=WORKFLOW_STATUS_EXCH,
-            # routing_key=?
-        )
 
     def __str__(self):
         return f'WorkflowMonitor, monitoring {self.logfile_path} that has events {self.events}'
 
     def __repr__(self):
         return f'{self.__class__.__name__} ({self.logfile_path}, {self.events})'
 
 
 _DESCRIPTION = 'Workspaces workflow monitor, version {}. Monitor execution of a workflow from ' \
@@ -204,18 +225,18 @@ def make_arg_parser() -> argparse.ArgumentParser:
     Initialize argument parser for command-line arguments
     :return: Argument parser
     """
-    parser: argparse.ArgumentParser = argparse.ArgumentParser(
+    parser = argparse.ArgumentParser(
         description=_DESCRIPTION.format(VERSION),
         formatter_class=argparse.RawTextHelpFormatter
     )
-    parser.add_argument('-P', '--profile', action='store', required=True,
-                        help='profile name to use; e.g. test, production')
-    parser.add_argument('log_path', action='store', required=True,
-                        help='path to the workflow\'s HTCondor log')
+    parser.add_argument('-P', '--profile', action='store', required=False, default='nmtest',
+                        help='profile name to use; e.g. test, production; DEFAULT: nmtest')
+    parser.add_argument('log_path', action='store', help='path to the workflow\'s HTCondor log')
     return parser
+@log_decorator_factory('Establishing connection to the AMQP server...')
 def make_amqp_connection(profile: str) -> pika.BlockingConnection:
     """
     Initialize connection to AMQP server
@@ -224,9 +245,9 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection:
     """
     capo_cfg = CapoConfig(profile=profile)
 
-    hostname: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname')
-    username: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username')
-    password: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password')
+    hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname')
+    username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username')
+    password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password')
 
     return pika.BlockingConnection(
         pika.ConnectionParameters(
@@ -238,7 +259,28 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection:
         )
     )
+@log_decorator_factory('Sending event data to the channel...')
+def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent):
+    """
+    Takes in a workflow event and sends its JSON representation off to the workflow
+    status exchange
+    :param event: WorkflowEvent whose metadata will be sent
+    """
+    # Send data to exchange
+    channel = connection.channel()
+    channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
+    return channel.basic_publish(
+        exchange=WORKFLOW_STATUS_EXCH,
+        routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
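+        # e.g. routing_key 'split.0001.terminated' for a TERMINATED event of
+        # job 'split' with id '0001' (values are hypothetical)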
+        body=event.json()
+    )
 def main():
     # Parse command-line args
-    args: argparse.Namespace = make_arg_parser().parse_args()
-    connection: pika.BlockingConnection = make_amqp_connection(args.profile)
+    args = make_arg_parser().parse_args()
+    connection = make_amqp_connection(args.profile)
+    monitor = WorkflowMonitor(args.log_path)
+
+    for event in monitor.events:
+        send_event_data(connection, event)
@@ -10,19 +10,24 @@ class EventType(enum.Enum):
     OTHER = -1
 
 
 class WorkflowEvent:
-    def __init__(self, job_name: str,
-                 event_type: EventType,
-                 timestamp: str,
-                 log: str,
-                 retval: int = None):
-        self.job_name: str = job_name
-        self.type: EventType = event_type
-        self.timestamp: str = timestamp
-        self.log: str = log
-        self.retval: int = retval
+    def __init__(
+            self,
+            job_name: str,
+            job_id: str,
+            event_type: EventType,
+            timestamp: str,
+            log: str,
+            retval: int = None
+    ):
+        self.job_name = job_name
+        self.job_id = str(job_id)
+        self.type = event_type
+        self.timestamp = timestamp
+        self.log = log
+        self.retval = retval
 
     def json(self) -> str:
-        d: Dict[str, str] = {
+        d = {
             'job_name': self.job_name,
             'type': str(self.type),
             'timestamp': self.timestamp,
@@ -32,7 +37,7 @@ class WorkflowEvent:
         return json.dumps(d)
 
     def __str__(self):
-        return self.json()
+        return f'WorkflowEvent with data {self.json()}'
 
     def __repr__(self):
-        return f'<WorkflowEvent {self.__dict__}>'
\ No newline at end of file
+        return f'<wf_monitor.WorkflowEvent {self.__dict__}>'
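For reference, a minimal sketch of constructing the updated `WorkflowEvent` by hand (all values are invented for illustration):
```
from wf_monitor.wf_event import WorkflowEvent, EventType

event = WorkflowEvent(
    job_name='split',                          # hypothetical job name
    job_id='0001',                             # hypothetical HTCondor job number
    event_type=EventType.TERMINATED,
    timestamp='2020-08-06T11:02:45+07:00',
    log='005 (0001.000.000) 08/06 11:02:45 Job terminated.',
    retval=0,
)
print(event.json())  # JSON string with job_name, type, timestamp, etc.
```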
-from wf_monitor.wf_event import WorkflowEvent, EventType
-from wf_monitor.monitor import WorkflowMonitor
+import pytest
+
+from wf_monitor.wf_event import EventType
+from wf_monitor.monitor import WorkflowMonitor, send_event_data, make_amqp_connection
 
 log_path = 'logs/condor.log'
 test_monitor = WorkflowMonitor(log_path)
@@ -17,6 +18,13 @@ def test_read_log():
         assert s in test_monitor.log
 
+
+def test_read_log_timeout(capsys):
+    with pytest.raises(SystemExit) as sys_ex:
+        WorkflowMonitor('logs/file-that-does-not-exist.txt', 1)
+    assert sys_ex.type == SystemExit
+    assert sys_ex.value.code == -1
+
+
 def test_parse_log():
     test_event_types = [
         EventType.SUBMITTED,
@@ -32,5 +40,22 @@ def test_parse_log():
         assert e.type == e_type
 
-def test_send_event_data():
-    pass
+
+def test_parse_log_error():
+    with pytest.raises(ValueError) as val_err:
+        WorkflowMonitor('logs/test.log')
+    assert val_err.type == ValueError
+
+
+def test_send_event_data(mocker):
+    mock_list = [
+        'pika.adapters.blocking_connection.BlockingChannel.basic_publish',
+        'wf_monitor.monitor.make_amqp_connection',
+        'wf_monitor.monitor.send_event_data',
+    ]
+    [mocker.patch(mock) for mock in mock_list]
+    connection = make_amqp_connection('nmtest')
+    for event in WorkflowMonitor(log_path).events:
+        send_event_data(connection, event)
+    assert connection.channel().basic_publish.call_count == 8