Skip to content
Snippets Groups Projects
Commit a958cf67 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Merge remote-tracking branch 'origin/SSA-6320-workflow-service' into SSA-6320-workflow-service

parents 02bdf50e 92f25127
No related branches found
No related tags found
No related merge requests found
Showing
with 120 additions and 118 deletions
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['cx_Oracle', 'pycapo', 'psycopg2', 'events', 'schema'], install_requires=['cx_Oracle', 'pycapo', 'psycopg2', 'ssa-events', 'ssa-schema'],
keywords=[], keywords=[],
packages=['alma_reingester'], packages=['alma_reingester'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -22,11 +22,11 @@ setup( ...@@ -22,11 +22,11 @@ setup(
'numpy', 'numpy',
'psycopg2', 'psycopg2',
'pycapo', 'pycapo',
'pymygdala',
'requests', 'requests',
'schema', 'ssa-pymygdala',
'sqlalchemy', 'ssa-schema',
'weblog_thumbs'], 'ssa-weblog_thumbs',
'sqlalchemy'],
keywords=[], keywords=[],
packages=['ingestion'], packages=['ingestion'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -7,9 +7,6 @@ from setuptools import setup ...@@ -7,9 +7,6 @@ from setuptools import setup
VERSION = open('src/null/_version.py').readlines()[-1].split()[-1].strip("\"'") VERSION = open('src/null/_version.py').readlines()[-1].split()[-1].strip("\"'")
README = Path('README.md').read_text() README = Path('README.md').read_text()
# requires = [
# ]
tests_require = [ tests_require = [
'pytest>=5.4,<6.0' 'pytest>=5.4,<6.0'
] ]
......
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['requests', 'pymygdala', 'pycapo', 'beautifulsoup4'], install_requires=['requests', 'ssa-pymygdala', 'pycapo', 'beautifulsoup4'],
keywords=[], keywords=[],
packages=['vlba_grabber'], packages=['vlba_grabber'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['pymygdala'], install_requires=['ssa-pymygdala'],
keywords=[], keywords=[],
packages=['weblog_thumbs'], packages=['weblog_thumbs'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['pika', 'psycopg2', 'cx_Oracle', 'pymygdala', 'pycapo', 'schema'], install_requires=['pika', 'psycopg2', 'cx_Oracle', 'ssa-pymygdala', 'pycapo', 'ssa-schema'],
keywords=[], keywords=[],
packages=['wf'], packages=['wf'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['pandas', 'requests', 'schema', 'sqlalchemy', 'tqdm'], install_requires=['pandas', 'requests', 'ssa-schema', 'sqlalchemy', 'tqdm'],
keywords=[], keywords=[],
packages=['datafinder'], packages=['datafinder'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['pendulum', 'pid', 'pycapo', 'pymygdala', 'schema', 'sqlalchemy'], install_requires=['pendulum', 'pid', 'pycapo', 'ssa-pymygdala', 'ssa-schema', 'sqlalchemy'],
keywords=[], keywords=[],
packages=['mr_books'], packages=['mr_books'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['pendulum', 'pycapo', 'pymygdala'], install_requires=['pendulum', 'pycapo', 'ssa-pymygdala'],
keywords=[], keywords=[],
packages=['mr_clean'], packages=['mr_clean'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['pymygdala', 'schema', 'support'], install_requires=['ssa-pymygdala', 'ssa-schema', 'ssa-support'],
keywords=[], keywords=[],
packages=['proprietary_setter'], packages=['proprietary_setter'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['pycapo', 'pymygdala'], install_requires=['pycapo', 'ssa-pymygdala'],
keywords=[], keywords=[],
packages=['qa_results'], packages=['qa_results'],
package_dir={'':'src'}, package_dir={'':'src'},
......
...@@ -16,7 +16,7 @@ setup( ...@@ -16,7 +16,7 @@ setup(
author_email='dms-ssa@nrao.edu', author_email='dms-ssa@nrao.edu',
url='TBD', url='TBD',
license="GPL", license="GPL",
install_requires=['pycapo', 'pymygdala', 'schema', 'sqlalchemy'], install_requires=['pycapo', 'ssa-pymygdala', 'ssa-schema', 'sqlalchemy'],
tests_require=['pytest-mock'], tests_require=['pytest-mock'],
requires=['sqlalchemy', 'mysqldb'], requires=['sqlalchemy', 'mysqldb'],
keywords=[], keywords=[],
......
...@@ -11,6 +11,10 @@ WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' ...@@ -11,6 +11,10 @@ WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
def test_read_log(): def test_read_log():
"""
Tests whether or not the example log is being correctly read by checking for strings known to
exist in the example log file
"""
test_strs = [ test_strs = [
'Image size of job updated: 432', 'Image size of job updated: 432',
'432 - ResidentSetSize of job (KB)', '432 - ResidentSetSize of job (KB)',
...@@ -22,7 +26,10 @@ def test_read_log(): ...@@ -22,7 +26,10 @@ def test_read_log():
assert s in test_monitor.log assert s in test_monitor.log
def test_read_log_timeout(capsys): def test_read_log_timeout():
"""
Tests the timeout functionality of WorkflowMonitor.read_htcondor_log()
"""
with pytest.raises(SystemExit) as sys_ex: with pytest.raises(SystemExit) as sys_ex:
WorkflowMonitor('logs/file-that-does-not-exist.txt', 1) WorkflowMonitor('logs/file-that-does-not-exist.txt', 1)
assert sys_ex.type == SystemExit assert sys_ex.type == SystemExit
...@@ -30,6 +37,10 @@ def test_read_log_timeout(capsys): ...@@ -30,6 +37,10 @@ def test_read_log_timeout(capsys):
def test_parse_log(): def test_parse_log():
"""
Tests that the example log file is correctly parsed by checking for the known order of event
types
"""
test_event_types = [ test_event_types = [
WorkflowEventType.SUBMITTED, WorkflowEventType.SUBMITTED,
WorkflowEventType.EXECUTING, WorkflowEventType.EXECUTING,
...@@ -45,12 +56,23 @@ def test_parse_log(): ...@@ -45,12 +56,23 @@ def test_parse_log():
def test_parse_log_error(): def test_parse_log_error():
"""
Tests that WorkflowMonitor.parse_log() correctly raises a ValueError when a badly formatted
log is parsed
"""
with pytest.raises(ValueError) as val_err: with pytest.raises(ValueError) as val_err:
WorkflowMonitor('logs/test.log') WorkflowMonitor('logs/test.log')
assert val_err.type == ValueError assert val_err.type == ValueError
def test_monitor_events(mocker: MockerFixture): def test_monitor_events(mocker: MockerFixture):
"""
Test that mocks the core wf_monitor functionality of creating a connection to the RabbitMQ
server, configuring a channel with that connection, and emitting events through that channel
:param mocker: Pytest fixture from pytest_mock that wraps the real function and mimics that
its been called as normal; allows for testing interaction with the server without actually
requiring a connection to the server
"""
mock_list = [ mock_list = [
'pika.adapters.blocking_connection.BlockingChannel.basic_publish', 'pika.adapters.blocking_connection.BlockingChannel.basic_publish',
'channels.amqp_helpers.make_amqp_connection', 'channels.amqp_helpers.make_amqp_connection',
......
...@@ -43,7 +43,7 @@ requires = [ ...@@ -43,7 +43,7 @@ requires = [
'pyramid_debugtoolbar', 'pyramid_debugtoolbar',
'pyramid_tm', 'pyramid_tm',
'requests', 'requests',
'schema', 'ssa-schema',
'sqlalchemy', 'sqlalchemy',
'waitress', 'waitress',
'zope.sqlalchemy' 'zope.sqlalchemy'
......
...@@ -43,10 +43,10 @@ requires = [ ...@@ -43,10 +43,10 @@ requires = [
'pyramid_debugtoolbar', 'pyramid_debugtoolbar',
'pyramid_tm', 'pyramid_tm',
'requests', 'requests',
'schema', 'ssa-schema',
'sqlalchemy', 'sqlalchemy',
'waitress', 'waitress',
'workspaces', 'ssa-workspaces',
'zope.sqlalchemy' 'zope.sqlalchemy'
] ]
......
from typing import Callable, Any from typing import Callable, Any, Optional, Union
import pika import pika
from marshmallow import Schema from marshmallow import Schema
...@@ -6,115 +6,98 @@ from pycapo import CapoConfig ...@@ -6,115 +6,98 @@ from pycapo import CapoConfig
from workspaces.json import WorkflowEventSchema from workspaces.json import WorkflowEventSchema
def make_amqp_connection(profile: str) -> pika.BlockingConnection: CONNECTION = None
""" CHANNEL = None
Initialize connection to AMQP server given a CAPO profile CONFIG = None
:param profile: Capo profile to use
:return: Established connection
"""
capo_cfg = CapoConfig(profile=profile)
hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname')
username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username')
password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password')
return pika.BlockingConnection(
pika.ConnectionParameters(
host=hostname,
credentials=pika.PlainCredentials(
username=username,
password=password
)
)
)
def configure_amqp_channel(
connection: pika.BlockingConnection,
exchange: str,
exchange_type: str
) -> pika.adapters.blocking_connection.BlockingChannel:
"""
Takes in a connection and exchange declaration parameters and creates a channel to the given
exchange, configured with the given parameters
:param connection: RabbitMQ BlockingConnection
:param exchange: Exchange name
:param exchange_type: Exchange type
:return: Channel for given connection
"""
channel = connection.channel()
channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True)
return channel
def emit_amqp_message(
channel: pika.adapters.blocking_connection.BlockingChannel,
exchange: str,
routing_key: str,
msg: bytes
):
"""
Emit AMQP message to a given exchange through a given channel
:param channel: AMQP connection channel
:param exchange: AMQP exchange name
:param routing_key: Routing key
:param msg: Message to be emitted
"""
channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg)
def receive_amqp_message(
channel: pika.adapters.blocking_connection.BlockingChannel,
exchange: str,
routing_key: str,
callback: Callable,
auto_ack: bool = False
):
"""
Establishes queue and binds it to a given channel and consumes messages matching the
routing key from given exchange
:param channel: AMQP connection channel
:param exchange: AMQP exchange name
:param routing_key: Routing key
:param callback: Callback function for when a message is consumed
:param auto_ack: If true, consumer automatically acknowledges when it consumes a message
"""
queue_result = channel.queue_declare(queue='', exclusive=True)
queue_name = queue_result.method.queue
channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack)
channel.start_consuming()
class Channel: class Channel:
def __init__(self, schema: Schema): def __init__(self, schema: Schema):
self.schema = schema self.schema = schema
def connect(self): def connect(self, **kwargs: Union[int, str]) -> pika.BlockingConnection:
# FIXME implement this """
pass Initialize connection to AMQP server given a CAPO profile
:return: Established connection
"""
global CONNECTION
global CHANNEL
global CONFIG
if not CONNECTION:
CONFIG = CapoConfig(
profile=kwargs.get('profile', None)
).settings('edu.nrao.archive.configuration.AmqpServer')
connection_parameters = pika.ConnectionParameters(
host=kwargs.get('hostname', CONFIG.hostname),
port=int(kwargs.get('port', CONFIG.port)),
# FIXME: Copied from events. Do these args need to be cast to int?
connection_attempts=kwargs.get('connection_attempts', 5),
socket_timeout=kwargs.get('socket_timeout', 5000),
retry_delay=kwargs.get('retry_delay', 500),
#
credentials=pika.PlainCredentials(
username=kwargs.get('username', CONFIG.username),
password=kwargs.get('password', CONFIG.password)
)
)
CONNECTION = pika.BlockingConnection(connection_parameters)
CHANNEL = CONNECTION.channel()
CHANNEL.exchange_declare(
# FIXME: CAPO config does not currently support 'exchange' value
exchange=kwargs.get('exchange', CONFIG.exchange),
exchange_type='topic',
durable=True,
auto_delete=False
)
def close(self): def close(self):
# FIXME implement this if CONNECTION:
pass CONNECTION.close()
def send(self, event): def send(self, event: WorkflowEventSchema):
# I don't know how to type Event there for Python but it would be
# nice if we could, somehow.
rendered = self.schema.dump(event) rendered = self.schema.dump(event)
# FIXME: transmit rendered over the underlying channel routing_key = f'{rendered["job_name"]}.{rendered["job_id"]}.{rendered["event_type"]}'
pass CHANNEL.basic_publish(routing_key=routing_key, body=str(rendered))
def listen(self, callback: Optional[Callable], pattern: str = '#', **kwargs: Union[str, bool]):
"""
Establishes queue and binds it to a given channel and consumes messages matching the
routing key from given exchange
:param callback: Callback function for when a message is consumed
:param pattern: Pattern to be used as routing key
Optional keyword arguments
:param exchange: AMQP exchange name
:param auto_ack: If true, consumer automatically acknowledges when it consumes a message
"""
exchange = kwargs.get('exchange', CONFIG.exchange)
auto_ack = kwargs.get('auto_ack', False)
def listen(self, callback: Callable[[Any], None], pattern: str = '#'):
def unwrapping_callback(message): def unwrapping_callback(message):
event = self.schema.load(message) event = self.schema.load(message)
callback(event) callback(event)
# FIXME: ask the underlying channel to call us back # FIXME: ask the underlying channel to call us back
# but only for messages matching pattern # but only for messages matching pattern
pass
def __enter__(self): queue = CHANNEL.queue_declare(queue='', exclusive=True).method.queue
self.connect() CHANNEL.queue_bind(queue=queue, exchange=exchange, routing_key=pattern)
CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack)
CHANNEL.start_consuming()
def __enter__(self, **kwargs: Union[int, str]):
"""
Keyword arguments for the AMQP connection. These do not need to be specified:
:param: hostname: Hostname to connect to
:param: port: Port to connect to
:param: connection_attempts: Number of connection attempts to try
:param: socket_timeout: Time to wait for a socket to connect
:param: retry_delay: Time to wait between retrying the connection
:param: username: Username to connect to as
:param: password: Password to use when connecting
:param: exchange: Exchange to use when connection
"""
self.connect(**kwargs)
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
self.close() self.close()
......
...@@ -15,11 +15,11 @@ class WorkflowEventSchema(Schema): ...@@ -15,11 +15,11 @@ class WorkflowEventSchema(Schema):
retval = fields.Integer(allow_none=True) retval = fields.Integer(allow_none=True)
# Enums apparently are not a first-class field type in Marshmallow # Enums apparently are not a first-class field type in Marshmallow
def get_type(self, obj: WorkflowEventType) -> int: def get_type(self, obj: WorkflowEventType) -> str:
return obj.type.value return obj.type.name.lower()
def load_type(self, value: int) -> WorkflowEventType: def load_type(self, name: str) -> WorkflowEventType:
return next(et for et in WorkflowEventType if et.value == value) return next(et for et in WorkflowEventType if et.name.lower() == name)
@post_load @post_load
def make_event(self, data, **kwargs): def make_event(self, data, **kwargs):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment