-
WS-254, WS-253, WS-251: Catch ingest-complete event, parse it, and create request based on given info
WS-254, WS-253, WS-251: Catch ingest-complete event, parse it, and create request based on given info
test_get_archive_ingestion_message.py 3.21 KiB
""" Can we receive an ingest message from the archive? """
# pylint: disable=E0402, W1203
import json
import logging
import sys
from enum import Enum
from pprint import pprint
from typing import Dict
from _pytest.capture import CaptureFixture
from _pytest.logging import LogCaptureFixture
from pycapo import CapoConfig
from messaging.router import Router
# Module-level logger for this test: INFO and above, written to stdout so the
# messages appear alongside the output that pytest captures.
logger = logging.getLogger("ingest_complete_message")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
class ArchiveEventParam(Enum):
    """
    The parameters we expect to find in an archive event.

    Each member's value is the key under which that parameter appears in the
    event message. Declaration order matters: the fake callback in this module
    iterates over the enum and prints the parameters in this order.
    """

    # what is being sent
    APPLICATION = "application"
    FS_ID = "fileset_id"

    # AMQP connection details
    HOST = "host"
    PORT = "port"
    USER = "username"
    PW = "password"

    # AMQP routing details
    ROUTING_KEY = "key"
    EXCH = "exchange"
    TYPE = "type"

    # configuration profile name
    PROFILE = "profile"
def _parse_printed_params(output: str) -> Dict[str, str]:
    """
    Turn the text printed by the fake callback into a name -> value mapping.

    :param output: captured stdout; parameter lines look like ``"name": value``
    :return: dict of parameter name (quotes removed) to the printed value text
    """
    parsed = {}
    for line in output.splitlines():
        # skip blank lines and the lines holding only the opening/closing brace
        if line.strip() in ("", "{", "}"):
            continue
        # partition splits on the FIRST ": " only, so a value that itself
        # contains ": " stays intact; lines without a separator are ignored
        name, separator, value = line.partition(": ")
        if separator:
            parsed[name.strip('"')] = value
    return parsed


def test_ingest_complete(test_router: Router, caplog: LogCaptureFixture, capsys: CaptureFixture):
    """
    Hand an ingestion-complete message to the router and check that the
    callback echoes every parameter we put into it.

    :param test_router: router under test; its callbacks are swapped for the
        fake callback during the test and restored afterwards
    :param caplog: our log capturer
    :param capsys: stdout/stderr capturer used to read what the callback printed
    :raises ValueError: if any expected parameter is missing from the callback
        output or was printed with a different value
    :return:
    """
    caplog.set_level(logging.INFO)

    config = CapoConfig(profile="docker")
    amqp_settings = config.settings("edu.nrao.archive.configuration.AmqpServer")

    # the parameters the callback is expected to echo back
    args = {
        "application": "archive_event",
        "fileset_id": "13B-014.sb29151475.eb29223944.56810.442529050924",
        "host": amqp_settings.hostname,
        "port": amqp_settings.port,
        "username": amqp_settings.username,
        "password": amqp_settings.password,
        "key": "ingestion-complete.#",
        "exchange": "archive.events",
        "type": "topic",
        "profile": "docker",
    }
    # extra fields carried by the message that the callback does not echo
    event_params = {
        "message": "Ingestion complete",
        "routing_key": "ingestion-complete.#",
        "project_code": "13B-014",
    }
    # build the message as a NEW dict so that `args` keeps only the expected values
    settings = {**args, **event_params}

    orig_cbs = test_router.callbacks
    try:
        test_router.callbacks = {
            '{"type": "topic"}': [ingestion_fake_callback],
        }
        test_router.receive_message(json.dumps(settings))
        received = _parse_printed_params(capsys.readouterr().out)

        # echo the expected values so they show up in the captured output of a
        # failing run (note: this includes the AMQP password)
        pprint(args)

        # the callback prints values as text, so compare against str(expected);
        # a parameter that was never printed shows up as None and is reported too
        not_found = [
            name for name, expected in args.items() if received.get(name) != str(expected)
        ]
        if not_found:
            raise ValueError(f"Not found in message received: {not_found}")
    finally:
        # always put the router's real callbacks back, even when the test fails
        test_router.callbacks = orig_cbs
def ingestion_fake_callback(**message: object):
    """
    Pretend we're doing a callback and create the output we expect to receive.

    Prints a line with an opening brace, then one ``"<name>": <value>`` line
    for every member of ArchiveEventParam in declaration order, then a line
    with a closing brace. Values are printed as-is, without quotes.

    :param message: the message fields as keyword arguments; must contain an
        entry for every ArchiveEventParam value
    :raises KeyError: if the message lacks one of the expected parameters
    :return:
    """
    print("{")
    for param in ArchiveEventParam:
        print(f'"{param.value}": {message[param.value]}')
    print("}")