From 9329ae90d31487e9c2898dde918347ab22062908 Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Tue, 1 Sep 2020 17:27:15 -0600 Subject: [PATCH] Finished implementing send_event_data(); did some refactoring --- .../wf_monitor/src/wf_monitor/monitor.py | 68 ++++++++++--------- .../wf_monitor/src/wf_monitor/wf_event.py | 15 ++-- 2 files changed, 47 insertions(+), 36 deletions(-) diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index 00efb6191..2a9848e61 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -31,8 +31,8 @@ def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None: :param signum: Signal number (would be signal.SIGALRM in this case) :param frame: Stack frame the signal was sent from """ - msg: str = 'Timeout elapsed when attempting to read HTCondor log file. The job probably did ' \ - 'not complete.' + msg: str = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \ + 'not complete or the path to the log was not entered correctly.' raise TimeoutError(msg) @@ -107,7 +107,7 @@ class WorkflowMonitor: logger.info("Reading log file...") try: - self.log: str = self.read_htcondor_log(self.logfile_path, timeout) + self.log: str = self.read_htcondor_log(timeout) except TimeoutError as e: # read_htcondor_log() timed out logger.error(e) @@ -118,12 +118,7 @@ class WorkflowMonitor: self.events: List[WorkflowEvent] = self.parse_log() logger.info('Log file parsed.') - logger.info('Sending event data...') - for event in self.events: - self.send_event_data(event.json()) - logger.info('Event data sent.') - - def read_htcondor_log(self, logfile_path: Path, timeout: int) -> str: + def read_htcondor_log(self, timeout: int) -> str: """ Waits for HTCondor job to terminate and reads the resulting log file :param logfile_path: Path to log file @@ -132,12 +127,12 @@ class WorkflowMonitor: """ signal.alarm(timeout) - while not os.path.exists(logfile_path): + while not os.path.exists(self.logfile_path): logger.info('Attempting to read log file...') time.sleep(3) logger.info('Log file exists.') - with open(logfile_path, 'r') as f: + with open(self.logfile_path, 'r') as f: return f.read() def parse_log(self) -> List[WorkflowEvent]: @@ -154,11 +149,13 @@ class WorkflowMonitor: r'...' jobs: Dict[str, str] = {} events: List[WorkflowEvent] = [] + if sum(1 for _ in re.finditer(r_htc_event, self.log)) == 0: + raise ValueError('HTCondor log file not well-formatted. Was the file edited?') for match in re.finditer(r_htc_event, self.log): - job_num: str = match.group('jobnum') + job_id: str = match.group('jobnum') if match.group('eventnum') == '000': # Populate jobs dict with job name - jobs[job_num] = get_job_name(match.group('body')) + jobs[job_id] = get_job_name(match.group('body')) try: event_type: EventType = EventType(int(match.group('eventnum'))) @@ -172,7 +169,8 @@ class WorkflowMonitor: year: str = get_year(self.logfile_path) events.append(WorkflowEvent( - job_name=jobs[job_num], + job_name=jobs[job_id], + job_id=job_id, event_type=event_type, timestamp=fmt_timestamp(match.group('timestamp'), year), log=match.group(0), @@ -181,19 +179,6 @@ class WorkflowMonitor: return events - def send_event_data(self, connection: pika.BlockingConnection, event_metadatum: str): - """ - Takes in a JSON-formatted string and sends it off to the workflow status exchange - :param event_metadatum: JSON string of event metadata - """ - # Send data to exchange - channel: pika.channel.Channel = connection.channel() - channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) - channel.basic_publish( - exchange=WORKFLOW_STATUS_EXCH, - # routing_key=? - ) - _DESCRIPTION = 'Workspaces workflow monitor, version {}. Monitor execution of a workflow from ' \ 'the command line.' @@ -208,10 +193,9 @@ def make_arg_parser() -> argparse.ArgumentParser: description=_DESCRIPTION.format(VERSION), formatter_class=argparse.RawTextHelpFormatter ) - parser.add_argument('-P', '--profile', action='store', required=True, - help='profile name to use; e.g. test, production') - parser.add_argument('log_path', action='store', required=True, - help='path to the workflow\'s HTCondor log') + parser.add_argument('-P', '--profile', action='store', required=False, default='nmtest', + help='profile name to use; e.g. test, production; DEFAULT: nmtest') + parser.add_argument('log_path', action='store', help='path to the workflow\'s HTCondor log') return parser @@ -238,7 +222,29 @@ def make_amqp_connection(profile: str) -> pika.BlockingConnection: ) ) + +def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): + """ + Takes in a JSON-formatted string and sends it off to the workflow status exchange + :param event: JSON string of event metadata + """ + # Send data to exchange + logger.info('Sending event data...') + channel: pika.channel.Channel = connection.channel() + channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) + logger.info('Event data sent.') + return channel.basic_publish( + exchange=WORKFLOW_STATUS_EXCH, + routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', + body=event.json() + ) + + def main(): # Parse command-line args args: argparse.Namespace = make_arg_parser().parse_args() connection: pika.BlockingConnection = make_amqp_connection(args.profile) + monitor = WorkflowMonitor(args.log_path) + + for event in monitor.events: + send_event_data(connection, event) diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py index 023875721..7fb68490e 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py @@ -10,12 +10,17 @@ class EventType(enum.Enum): OTHER = -1 class WorkflowEvent: - def __init__(self, job_name: str, - event_type: EventType, - timestamp: str, - log: str, - retval: int = None): + def __init__( + self, + job_name: str, + job_id: str, + event_type: EventType, + timestamp: str, + log: str, + retval: int = None + ): self.job_name: str = job_name + self.job_id: str = str(job_id) self.type: EventType = event_type self.timestamp: str = timestamp self.log: str = log -- GitLab