# Skip to content
# Snippets Groups Projects
# test_get_archive_ingestion_message.py 3.21 KiB
""" Can we receive an ingest message from the archive?  """

# pylint: disable=E0402, W1203
import json
import logging
import sys
from enum import Enum
from pprint import pprint
from typing import Dict

from _pytest.capture import CaptureFixture
from _pytest.logging import LogCaptureFixture
from pycapo import CapoConfig

from messaging.router import Router

# Dedicated, explicitly named logger for this test module. Records at INFO
# and above are written to the stream that is sys.stdout at import time.
logger = logging.getLogger("ingest_complete_message")
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
logger.setLevel(level=logging.INFO)


class ArchiveEventParam(Enum):
    """The parameters we expect to find in an archive event

    Each member's value is the key under which that parameter is looked up in
    the message. ingestion_fake_callback iterates the members in declaration
    order, so reordering them changes the order of the lines it prints.
    """

    # what is being reported; this file's test sends "archive_event" for the
    # application and an execution-block style name for the fileset id
    APPLICATION = "application"
    FS_ID = "fileset_id"
    # connection details; this file's test fills these from the Capo
    # "edu.nrao.archive.configuration.AmqpServer" settings
    HOST = "host"
    PORT = "port"
    USER = "username"
    PW = "password"
    # presumably the AMQP binding key, exchange name and exchange type
    # (the test uses "ingestion-complete.#", "archive.events", "topic") --
    # TODO confirm against messaging.router
    ROUTING_KEY = "key"
    EXCH = "exchange"
    TYPE = "type"
    # Capo profile name ("docker" in this file's test)
    PROFILE = "profile"


def _parse_callback_output(captured: str) -> set:
    """
    Collect the `"key": value` lines printed by the fake callback.

    :param captured: everything written to stdout while the message was handled
    :return: set of (key, value) string pairs, with the quotes around each key removed
    """
    pairs = set()
    for line in captured.split("\n"):
        # partition never raises: the brace lines, blank lines and any stray
        # output without a ": " separator are skipped, and a value that
        # itself contains ": " is kept whole
        rkey, sep, rval = line.partition(": ")
        if sep:
            pairs.add((rkey.replace('"', ""), rval))
    return pairs


def test_ingest_complete(test_router: Router, caplog: LogCaptureFixture, capsys: CaptureFixture):
    """
    Try to capture ingest-complete message from archive.

    Builds an archive-event message from the "docker" Capo profile's AMQP
    settings, hands it to the router with the router's callbacks temporarily
    replaced by ingestion_fake_callback, and checks that every connection
    argument shows up, with the same value, in what the callback printed.

    :param test_router: the router under test; its callbacks are swapped for
        the duration of the test and always restored afterwards
    :param caplog: our log capturer
    :param capsys: captures stdout so the callback's output can be inspected

    :raises ValueError: if any expected argument is missing from the output
    :return:
    """

    caplog.set_level(logging.INFO)

    config = CapoConfig(profile="docker")
    amqp_settings = config.settings("edu.nrao.archive.configuration.AmqpServer")

    # the arguments whose round trip through the router we verify
    args = {
        "application": "archive_event",
        "fileset_id": "13B-014.sb29151475.eb29223944.56810.442529050924",
        "host": amqp_settings.hostname,
        "port": amqp_settings.port,
        "username": amqp_settings.username,
        "password": amqp_settings.password,
        "key": "ingestion-complete.#",
        "exchange": "archive.events",
        "type": "topic",
        "profile": "docker",
    }

    # extra fields that travel with the message but are not verified
    event_params = {
        "message": "Ingestion complete",
        "routing_key": "ingestion-complete.#",
        "project_code": "13B-014",
    }

    # merge into a new dict rather than aliasing `args`, so `args` remains
    # exactly the set of things checked below
    settings = {**args, **event_params}

    orig_cbs = test_router.callbacks

    try:
        test_router.callbacks = {
            '{"type": "topic"}': [ingestion_fake_callback],
        }
        test_router.receive_message(json.dumps(settings))

        # read the capture before printing anything ourselves
        received = _parse_callback_output(capsys.readouterr().out)

        pprint(settings)

        # the callback formats each value into text, so compare against the
        # string form; a non-string setting (e.g. an integer port) would
        # otherwise never match
        not_found = [
            skey
            for skey, sval in args.items()
            if skey not in event_params and (skey, str(sval)) not in received
        ]

        if not_found:
            raise ValueError(f"Not found in message received: {not_found}")

    finally:
        # leave the shared router fixture as we found it, pass or fail
        test_router.callbacks = orig_cbs


def ingestion_fake_callback(**message: Dict):
    """
    Stand-in callback: echo the archive-event fields of a message to stdout.

    Prints an opening brace, then one `"<name>": <value>` line for each
    ArchiveEventParam member in declaration order, then a closing brace.

    :param message: the message fields as keyword arguments; a KeyError is
        raised if any ArchiveEventParam value is missing from them
    :return:
    """

    print("{")
    for param in ArchiveEventParam:
        name = param.value
        value = message[name]
        print(f'"{name}": {value}')
    print("}")