Skip to content
Snippets Groups Projects
Commit 885441cd authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Added command-line arg support; started implementing send_event_data()

parent 59faedbf
No related branches found
No related tags found
No related merge requests found
......@@ -4,6 +4,7 @@ import sys
import time
import signal
import logging
import argparse
import subprocess
import tracemalloc
from typing import List, Union, Dict
......@@ -11,8 +12,10 @@ from pathlib import Path
import pika
import pendulum
from pycapo import CapoConfig
from .wf_event import WorkflowEvent, EventType
from ._version import ___version___ as VERSION
WORKFLOW_STATUS_EXCH: str = 'workspaces.workflow-service.workflow-status'
......@@ -178,10 +181,64 @@ class WorkflowMonitor:
return events
def send_event_data(self, event_metadatum: str):
def send_event_data(self, connection: pika.BlockingConnection, event_metadatum: str):
"""
Takes in a JSON-formatted string and sends it off to the workflow status exchange
:param event_metadatum: JSON string of event metadata
"""
# Send data to exchange
pass
\ No newline at end of file
channel: pika.channel.Channel = connection.channel()
channel.exchange_declare(WORKFLOW_STATUS_EXCH, 'topic', durable=True)
channel.basic_publish(
exchange=WORKFLOW_STATUS_EXCH,
# routing_key=?
)
_DESCRIPTION = 'Workspaces workflow monitor, version {}. Monitor execution of a workflow from ' \
'the command line.'
def make_arg_parser() -> argparse.ArgumentParser:
"""
Initialize argument parser for command-line arguments
:return: Argument parser
"""
parser: argparse.ArgumentParser = argparse.ArgumentParser(
description=_DESCRIPTION.format(VERSION),
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument('-P', '--profile', action='store', required=True,
help='profile name to use; e.g. test, production')
parser.add_argument('log_path', action='store', required=True,
help='path to the workflow\'s HTCondor log')
return parser
def make_amqp_connection(profile: str) -> pika.BlockingConnection:
"""
Initialize connection to AMQP server
:param profile: Capo profile to use
:return: Established connection
"""
capo_cfg = CapoConfig(profile=profile)
hostname: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname')
username: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username')
password: str = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password')
return pika.BlockingConnection(
pika.ConnectionParameters(
hostname,
credentials=pika.PlainCredentials(
username,
password
)
)
)
def main():
# Parse command-line args
args: argparse.Namespace = make_arg_parser().parse_args()
connection: pika.BlockingConnection = make_amqp_connection(args.profile)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment