Skip to content
Snippets Groups Projects
Commit 9329ae90 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Finished implementing send_event_data(); did some refactoring

parent b8c8a1e7
No related branches found
No related tags found
No related merge requests found
......@@ -31,8 +31,8 @@ def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None:
:param signum: Signal number (would be signal.SIGALRM in this case)
:param frame: Stack frame the signal was sent from
"""
msg: str = 'Timeout elapsed when attempting to read HTCondor log file. The job probably did ' \
'not complete.'
msg: str = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \
'not complete or the path to the log was not entered correctly.'
raise TimeoutError(msg)
......@@ -107,7 +107,7 @@ class WorkflowMonitor:
logger.info("Reading log file...")
try:
self.log: str = self.read_htcondor_log(self.logfile_path, timeout)
self.log: str = self.read_htcondor_log(timeout)
except TimeoutError as e:
# read_htcondor_log() timed out
logger.error(e)
......@@ -118,12 +118,7 @@ class WorkflowMonitor:
self.events: List[WorkflowEvent] = self.parse_log()
logger.info('Log file parsed.')
logger.info('Sending event data...')
for event in self.events:
self.send_event_data(event.json())
logger.info('Event data sent.')
def read_htcondor_log(self, logfile_path: Path, timeout: int) -> str:
def read_htcondor_log(self, timeout: int) -> str:
"""
Waits for HTCondor job to terminate and reads the resulting log file
:param logfile_path: Path to log file
......@@ -132,12 +127,12 @@ class WorkflowMonitor:
"""
signal.alarm(timeout)
while not os.path.exists(logfile_path):
while not os.path.exists(self.logfile_path):
logger.info('Attempting to read log file...')
time.sleep(3)
logger.info('Log file exists.')
with open(logfile_path, 'r') as f:
with open(self.logfile_path, 'r') as f:
return f.read()
def parse_log(self) -> List[WorkflowEvent]:
......@@ -154,11 +149,13 @@ class WorkflowMonitor:
r'...'
jobs: Dict[str, str] = {}
events: List[WorkflowEvent] = []
if sum(1 for _ in re.finditer(r_htc_event, self.log)) == 0:
raise ValueError('HTCondor log file not well-formatted. Was the file edited?')
for match in re.finditer(r_htc_event, self.log):
job_num: str = match.group('jobnum')
job_id: str = match.group('jobnum')
if match.group('eventnum') == '000':
# Populate jobs dict with job name
jobs[job_num] = get_job_name(match.group('body'))
jobs[job_id] = get_job_name(match.group('body'))
try:
event_type: EventType = EventType(int(match.group('eventnum')))
......@@ -172,7 +169,8 @@ class WorkflowMonitor:
year: str = get_year(self.logfile_path)
events.append(WorkflowEvent(
job_name=jobs[job_num],
job_name=jobs[job_id],
job_id=job_id,
event_type=event_type,
timestamp=fmt_timestamp(match.group('timestamp'), year),
log=match.group(0),
......@@ -181,19 +179,6 @@ class WorkflowMonitor:
return events
def send_event_data(self, connection: pika.BlockingConnection, event_metadatum: str):
"""
Takes in a JSON-formatted string and sends it off to the workflow status exchange
:param event_metadatum: JSON string of event metadata
"""
# Send data to exchange
channel: pika.channel.Channel = connection.channel()
channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
channel.basic_publish(
exchange=WORKFLOW_STATUS_EXCH,
# routing_key=?
)
_DESCRIPTION = 'Workspaces workflow monitor, version {}. Monitor execution of a workflow from ' \
'the command line.'
......@@ -208,10 +193,9 @@ def make_arg_parser() -> argparse.ArgumentParser:
description=_DESCRIPTION.format(VERSION),
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('-P', '--profile', action='store', required=True,
help='profile name to use; e.g. test, production')
parser.add_argument('log_path', action='store', required=True,
help='path to the workflow\'s HTCondor log')
parser.add_argument('-P', '--profile', action='store', required=False, default='nmtest',
help='profile name to use; e.g. test, production; DEFAULT: nmtest')
parser.add_argument('log_path', action='store', help='path to the workflow\'s HTCondor log')
return parser
......@@ -238,7 +222,29 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection:
)
)
def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent):
"""
Takes in a JSON-formatted string and sends it off to the workflow status exchange
:param event: JSON string of event metadata
"""
# Send data to exchange
logger.info('Sending event data...')
channel: pika.channel.Channel = connection.channel()
channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
logger.info('Event data sent.')
return channel.basic_publish(
exchange=WORKFLOW_STATUS_EXCH,
routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
body=event.json()
)
def main():
# Parse command-line args
args: argparse.Namespace = make_arg_parser().parse_args()
connection: pika.BlockingConnection = make_amqp_connection(args.profile)
monitor = WorkflowMonitor(args.log_path)
for event in monitor.events:
send_event_data(connection, event)
......@@ -10,12 +10,17 @@ class EventType(enum.Enum):
OTHER = -1
class WorkflowEvent:
def __init__(self, job_name: str,
event_type: EventType,
timestamp: str,
log: str,
retval: int = None):
def __init__(
self,
job_name: str,
job_id: str,
event_type: EventType,
timestamp: str,
log: str,
retval: int = None
):
self.job_name: str = job_name
self.job_id: str = str(job_id)
self.type: EventType = event_type
self.timestamp: str = timestamp
self.log: str = log
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment