Skip to content
Snippets Groups Projects
Commit 3f884957 authored by Janet Goldstein's avatar Janet Goldstein Committed by Daniel Lyons
Browse files

WS-507: Last touches on MR333

* added documentation
* standardized names of manifest sections
* miscellaneous minor cleanup
parent d1f428c9
No related branches found
No related tags found
1 merge request!337ingestion manifest creation for EVLA CAL ingestion only
""" This is the entrypoint for ingestion launching """
import logging
import sys
from pathlib import Path
# pylint: disable=R0903
from ingest_envoy.ingestion_manifest_writer import EvlaCalIngestionManifestWriter
from ingest_envoy.utilities import ScienceProductType
......
......@@ -38,8 +38,15 @@ class EvlaCalIngestionManifestWriter:
def __init__(self, sp_type: ScienceProductType, ingest_path: Path):
self.sp_type = sp_type
self.ingest_path = ingest_path
self.input_group = self.output_group = self.infiles = None
def write_evla_cal_manifest(self, locator: str) -> Tuple[Path, List[Path]]:
"""
Write the manifest and associated ingestion files
:param locator: science product locator
:return:
"""
self.infiles = [file for file in self.ingest_path.iterdir()]
self.input_group = EvlaCalInputGroup(EvlaCalInputScienceProduct(locator))
......
""" Various 'n' sundry utilities for our tests """
# pylint: disable=E0401
# pylint: disable=E0401, R1721
from enum import Enum
from pathlib import Path
from typing import List
# pylint: disable=R1721
import pytest
from ingest_envoy.utilities import WEBLOG
......@@ -63,3 +63,13 @@ def populate_fake_ingest_path(staging_dir: Path) -> List[Path]:
files.append(path)
return files
class IngestionManifestKey(Enum):
"""Sections we expect to see in a manifest"""
INPUT_GROUP = "input_group"
OUPUT_GROUP = "output_group"
INGESTION_PATH = "ingestion_path"
SCIENCE_PRODUCTS = "science_products"
ANCILLARY_PRODUCTS = "ancillary_products"
""" Miscellaneous manifest-building tests """
import json
import logging
import shutil
import sys
from pathlib import Path
# pylint: disable=E0401, E0402
# pylint: disable=E0401, E0402, R1721
import pytest
from ingest_envoy.ingestion_manifest import IngestionManifest
from ingest_envoy.utilities import ScienceProductType
from .conftest import populate_fake_ingest_path, WANTED_FILENAMES, UNWANTED
from .conftest import populate_fake_ingest_path, WANTED_FILENAMES, UNWANTED, IngestionManifestKey
logger = logging.getLogger(IngestionManifest.__name__)
logger.setLevel(logging.INFO)
......@@ -17,7 +20,12 @@ logger.addHandler(logging.StreamHandler(sys.stdout))
def test_entry_point_for_evla_cal(ingest_path: Path):
"""
Confirm that the ingestion launcher entrypoint functions as expected.
:param ingest_path: fake tmp ingestion path
:return:
"""
populate_fake_ingest_path(ingest_path)
manifest = IngestionManifest(
ingest_path, ScienceProductType.EVLA_CAL.value, "uid://evla/calibration/meeniemyniemoe"
......@@ -27,7 +35,31 @@ def test_entry_point_for_evla_cal(ingest_path: Path):
assert manifest_file.exists()
files = [file for file in ingest_path.iterdir()]
assert len(files) == len(WANTED_FILENAMES) + len(UNWANTED) + 1
# TODO: make sure this is a real manifest, not just some fake file
# make sure manifest_file contains an IngestionManifest
with open(manifest_file, "r") as out:
manifest_content = dict(json.load(out).items())
assert len(manifest_content.keys()) >= len(IngestionManifestKey)
for key in IngestionManifestKey:
assert key.value in manifest_content.keys()
assert "EvlaCalInputGroup object" in manifest_content[IngestionManifestKey.INPUT_GROUP.value]
assert "EvlaCalOutputGroup object" in manifest_content[IngestionManifestKey.OUPUT_GROUP.value]
assert "PosixPath" in manifest_content[IngestionManifestKey.INGESTION_PATH.value]
science_products = manifest_content[IngestionManifestKey.SCIENCE_PRODUCTS.value]
assert science_products[0] == "["
assert science_products[-1] == "]"
assert "EvlaCalInputScienceProduct object" in science_products
ancillary_products = manifest_content[IngestionManifestKey.ANCILLARY_PRODUCTS.value]
assert ancillary_products[0] == "["
assert ancillary_products[-1] == "]"
assert "Weblog object" in ancillary_products
shutil.rmtree(ingest_path)
@pytest.mark.skip("TODO: test_builds_realfast_sdm_manifest")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment