diff --git a/apps/cli/executables/alma_reingester/setup.py b/apps/cli/executables/alma_reingester/setup.py index 01d37ee293b06f9e64a6a194d5cdd9979928d25d..283276bc17c6ebd6aad3c8f2cb4040ce6ebf5720 100644 --- a/apps/cli/executables/alma_reingester/setup.py +++ b/apps/cli/executables/alma_reingester/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['cx_Oracle', 'pycapo', 'psycopg2', 'events', 'schema'], + install_requires=['cx_Oracle', 'pycapo', 'psycopg2', 'ssa-events', 'ssa-schema'], keywords=[], packages=['alma_reingester'], package_dir={'':'src'}, diff --git a/apps/cli/executables/ingestion/setup.py b/apps/cli/executables/ingestion/setup.py index 2a90e81ebb8d86dcfc97ebf09f584d01fe83635a..6979853a2fd0373b4939448e79092732562febca 100644 --- a/apps/cli/executables/ingestion/setup.py +++ b/apps/cli/executables/ingestion/setup.py @@ -22,11 +22,11 @@ setup( 'numpy', 'psycopg2', 'pycapo', - 'pymygdala', 'requests', - 'schema', - 'sqlalchemy', - 'weblog_thumbs'], + 'ssa-pymygdala', + 'ssa-schema', + 'ssa-weblog_thumbs', + 'sqlalchemy'], keywords=[], packages=['ingestion'], package_dir={'':'src'}, diff --git a/apps/cli/executables/null/setup.py b/apps/cli/executables/null/setup.py index c7d3294077322e07c1726b495a16a006a10b0aa9..3399db9f1e7f7c075ea47f44ce6436478e139f3d 100644 --- a/apps/cli/executables/null/setup.py +++ b/apps/cli/executables/null/setup.py @@ -7,9 +7,6 @@ from setuptools import setup VERSION = open('src/null/_version.py').readlines()[-1].split()[-1].strip("\"'") README = Path('README.md').read_text() -# requires = [ -# ] - tests_require = [ 'pytest>=5.4,<6.0' ] diff --git a/apps/cli/executables/vlba_grabber/setup.py b/apps/cli/executables/vlba_grabber/setup.py index f6b5f801e1084a076ec1a8e24966a51ce47572fc..5ec7c3aea260f4c4b52621701e6308e98ad9a8ab 100644 --- a/apps/cli/executables/vlba_grabber/setup.py +++ b/apps/cli/executables/vlba_grabber/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['requests', 'pymygdala', 'pycapo', 'beautifulsoup4'], + install_requires=['requests', 'ssa-pymygdala', 'pycapo', 'beautifulsoup4'], keywords=[], packages=['vlba_grabber'], package_dir={'':'src'}, diff --git a/apps/cli/executables/weblog_thumbs/setup.py b/apps/cli/executables/weblog_thumbs/setup.py index 1c2d896c13539c61e033f84d92cd03501d4bad09..2f83979e0de2df2d25fc1cba4f0a744d3793325e 100644 --- a/apps/cli/executables/weblog_thumbs/setup.py +++ b/apps/cli/executables/weblog_thumbs/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['pymygdala'], + install_requires=['ssa-pymygdala'], keywords=[], packages=['weblog_thumbs'], package_dir={'':'src'}, diff --git a/apps/cli/launchers/wf/setup.py b/apps/cli/launchers/wf/setup.py index df4b7819d8899382177e9ed38860de1a81cfc289..792132cba6bad78f5f15ea187bc72ceba22baf65 100644 --- a/apps/cli/launchers/wf/setup.py +++ b/apps/cli/launchers/wf/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['pika', 'psycopg2', 'cx_Oracle', 'pymygdala', 'pycapo', 'schema'], + install_requires=['pika', 'psycopg2', 'cx_Oracle', 'ssa-pymygdala', 'pycapo', 'ssa-schema'], keywords=[], packages=['wf'], package_dir={'':'src'}, diff --git a/apps/cli/utilities/datafinder/setup.py b/apps/cli/utilities/datafinder/setup.py index 0c527a77ede638b86532a7429b29dc1501ca213f..0f7072833a9230e799cb5e2099bbb6699dc55891 100644 --- a/apps/cli/utilities/datafinder/setup.py +++ b/apps/cli/utilities/datafinder/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['pandas', 'requests', 'schema', 'sqlalchemy', 'tqdm'], + install_requires=['pandas', 'requests', 'ssa-schema', 'sqlalchemy', 'tqdm'], keywords=[], packages=['datafinder'], package_dir={'':'src'}, diff --git a/apps/cli/utilities/mr_books/setup.py b/apps/cli/utilities/mr_books/setup.py index ab8b97f85b4d64d2089ea59ff915ae2078ab5516..6fc448a3a46e0825ee1e0137cab6fe84f8e210d3 100644 --- a/apps/cli/utilities/mr_books/setup.py +++ b/apps/cli/utilities/mr_books/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['pendulum', 'pid', 'pycapo', 'pymygdala', 'schema', 'sqlalchemy'], + install_requires=['pendulum', 'pid', 'pycapo', 'ssa-pymygdala', 'ssa-schema', 'sqlalchemy'], keywords=[], packages=['mr_books'], package_dir={'':'src'}, diff --git a/apps/cli/utilities/mr_clean/setup.py b/apps/cli/utilities/mr_clean/setup.py index 65945b49271e0fdc9fa175d227833cf93e218c43..80b09c427ae187111e32ee50962c6675f1086771 100644 --- a/apps/cli/utilities/mr_clean/setup.py +++ b/apps/cli/utilities/mr_clean/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['pendulum', 'pycapo', 'pymygdala'], + install_requires=['pendulum', 'pycapo', 'ssa-pymygdala'], keywords=[], packages=['mr_clean'], package_dir={'':'src'}, diff --git a/apps/cli/utilities/proprietary_setter/setup.py b/apps/cli/utilities/proprietary_setter/setup.py index 66397d214725cf34116821fba63d2a956ed63c0f..df171e73cae2fd6900b384143e09cbbf941e65b7 100644 --- a/apps/cli/utilities/proprietary_setter/setup.py +++ b/apps/cli/utilities/proprietary_setter/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['pymygdala', 'schema', 'support'], + install_requires=['ssa-pymygdala', 'ssa-schema', 'ssa-support'], keywords=[], packages=['proprietary_setter'], package_dir={'':'src'}, diff --git a/apps/cli/utilities/qa_results/setup.py b/apps/cli/utilities/qa_results/setup.py index 1750c6d10b122e7f35417e75d675e72e2170f610..d6b41136c179fee50c562978e67b9777b11c37b4 100644 --- a/apps/cli/utilities/qa_results/setup.py +++ b/apps/cli/utilities/qa_results/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['pycapo', 'pymygdala'], + install_requires=['pycapo', 'ssa-pymygdala'], keywords=[], packages=['qa_results'], package_dir={'':'src'}, diff --git a/apps/cli/utilities/s_code_project_updater/setup.py b/apps/cli/utilities/s_code_project_updater/setup.py index 9aa45572a805d200f2ef671897bc0bfc94633a6a..0b4ce307ed2ab12b92cba25b00a1324d663b4815 100644 --- a/apps/cli/utilities/s_code_project_updater/setup.py +++ b/apps/cli/utilities/s_code_project_updater/setup.py @@ -16,7 +16,7 @@ setup( author_email='dms-ssa@nrao.edu', url='TBD', license="GPL", - install_requires=['pycapo', 'pymygdala', 'schema', 'sqlalchemy'], + install_requires=['pycapo', 'ssa-pymygdala', 'ssa-schema', 'sqlalchemy'], tests_require=['pytest-mock'], requires=['sqlalchemy', 'mysqldb'], keywords=[], diff --git a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py index 7f0cf4e95f759e937f9389be4d51abdc52cf0b36..f0f13d281771d4d67e95f828db92366489e7abb4 100644 --- a/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py +++ b/apps/cli/utilities/wf_monitor/test/test_wf_monitor.py @@ -11,6 +11,10 @@ WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' def test_read_log(): + """ + Tests whether or not the example log is being correctly read by checking for strings known to + exist in the example log file + """ test_strs = [ 'Image size of job updated: 432', '432 - ResidentSetSize of job (KB)', @@ -22,7 +26,10 @@ def test_read_log(): assert s in test_monitor.log -def test_read_log_timeout(capsys): +def test_read_log_timeout(): + """ + Tests the timeout functionality of WorkflowMonitor.read_htcondor_log() + """ with pytest.raises(SystemExit) as sys_ex: WorkflowMonitor('logs/file-that-does-not-exist.txt', 1) assert sys_ex.type == SystemExit @@ -30,6 +37,10 @@ def test_read_log_timeout(capsys): def test_parse_log(): + """ + Tests that the example log file is correctly parsed by checking for the known order of event + types + """ test_event_types = [ WorkflowEventType.SUBMITTED, WorkflowEventType.EXECUTING, @@ -45,12 +56,23 @@ def test_parse_log(): def test_parse_log_error(): + """ + Tests that WorkflowMonitor.parse_log() correctly raises a ValueError when a badly formatted + log is parsed + """ with pytest.raises(ValueError) as val_err: WorkflowMonitor('logs/test.log') assert val_err.type == ValueError def test_monitor_events(mocker: MockerFixture): + """ + Test that mocks the core wf_monitor functionality of creating a connection to the RabbitMQ + server, configuring a channel with that connection, and emitting events through that channel + :param mocker: Pytest fixture from pytest_mock that wraps the real function and mimics that + its been called as normal; allows for testing interaction with the server without actually + requiring a connection to the server + """ mock_list = [ 'pika.adapters.blocking_connection.BlockingChannel.basic_publish', 'channels.amqp_helpers.make_amqp_connection', diff --git a/services/archive/setup.py b/services/archive/setup.py index f4bdad5d46b61ee1aeade38379984e804fa62ee9..c11f2d3542b222ec95312dfc749e6363bd36ea28 100644 --- a/services/archive/setup.py +++ b/services/archive/setup.py @@ -43,7 +43,7 @@ requires = [ 'pyramid_debugtoolbar', 'pyramid_tm', 'requests', - 'schema', + 'ssa-schema', 'sqlalchemy', 'waitress', 'zope.sqlalchemy' diff --git a/services/workflow/setup.py b/services/workflow/setup.py index 71f6c4bb0943be573fed881cd04f989cb76865fb..2d6903af4d7ae9b574a764a9da31193060abc997 100644 --- a/services/workflow/setup.py +++ b/services/workflow/setup.py @@ -43,10 +43,10 @@ requires = [ 'pyramid_debugtoolbar', 'pyramid_tm', 'requests', - 'schema', + 'ssa-schema', 'sqlalchemy', 'waitress', - 'workspaces', + 'ssa-workspaces', 'zope.sqlalchemy' ] diff --git a/shared/channels/src/channels/amqp_helpers.py b/shared/channels/src/channels/amqp_helpers.py index 4b9cf84820748c3042bfe66c4975666ffb7334ba..3b509e32f310600520ba49a14bb1bf057d1dbf40 100644 --- a/shared/channels/src/channels/amqp_helpers.py +++ b/shared/channels/src/channels/amqp_helpers.py @@ -1,4 +1,4 @@ -from typing import Callable, Any +from typing import Callable, Any, Optional, Union import pika from marshmallow import Schema @@ -6,115 +6,98 @@ from pycapo import CapoConfig from workspaces.json import WorkflowEventSchema -def make_amqp_connection(profile: str) -> pika.BlockingConnection: - """ - Initialize connection to AMQP server given a CAPO profile - :param profile: Capo profile to use - :return: Established connection - """ - capo_cfg = CapoConfig(profile=profile) - - hostname = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.hostname') - username = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.username') - password = capo_cfg.getstring('edu.nrao.archive.configuration.AmqpServer.password') - - return pika.BlockingConnection( - pika.ConnectionParameters( - host=hostname, - credentials=pika.PlainCredentials( - username=username, - password=password - ) - ) - ) - - -def configure_amqp_channel( - connection: pika.BlockingConnection, - exchange: str, - exchange_type: str -) -> pika.adapters.blocking_connection.BlockingChannel: - """ - Takes in a connection and exchange declaration parameters and creates a channel to the given - exchange, configured with the given parameters - :param connection: RabbitMQ BlockingConnection - :param exchange: Exchange name - :param exchange_type: Exchange type - :return: Channel for given connection - """ - channel = connection.channel() - channel.exchange_declare(exchange=exchange, exchange_type=exchange_type, durable=True) - return channel - - -def emit_amqp_message( - channel: pika.adapters.blocking_connection.BlockingChannel, - exchange: str, - routing_key: str, - msg: bytes -): - """ - Emit AMQP message to a given exchange through a given channel - :param channel: AMQP connection channel - :param exchange: AMQP exchange name - :param routing_key: Routing key - :param msg: Message to be emitted - """ - channel.basic_publish(exchange=exchange, routing_key=routing_key, body=msg) - - -def receive_amqp_message( - channel: pika.adapters.blocking_connection.BlockingChannel, - exchange: str, - routing_key: str, - callback: Callable, - auto_ack: bool = False -): - """ - Establishes queue and binds it to a given channel and consumes messages matching the - routing key from given exchange - :param channel: AMQP connection channel - :param exchange: AMQP exchange name - :param routing_key: Routing key - :param callback: Callback function for when a message is consumed - :param auto_ack: If true, consumer automatically acknowledges when it consumes a message - """ - queue_result = channel.queue_declare(queue='', exclusive=True) - queue_name = queue_result.method.queue - channel.queue_bind(queue=queue_name, exchange=exchange, routing_key=routing_key) - channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=auto_ack) - channel.start_consuming() +CONNECTION = None +CHANNEL = None +CONFIG = None class Channel: def __init__(self, schema: Schema): self.schema = schema - def connect(self): - # FIXME implement this - pass + def connect(self, **kwargs: Union[int, str]) -> pika.BlockingConnection: + """ + Initialize connection to AMQP server given a CAPO profile + :return: Established connection + """ + global CONNECTION + global CHANNEL + global CONFIG + + if not CONNECTION: + CONFIG = CapoConfig( + profile=kwargs.get('profile', None) + ).settings('edu.nrao.archive.configuration.AmqpServer') + + connection_parameters = pika.ConnectionParameters( + host=kwargs.get('hostname', CONFIG.hostname), + port=int(kwargs.get('port', CONFIG.port)), + # FIXME: Copied from events. Do these args need to be cast to int? + connection_attempts=kwargs.get('connection_attempts', 5), + socket_timeout=kwargs.get('socket_timeout', 5000), + retry_delay=kwargs.get('retry_delay', 500), + # + credentials=pika.PlainCredentials( + username=kwargs.get('username', CONFIG.username), + password=kwargs.get('password', CONFIG.password) + ) + ) + CONNECTION = pika.BlockingConnection(connection_parameters) + CHANNEL = CONNECTION.channel() + CHANNEL.exchange_declare( + # FIXME: CAPO config does not currently support 'exchange' value + exchange=kwargs.get('exchange', CONFIG.exchange), + exchange_type='topic', + durable=True, + auto_delete=False + ) def close(self): - # FIXME implement this - pass + if CONNECTION: + CONNECTION.close() - def send(self, event): - # I don't know how to type Event there for Python but it would be - # nice if we could, somehow. + def send(self, event: WorkflowEventSchema): rendered = self.schema.dump(event) - # FIXME: transmit rendered over the underlying channel - pass + routing_key = f'{rendered["job_name"]}.{rendered["job_id"]}.{rendered["event_type"]}' + CHANNEL.basic_publish(routing_key=routing_key, body=str(rendered)) + + def listen(self, callback: Optional[Callable], pattern: str = '#', **kwargs: Union[str, bool]): + """ + Establishes queue and binds it to a given channel and consumes messages matching the + routing key from given exchange + :param callback: Callback function for when a message is consumed + :param pattern: Pattern to be used as routing key + Optional keyword arguments + :param exchange: AMQP exchange name + :param auto_ack: If true, consumer automatically acknowledges when it consumes a message + """ + exchange = kwargs.get('exchange', CONFIG.exchange) + auto_ack = kwargs.get('auto_ack', False) - def listen(self, callback: Callable[[Any], None], pattern: str = '#'): def unwrapping_callback(message): event = self.schema.load(message) callback(event) # FIXME: ask the underlying channel to call us back # but only for messages matching pattern - pass - def __enter__(self): - self.connect() + queue = CHANNEL.queue_declare(queue='', exclusive=True).method.queue + CHANNEL.queue_bind(queue=queue, exchange=exchange, routing_key=pattern) + CHANNEL.basic_consume(queue=queue, on_message_callback=callback, auto_ack=auto_ack) + CHANNEL.start_consuming() + + def __enter__(self, **kwargs: Union[int, str]): + """ + Keyword arguments for the AMQP connection. These do not need to be specified: + :param: hostname: Hostname to connect to + :param: port: Port to connect to + :param: connection_attempts: Number of connection attempts to try + :param: socket_timeout: Time to wait for a socket to connect + :param: retry_delay: Time to wait between retrying the connection + :param: username: Username to connect to as + :param: password: Password to use when connecting + :param: exchange: Exchange to use when connection + """ + self.connect(**kwargs) def __exit__(self, exc_type, exc_val, exc_tb): self.close() diff --git a/shared/workspaces/src/workspaces/json.py b/shared/workspaces/src/workspaces/json.py index cdb72af2d4543d3552e1abefbae82f6d0067958b..0244672cbe09d14d19502afda10410a0533b9d76 100644 --- a/shared/workspaces/src/workspaces/json.py +++ b/shared/workspaces/src/workspaces/json.py @@ -15,11 +15,11 @@ class WorkflowEventSchema(Schema): retval = fields.Integer(allow_none=True) # Enums apparently are not a first-class field type in Marshmallow - def get_type(self, obj: WorkflowEventType) -> int: - return obj.type.value + def get_type(self, obj: WorkflowEventType) -> str: + return obj.type.name.lower() - def load_type(self, value: int) -> WorkflowEventType: - return next(et for et in WorkflowEventType if et.value == value) + def load_type(self, name: str) -> WorkflowEventType: + return next(et for et in WorkflowEventType if et.name.lower() == name) @post_load def make_event(self, data, **kwargs):