Commit 86035d68 authored by Nathan Hertz

Added tests; added new class WorkflowEvent; started filling out method logic in both src files

parent afbaaba0
wf_monitor/monitor.py:

import os
import json
import re
import time
import signal
import tracemalloc
from typing import List
from pathlib import Path

import pika

from .wf_event import WorkflowEvent, EventType

WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'

def timeout_handler(signum: int, frame: tracemalloc.Frame):
    """
    Signal handler function for handling a SIGALRM that arrives when
    WorkflowMonitor.read_htcondor_log() times out.
    :param signum: Signal number (signal.SIGALRM in this case)
    :param frame: Stack frame the signal was sent from
    """
    msg = 'Timeout elapsed when attempting to read HTCondor log file. The job probably did ' \
          'not complete.'
    raise TimeoutError(msg)

# Timeout handling solution found here:
# https://stackoverflow.com/questions/492519/timeout-on-a-function-call
signal.signal(signal.SIGALRM, timeout_handler)
def get_job_name(body: str) -> str:
    """
    Parses a log entry body string for the name of the job currently executing.
    :param body: Body of the log entry
    :return: Job name
    """
    r_body_parse = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)'
    body_match = re.search(r_body_parse, body)
    return body_match.group('jobname')
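
# Illustrative call, assuming a submit-event body line of the form
# "DAG Node: <name>" (the exact body wording is an assumption):
#
#     >>> get_job_name('    DAG Node: split')
#     'split'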
def fmt_timestamp(timestamp: str) -> str:
    """
    Format a timestamp string into ISO 8601 format,
    e.g. 2020-08-27T21:45:09+07:00.
    :param timestamp: Unformatted timestamp string
    :return: Formatted timestamp string
    """
    r_nums = r'[0-9]{2}'
    r_timestamp = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \
                  rf'(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})'
    match = re.search(r_timestamp, timestamp)
    return '{year}-{mon}-{day}T{h}:{m}:{s}+07:00'.format(
        # FIXME: HTCondor logs don't record the year...
        year='2020',
        mon=match.group('mon'),
        day=match.group('d'),
        h=match.group('h'),
        m=match.group('m'),
        s=match.group('s')
    )
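
# Example of the conversion fmt_timestamp() performs, using a timestamp that
# appears in the test log below:
#
#     >>> fmt_timestamp('08/26 11:06:06')
#     '2020-08-26T11:06:06+07:00'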
class WorkflowMonitor:
    def __init__(self, logfile_path: str, timeout: int = 60):
        try:
            self.log = self.read_htcondor_log(Path(logfile_path), timeout)
        except TimeoutError as e:
            # read_htcondor_log() timed out
            print(e)
            quit(-1)

        self.events = self.parse_log()
        for event in self.events:
            self.send_event_data(event.json())
    def read_htcondor_log(self, logfile_path: Path, timeout: int) -> str:
        """
        Waits for the HTCondor job to terminate and reads the resulting log file.
        :param logfile_path: Path to the log file
        :param timeout: Time in seconds to wait for the job to finish
        :return: String contents of the log file
        """
        signal.alarm(timeout)
        while not os.path.exists(logfile_path):
            time.sleep(3)
        with open(logfile_path, 'r') as f:
            contents = f.read()
        # Cancel the pending alarm so it cannot fire after a successful read
        signal.alarm(0)
        return contents
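
    # Note on the timeout mechanism: signal.alarm(timeout) schedules a SIGALRM,
    # which the module-level signal.signal() registration above routes to
    # timeout_handler(); a log file that never appears therefore surfaces as
    # the TimeoutError caught in __init__().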
    def parse_log(self) -> List[WorkflowEvent]:
        """
        Parse the log for relevant details. For each job, create valid
        WorkflowEvent objects based on:
            - Job name
            - If/when the job executed
            - If/when/why the job terminated
        :return: List of WorkflowEvent objects
        """
        # Regex that matches an event entry in the HTCondor log file.
        r_htc_event = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \
                      r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \
                      r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \
                      r'(?P<body>[\t ]+[^\.]*\n)*' \
                      r'\.\.\.'
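
        # For reference, the shape of entry this regex is meant to match (first
        # line taken from the test log below; indented body lines follow, and a
        # literal '...' line terminates the event):
        #
        #     000 (3983.000.000) 08/26 11:06:06 Job submitted from host: <10.64.1.178:9618?addrs=10.64.1.178-9618&noUDP&sock=4050180_7e37_3>
        #     <indented body lines>
        #     ...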
        jobs = {}
        events = []
        for match in re.finditer(r_htc_event, self.log):
            job_num = match.group('jobnum')
            if match.group('eventnum') == '000':
                # Populate jobs dict with job name
                jobs[job_num] = get_job_name(match.group('body'))
            try:
                # Event numbers are zero-padded strings; convert to int so they
                # can be matched against the EventType values
                event_type = EventType(int(match.group('eventnum')))
            except ValueError:
                event_type = EventType.OTHER
            # TODO: Add other regex groups as fields
            events.append(WorkflowEvent(
                job_name=jobs[job_num],
                event_type=event_type,
                timestamp=fmt_timestamp(match.group('timestamp')),
                log=match.group(0)
            ))
        return events
    def send_event_data(self, event_metadatum: str):
        """
        Takes in a JSON-formatted string and sends it off to the workflow
        status exchange.
        :param event_metadatum: JSON string of event metadata
        """
        # Send data to exchange
        pass
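
# What the eventual body of send_event_data() might look like. A minimal
# sketch assuming a RabbitMQ broker on localhost with default credentials;
# the 'fanout' exchange type and empty routing key are assumptions, not part
# of this commit:
#
#     connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
#     channel = connection.channel()
#     channel.exchange_declare(exchange=WORKFLOW_STATUS_EXCH, exchange_type='fanout')
#     channel.basic_publish(exchange=WORKFLOW_STATUS_EXCH,
#                           routing_key='',
#                           body=event_metadatum)
#     connection.close()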
wf_monitor/wf_event.py:

import enum
import json
class EventType(enum.Enum):
    SUBMITTED = 0
    EXECUTING = 1
    TERMINATED = 5
    OTHER = -1
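
# The enum values mirror HTCondor's log event numbers: 000 = job submitted,
# 001 = job executing, 005 = job terminated. parse_log() maps any other
# event number to OTHER.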
class WorkflowEvent:
    def __init__(self, job_name: str,
                 event_type: EventType,
                 timestamp: str,
                 log: str,
                 retval: int = None):
        self.job_name = job_name
        self.type = event_type
        self.timestamp = timestamp
        self.retval = retval
        self.log = log

    def json(self) -> str:
        d = {
            'job_name': self.job_name,
            'type': str(self.type),
            'timestamp': self.timestamp,
            'return_value': self.retval,
            'log': self.log
        }
        return json.dumps(d)
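
# Illustrative example of what json() returns (the job name and log text here
# are hypothetical):
#
#     >>> WorkflowEvent('split', EventType.EXECUTING,
#     ...               '2020-08-26T11:06:06+07:00', '<log text>').json()
#     '{"job_name": "split", "type": "EventType.EXECUTING", "timestamp": "2020-08-26T11:06:06+07:00", "return_value": null, "log": "<log text>"}'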
wf_monitor tests:

from wf_monitor.wf_event import WorkflowEvent, EventType
from wf_monitor.monitor import WorkflowMonitor

log_path = 'apps/cli/utilities/wf_monitor/logs/condor.log'
test_monitor = WorkflowMonitor(log_path)
def test_read_log():
    test_strs = [
        'Image size of job updated: 432',
        '432 - ResidentSetSize of job (KB)',
        '(1) Normal termination (return value 0)',
        '000 (3983.000.000) 08/26 11:06:06 Job submitted from host: '
        '<10.64.1.178:9618?addrs=10.64.1.178-9618&noUDP&sock=4050180_7e37_3>'
    ]
    for s in test_strs:
        assert s in test_monitor.log
def test_parse_log():
    test_event_types = [
        EventType.SUBMITTED,
        EventType.EXECUTING,
        EventType.OTHER,
        EventType.TERMINATED,
        EventType.SUBMITTED,
        EventType.EXECUTING,
        EventType.OTHER,
        EventType.TERMINATED
    ]
    for e, e_type in zip(test_monitor.events, test_event_types):
        assert e.type == e_type
def test_send_event_data():
    pass
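
# A possible shape for this test once send_event_data() publishes via pika;
# the patch target and the assertion are assumptions about the eventual
# implementation, not part of this commit:
#
#     from unittest.mock import patch
#
#     def test_send_event_data():
#         with patch('wf_monitor.monitor.pika.BlockingConnection') as mock_conn:
#             test_monitor.send_event_data(test_monitor.events[0].json())
#             mock_channel = mock_conn.return_value.channel.return_value
#             assert mock_channel.basic_publish.called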