From 885441cd2f219b3b4f309e3374c250e5c841620d Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Mon, 31 Aug 2020 16:32:50 -0600 Subject: [PATCH] Added command-line arg support; started implementing send_event_data() --- .../wf_monitor/src/wf_monitor/monitor.py | 61 ++++++++++++++++++- 1 file changed, 59 insertions(+), 2 deletions(-) diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index e69de6d9b..00efb6191 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -4,6 +4,7 @@ import sys import time import signal import logging +import argparse import subprocess import tracemalloc from typing import List, Union, Dict @@ -11,8 +12,10 @@ from pathlib import Path import pika import pendulum +from pycapo import CapoConfig from .wf_event import WorkflowEvent, EventType +from ._version import ___version___ as VERSION WORKFLOW_STATUS_EXCH: str = 'workspaces.workflow-service.workflow-status' @@ -178,10 +181,64 @@ class WorkflowMonitor: return events - def send_event_data(self, event_metadatum: str): + def send_event_data(self, connection: pika.BlockingConnection, event_metadatum: str): """ Takes in a JSON-formatted string and sends it off to the workflow status exchange :param event_metadatum: JSON string of event metadata """ # Send data to exchange - pass \ No newline at end of file + channel: pika.channel.Channel = connection.channel() + channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True) + channel.basic_publish( + exchange=WORKFLOW_STATUS_EXCH, + # routing_key=? + ) + + +_DESCRIPTION = 'Workspaces workflow monitor, version {}. Monitor execution of a workflow from ' \ + 'the command line.' + + +def make_arg_parser() -> argparse.ArgumentParser: + """ + Initialize argument parser for command-line arguments + :return: Argument parser + """ + parser: argparse.ArgumentParser = argparse.ArgumentParser( + description=_DESCRIPTION.format(VERSION), + formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument('-P', '--profile', action='store', required=True, + help='profile name to use; e.g. test, production') + parser.add_argument('log_path', action='store', required=True, + help='path to the workflow\'s HTCondor log') + + return parser + + +def make_amqp_connection(profile: str) -> pika.BlockingConnection: + """ + Initialize connection to AMQP server + :param profile: Capo profile to use + :return: Established connection + """ + capo_cfg = CapoConfig(profile=profile) + + hostname: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') + username: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') + password: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') + + return pika.BlockingConnection( + pika.ConnectionParameters( + hostname, + credentials=pika.PlainCredentials( + username, + password + ) + ) + ) + +def main(): + # Parse command-line args + args: argparse.Namespace = make_arg_parser().parse_args() + connection: pika.BlockingConnection = make_amqp_connection(args.profile) -- GitLab