diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py index fee031d5de309aa24fa63c94f947be77aab1f79d..b187e89191d5f2db6eb3a31020558c539f6ce3c6 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py @@ -1,26 +1,330 @@ """ This is the entrypoint for ingestion launching """ - +import abc +import json import logging +import re import sys +import tarfile from pathlib import Path -# pylint: disable=R0903 +# pylint: disable=E0401, R0903, R1721 +from typing import Tuple, List + +import pendulum +from pendulum import DateTime -from ingest_envoy.ingestion_manifest_writer import EvlaCalIngestionManifestWriter -from ingest_envoy.utilities import ScienceProductType +from ingest_envoy.manifest_components import ( + MANIFEST_NAME_BASE, + MANIFEST_NAME_EXT, + ARTIFACT_NAME, + ARTIFACT_EXT, + WEBLOG, + JSON, + IngestionManifestKey, + ManifestComponentIF, + InputScienceProduct, + InputGroup, + ManifestParameters, + OutputScienceProduct, + AncillaryProduct, + OutputGroup, + SCIENCE_PRODUCT_PATTERN, +) +from ingest_envoy.utilities import ( + ScienceProductType, + Telescope, + IngestionManifestException, + AncillaryProductType, + find_output_science_products, +) logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) logger.addHandler(logging.StreamHandler(sys.stdout)) +# pylint: disable=R0902, R0913 -class IngestionManifest: - """needed for ingestion-launching interface""" - def __init__(self, staging_source_dir: str, ingestion_type: str, locator: str): - self.ingest_path = Path(staging_source_dir) - self.sp_type = ScienceProductType.from_str(ingestion_type) +class ManifestIF(ManifestComponentIF): + """Interface for all ingestion manifests""" + + def __init__( + self, + telescope: Telescope, + sp_type: ScienceProductType, + staging_source_dir: Path, + locator: str, + # all except EVLA_EB and VLASS catalog manifest have input group + input_group: InputGroup, + # every manifest has at least one output group + output_group: OutputGroup, + ): + self.staging_source_dir = staging_source_dir + self.sp_type = sp_type self.locator = locator + self.input_group = input_group + self.output_group = output_group + self.telescope = telescope + + self.parameters = self.build_ingest_parameters() + + self.files_found = [file for file in self.staging_source_dir.iterdir()] + + @abc.abstractmethod + def create(self): + """ + Build and write the manifest, which includes gathering various items in + ingestion_path to get info for the manifest. + + :return: + """ + + @abc.abstractmethod + def write(self): + """ + Write this manifest to a file, along with the artifacts tar and any other files required + for this type of ingestion, at the ingest_path + + :param: location of files to be ingested, which is where we'll put the manifest + :return: + """ + raise NotImplementedError + + def __eq__(self, other): + if isinstance(other, IngestionManifest): + return other.input_group == self.input_group and other.output_group == self.output_group + + return False + + @abc.abstractmethod + def to_json(self) -> JSON: + """ + Turn this object into a JSON string suitable for writing to a file + + :return: + """ + raise NotImplementedError + + +class IngestionManifestBuilder: + """Builds ingestion manifest and associated files from files in ingestion_path""" + + def __init__( + self, + staging_source_dir: Path, + sp_type: ScienceProductType, + locator: str, + telescope: Telescope, + ): + self.telescope = telescope + self.staging_source_dir = staging_source_dir + self.sp_type = sp_type + self.locator = locator + self.files_found = [file for file in staging_source_dir.iterdir()] + if len(self.files_found) == 0: + raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}") + + def build(self) -> Tuple[ManifestIF, Path]: + """ + Using only -relevant- files in ingestion_path, write the manifest + and produce other files required for ingestion. + + :return: the ingestion manifest and the file containing its JSON + """ + + # # create any other ingestion files needed for this type of ingestion + # self._find_additional_ingestion_files() + + # create the manifest + manifest = IngestionManifest( + telescope=self.telescope, + locator=self.locator, + sp_type=self.sp_type, + staging_source_dir=self.staging_source_dir, + input_group=self._build_input_group(), + output_group=self._build_output_group(), + ) + + manifest_file = manifest.write() + + self.write_ingestion_artifacts_tar() + + return manifest, manifest_file + + def _build_input_group(self): + """ + Create the input group using the parameters. + + :return: + """ + + # N.B. this is sufficient for most types of ingestion, + # but ALMA CALs will have multiple EB SPs, identified only by locator, + # and VLBAs have no input group at all. + sp_in = InputScienceProduct(sp_type=self.sp_type, locator=self.locator) + + return InputGroup([sp_in]) + + def _define_output_science_products(self): + sp_files = find_output_science_products(self.files_found, self.staging_source_dir) + sps_out = [OutputScienceProduct(self.sp_type, file.name) for file in sp_files] + return sps_out + + def _build_output_group(self) -> OutputGroup: + """ + Create the output group using the parameters. + + :return: + """ + + # find ancillary products, if any + ancillary_products = self._find_ancillary_products() + + tar_filename = self.build_artifacts_filename() + artifacts_ap = AncillaryProduct( + type=AncillaryProductType.PIPELINE_ARTIFACTS, filename=tar_filename + ) + ancillary_products.append(artifacts_ap) + + return OutputGroup(self._define_output_science_products(), ancillary_products) + + @staticmethod + def build_artifacts_filename() -> str: + """ + Build unique manifest filename in standard format. + + :return: the filename + """ + current_time = pendulum.now() + timestamp = format_timestamp(current_time) + return f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}" + + def write_ingestion_artifacts_tar(self) -> Path: + """ + Take the list of files and build a tar for inclusion into the archive. + This happens in the staging area for ingestion. + The EVLA CAL tar will contain just the manifest. + + :return: a .tar archive of the ingestion artifacts + """ + ingestion_files = [file for file in self.staging_source_dir.iterdir() if file.is_file] + manifest_file = find_manifest(self.staging_source_dir) + ing_tar = self.staging_source_dir / self.build_artifacts_filename() + with tarfile.open(ing_tar, "w") as ingestion_artifacts_tar: + for file in ingestion_files: + ingestion_artifacts_tar.add(file) + + # include the manifest + if manifest_file not in ingestion_files: + ingestion_artifacts_tar.add(manifest_file) + + return ing_tar + + def _find_ancillary_products(self) -> List[AncillaryProduct]: + """ + Round up any ancillary files found in ingestion path + + :return: ancillary product(s) found + """ + + ancillary_products = [] + # if there's a weblog in here, grab it + maybe_weblogs = [file for file in self.files_found if file.name.endswith(WEBLOG)] + if len(maybe_weblogs) > 0: + weblog = maybe_weblogs[0] + weblog_ap = AncillaryProduct( + type=AncillaryProductType.PIPELINE_WEBLOG, filename=weblog.name + ) + ancillary_products.append(weblog_ap) + + more_aps = self._find_additional_ingestion_files() + if len(more_aps) > 0: + ancillary_products.append(more_aps) + + return ancillary_products + + def _find_additional_ingestion_files(self) -> List[Path]: + """ + Round up any other necessary ingestion file(s) + + :return: additional relevant files found in ingestion path, if any + """ + + if self.sp_type == ScienceProductType.EVLA_CAL: + # there won't be any others + return [] + + # TODO when the time comes: we'll have extra information for other ingestion types; + # see archive-metaproject + raise NotImplementedError + + +class IngestionManifest(ManifestIF): + """needed for ingestion-launching interface""" + + def build_ingest_parameters(self): + """ + Make the "parameters" section of the manifest + + :return: + """ + if self.sp_type != ScienceProductType.EVLA_CAL: + raise NotImplementedError() + + return ManifestParameters( + telescope=self.telescope, + reingest=False, + ngas_ingest=False, + calibrate=False, + staging_source_dir=self.staging_source_dir, + ) + + def _build_input_group(self): + """ + Create the input group using the parameters. + :return: + """ + # N.B. this is sufficient for most types of ingestion, + # but ALMA CALs will have multiple EB SPs, identified only by locator, + # and VLBAs have no input group at all. + sp_in = InputScienceProduct(sp_type=self.sp_type, locator=self.locator) + return InputGroup([sp_in]) + + def _build_output_group(self) -> OutputGroup: + """ + Create the output group using the parameters. + :return: + """ + sp_tar = self._find_science_product_tar() + find_output_science_products(self.files_found, self.staging_source_dir) + sps_out = [OutputScienceProduct(self.sp_type, sp_tar.name)] + + # find ancillary products, if any + ancillary_products = self._find_ancillary_products() + weblog = Path(self.ingestion_path / WEBLOG) + if weblog.exists(): + ancillary_products.append(AncillaryProduct(type=WEBLOG, filename=str(weblog))) + + return OutputGroup(sps_out) + + # @property + def ingestion_path(self) -> Path: + return self.parameters.ingestion_path + + def write(self) -> Path: + """ + Write the manifest .json file. + + :return: + """ + + output_path = self.staging_source_dir / build_manifest_filename() + + to_write = json.dumps(self.to_json(), indent=4) + with open(output_path, "w") as out: + out.write(to_write) + + return output_path def create(self): """ @@ -31,9 +335,85 @@ class IngestionManifest: """ if self.sp_type != ScienceProductType.EVLA_CAL: - return NotImplementedError( - f"Don't yet know how to handle {self.sp_type.value} science product" + raise NotImplementedError( + f"Don't yet know how to handle {self.sp_type.value} ingestion" ) - writer = EvlaCalIngestionManifestWriter(self.ingest_path) - writer.write_evla_cal_manifest(self.locator) + builder = IngestionManifestBuilder( + staging_source_dir=Path(self.staging_source_dir), + sp_type=self.sp_type, + locator=self.locator, + telescope=self.telescope, + ) + builder.build() + + def to_json(self) -> JSON: + """ + Turn this object into a JSON string suitable for writing to a file + + :return: + """ + + to_return = dict(self.__dict__) + + return { + "locator": to_return["locator"], + IngestionManifestKey.PARAMETERS.value: self.build_ingest_parameters().to_json(), + IngestionManifestKey.INGESTION_PATH.value: str(self.ingestion_path), + IngestionManifestKey.INPUT_GROUP.value: to_return[ + IngestionManifestKey.INPUT_GROUP.value + ].to_json(), + IngestionManifestKey.OUTPUT_GROUP.value: to_return[ + IngestionManifestKey.OUTPUT_GROUP.value + ].to_json(), + } + + def _find_science_product_tar(self) -> Path: + """ + A calibration ingestion staging dir should have ONE science product tar; ignore any others + + :return: + """ + files = [file for file in self.staging_source_dir.iterdir() if file.is_file] + for file in files: + if re.match(SCIENCE_PRODUCT_PATTERN, file.name): + return file + + raise FileNotFoundError(f"no science product found at {self.staging_source_dir}") + + +def format_timestamp(datetime: DateTime) -> str: + """ + Format the current time as + 2021_07_01T13_49_17.237 + + :param datetime: current timestamp + :return: timestamp suitable for ingestion manifest filename + """ + + return datetime.format("YYYY_MM_DDThh_mm_ss.SSS") + + +def build_manifest_filename() -> str: + """ + Build unique manifest filename in standard format. + + :return: the filename + """ + current_time = pendulum.now() + timestamp = format_timestamp(current_time) + return f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}" + + +def find_manifest(ingestion_path: Path) -> Path: + """ + Find the ingestion manifest at this ingestion path. + + :param ingestion_path: home of ingestion files + :return: + """ + for file in ingestion_path.iterdir(): + if file.name.startswith(MANIFEST_NAME_BASE) and file.name.endswith(MANIFEST_NAME_EXT): + return file + + raise FileNotFoundError(f"No ingestion manifest found at {ingestion_path}") diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest_writer.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest_writer.py deleted file mode 100644 index 0329aa15db9b8639e8dfad3a791ffa1d23ab7540..0000000000000000000000000000000000000000 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest_writer.py +++ /dev/null @@ -1,207 +0,0 @@ -"""Build an ingestion manifest file""" -import json -import logging -import re -import sys -import tarfile -from pathlib import Path -from typing import List, Tuple, Dict - -# pylint: disable=E0401, E0402,R1721 -import pendulum -from pendulum import DateTime - -from .utilities import ( - MANIFEST_NAME_BASE, - MANIFEST_NAME_EXT, - ARTIFACT_NAME, - ARTIFACT_EXT, - WEBLOG, - SCIENCE_PRODUCT_PATTERN, - EvlaCalInputScienceProduct, - EvlaCalInputGroup, - EvlaCalOutputScienceProduct, - EvlaCalOutputGroup, -) - -logger = logging.getLogger(__name__) -logger.setLevel(logging.INFO) -logger.addHandler(logging.StreamHandler(sys.stdout)) - - -class EvlaCalIngestionManifestWriter: - """For building ingestion manifests""" - - # (science product type is always EVLA_CAL) - def __init__(self, ingest_path: Path): - self.ingest_path = ingest_path - self.input_group = self.output_group = self.infiles = None - - def write_evla_cal_manifest(self, locator: str) -> Tuple[Path, List[Path]]: - """ - Write the manifest and associated ingestion files - - :param locator: science product locator - :return: - """ - - self.infiles = [file for file in self.ingest_path.iterdir()] - self.input_group = EvlaCalInputGroup(EvlaCalInputScienceProduct(locator)) - self.output_group = self._make_evla_cal_output_group() - - # Pull out the manifest content and stringify it - manifest_content = json.dumps(self.content(), indent=4) - - manifest_filename = self.manifest_filename() - # Write the manifest to the staging area - staging_manifest = Path(self.ingest_path / manifest_filename) - with open(staging_manifest, "w") as out: - out.write(f"{manifest_content}\n") - - # Get all the files we'll need - addl_ingestion_files = self.find_additional_ingestion_files() - - # Write the artifacts tar. - artifacts_tar = self.write_ingestion_artifacts_tar(staging_manifest, addl_ingestion_files) - addl_ingestion_files.append(artifacts_tar) - - # return a Path explicitly; LocalPath won't work - return Path(staging_manifest), addl_ingestion_files - - def write_ingestion_artifacts_tar( - self, manifest_file: Path, ingestion_files: List[Path] - ) -> Path: - """ - Take the list of files and build a tar for inclusion into the archive. - This happens in the staging area for ingestion. - The EVLA CAL tar will contain just the manifest - - :param manifest_file: the ingestion manifest - :param ingestion_files: all the files needed for ingestion - :return: a .tar archive of the ingestion artifacts - """ - ingestion_artifacts_fn = self.ingest_path / "ingestion_artifacts.tar" - with tarfile.open(ingestion_artifacts_fn, "w") as ingestion_artifacts_tar: - for file in ingestion_files: - ingestion_artifacts_tar.add(file) - - # include the manifest - if manifest_file not in ingestion_files: - ingestion_artifacts_tar.add(manifest_file) - - return ingestion_artifacts_fn - - def _make_evla_cal_output_group(self): - """ - Create the JSON for the "output-group" section of the manifest. - An EVLA CAL OutputGroup contains a science product(s) and an ancillary product - (the weblog) - - :return: manifest output group - """ - sp_tar = self._find_science_product_tar() - science_product = EvlaCalOutputScienceProduct(str(sp_tar)) - weblog = Path(self.ingest_path / WEBLOG) - if weblog.exists(): - return EvlaCalOutputGroup(science_product, weblog) - - return EvlaCalOutputGroup(science_product, None) - - def content(self) -> Dict[str, str]: - """ - Accessor for manifest content - - :return: manifest as dict - """ - - params = { - "reingest": "false", - "ngas_ingest": "false", - "calibrate": "false", - "ingestion_path": str(self.ingest_path), - } - - json_out = dict( - parameters=params, - input_group=self.input_group.__json__(), - output_group=self.output_group.__json__(), - # associate_group=self.associate_group.__json__(), # TODO when we need it - ingestion_path=str(self.ingest_path), - ) - return json_out - - def _find_science_product_tar(self) -> Path: - """ - A calibration ingestion staging dir should have ONE science product tar; ignore any others - - :return: - """ - files = [file for file in self.ingest_path.iterdir() if file.is_file] - for file in files: - if re.match(SCIENCE_PRODUCT_PATTERN, file.name): - return file - - raise FileNotFoundError(f"no science product found at {self.ingest_path}") - - @staticmethod - def format_timestamp(datetime: DateTime) -> str: - """ - Format the current time as - 2021_07_01T13_49_17.237 - - :param datetime: current timestamp - :return: timestamp suitable for ingestion manifest filename - """ - - return datetime.format("YYYY_MM_DDThh_mm_ss.SSS") - - @staticmethod - def manifest_filename() -> str: - """ - Build unique manifest filename in standard format. - - :return: the filename - """ - current_time = pendulum.now() - timestamp = EvlaCalIngestionManifestWriter.format_timestamp(current_time) - return f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}" - - @staticmethod - def artifact_filename() -> str: - """ - Build manifest and artifacts filenames. - - :return: the filenames - """ - current_time = pendulum.now() - timestamp = EvlaCalIngestionManifestWriter.format_timestamp(current_time) - artifact_filename = f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}" - return artifact_filename - - def find_additional_ingestion_files(self) -> List[Path]: - """ - Gather the files required for ingestion. - There won't be any for EVLA CAL ingestion; this is just a placeholder. - - :return: ingestion inputs - """ - - return [] - - # TODO: we'll have extra information for other ingestion types - # coll_files = aux_files = [] - # - # if self.additional_metadata is not None: - # addl_md = json.loads(self.additional_metadata) - # aux_files.append(addl_md["filename"]) - # - # if self.collection_metadata is not None: - # coll_md = json.loads(self.collection_metadata) - # coll_files.append(coll_md["filename"]) - # - # aux_files += coll_files - # - # # be sure to add the manifest itself - # aux_files.append(self.manifest_filename) - # - # return list(set(aux_files)) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py index 04dde6127cba46fc7d6be782eb6adc4a034527af..1abf5039cf0efd9e5d91c1105a19d3d3a61fbfdd 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py @@ -9,7 +9,7 @@ from ingest_envoy.interfaces import IngestLauncherIF class IngestCalibrationLauncher(IngestLauncherIF): def __init__(self, parameters: dict): self.logger = logging.getLogger("ingest_envoy") - self.ingestion_type = "calibration" + self.sci_product_type = "calibration" self.parameters = parameters self.staging_source_dir = ( self.parameters["staging_area"] + "/" + self.parameters["workflowDir"] @@ -57,5 +57,6 @@ class IngestCalibrationLauncher(IngestLauncherIF): def create_manifest(self): spl = self.parameters["spl"] + telescope = self.parameters["telescope"] - IngestionManifest(self.staging_source_dir, self.ingestion_type, spl).create() + IngestionManifest(self.staging_source_dir, self.sci_product_type, spl, telescope).create() diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/manifest_components.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/manifest_components.py new file mode 100644 index 0000000000000000000000000000000000000000..acdcd8ff3fe2a2c5cf20e04099daea60fe7fc63b --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/manifest_components.py @@ -0,0 +1,270 @@ +""" Pieces of any ingestion manifest """ + +import abc +import re +from enum import Enum +from pathlib import Path +from typing import Union, List, Dict + +from ingest_envoy.utilities import ScienceProductType, Telescope, AncillaryProductType + +MANIFEST_NAME_BASE = "ingestion_manifest_" +MANIFEST_NAME_EXT = ".json" +ARTIFACT_NAME = "ingestion_artifacts_" +ARTIFACT_EXT = ".tar" +WEBLOG = "weblog.tgz" +SCIENCE_PRODUCT_PATTERN = re.compile("[a-zA-Z0-9._\\-+]*\\.tar") +JSON = Union[int, float, str, List["JSON"], Dict[str, "JSON"]] + + +class IngestionManifestKey(Enum): + """Sections we expect to see in a manifest""" + + PARAMETERS = "parameters" + INPUT_GROUP = "input_group" + OUTPUT_GROUP = "output_group" + INGESTION_PATH = "ingestion_path" + SCIENCE_PRODUCTS = "science_products" + ANCILLARY_PRODUCTS = "ancillary_products" + + +class ParamsKey(Enum): + """Keys in ManifestParameters""" + + PARAMETERS = "parameters" + TELESCOPE = "telescope" + REINGEST = "reingest" + NGAS_INGEST = "ngas_ingest" + CALIBRATE = "calibrate" + INGESTION_PATH = IngestionManifestKey.INGESTION_PATH.value + + +class ManifestComponentIF(abc.ABC): + """Interface for components of an ingestion manifest (including the manifest itself)""" + + @abc.abstractmethod + def __eq__(self, other): + return NotImplemented + + @abc.abstractmethod + def to_json(self) -> JSON: + """ + JSON-ify this object + + :return: json.load()-able string + """ + return NotImplemented + + +class InputScienceProduct(ManifestComponentIF): + """Represents a science product in the "input-group" section of the ingestion manifest.""" + + def __init__(self, locator: str, sp_type: ScienceProductType = None): + self.type = sp_type + self.locator = locator + + def __eq__(self, other): + if isinstance(other, InputScienceProduct): + return other.type == self.type and other.locator == self.locator + + return False + + def to_json(self) -> JSON: + """ + Turn me into a json-ifiable dict + + :return: dicty-me + """ + if self.type: + return {"type": str(self.type), "locator": self.locator} + return {"locator": self.locator} + + +class InputGroup(ManifestComponentIF): + """Generic ingestion manifest input group""" + + def __init__(self, science_products: List[InputScienceProduct]): + self.science_products = science_products + + def __eq__(self, other): + if isinstance(other, InputGroup): + return other.science_products == self.science_products + + return False + + def to_json(self) -> JSON: + """ + Turn me into a json-ifiable dict + + :return: dicty-me + """ + sps = dict(self.__dict__)[IngestionManifestKey.SCIENCE_PRODUCTS.value] + sps = [sp.to_json() for sp in sps] + + return { + IngestionManifestKey.INPUT_GROUP.value: { + IngestionManifestKey.SCIENCE_PRODUCTS.value: sps + } + } + + +class ManifestParameters(ManifestComponentIF): + """Represents "parameters" section of ingestion manifest""" + + def __init__( + self, + telescope: Telescope, + reingest: bool, + ngas_ingest: bool, + calibrate: bool, + staging_source_dir: Path, + ): + self.telescope = telescope + self.reingest = reingest + self.ngas_ingest = ngas_ingest + self.calibrate = calibrate + self.staging_source_dir = staging_source_dir + + def __eq__(self, other): + if isinstance(other, ManifestParameters): + return ( + other.telescope.value == self.telescope.value + and other.reingest == self.reingest + and other.ngas_ingest == self.ngas_ingest + and other.calibrate == self.calibrate + and other.staging_source_dir == self.staging_source_dir + ) + + return False + + def to_json(self) -> JSON: + return { + ParamsKey.PARAMETERS.value: { + ParamsKey.TELESCOPE.value: str(self.telescope), + ParamsKey.REINGEST.value: self.reingest, + ParamsKey.NGAS_INGEST.value: self.ngas_ingest, + ParamsKey.CALIBRATE.value: self.calibrate, + ParamsKey.INGESTION_PATH.value: str(self.staging_source_dir), + } + } + + +class OutputScienceProduct(ManifestComponentIF): + """Generic science product contained in manifest output group""" + + def __init__(self, type: ScienceProductType, filename: str): + self.type = type + self.filename = filename + + def __eq__(self, other): + if isinstance(other, OutputScienceProduct): + return other.type == self.type and other.filename == self.filename + + return False + + def to_json(self) -> JSON: + return {"type": self.type.value, "filename": self.filename} + + +class AncillaryProduct(ManifestComponentIF): + """Represents an ancillary product in an ingestion manifest""" + + def __init__( + self, + type: AncillaryProductType, + filename: str, + science_associate: str = None, + group_with: str = None, + ): + self.type = type + self.filename = filename + self.science_associate = science_associate + self.group_with = group_with + + # make this an ancillary to a particular science product (assumes locator string) + if science_associate: + self.science_associate = science_associate # TODO, possibly: enum? + # make this an ancillary to the group of a science product (assumes locator string) + if group_with: + self.group_with = group_with + + def __eq__(self, other): + if isinstance(other, AncillaryProduct): + return ( + other.type == self.type + and other.filename == self.filename + and other.group_with == self.group_with + and other.science_associate == self.science_associate + ) + + return False + + def to_json(self) -> JSON: + """ + Turn me into a json-ifiable dict + + :return: dicty-me + """ + me_dict = dict(self.__dict__) + me_dict["type"] = self.type.value + clean_dict = {} + for key, val in me_dict.items(): + if val is not None: + clean_dict[key] = val + + return clean_dict + + +class OutputGroup(ManifestComponentIF): + """Generic ingestion manifest output group""" + + def __init__( + self, + science_products: List[OutputScienceProduct], + ancillary_products: List[AncillaryProduct] = None, + ): + self.science_products = science_products + self.ancillary_products = ancillary_products + + def __eq__(self, other): + if isinstance(other, OutputGroup): + return ( + other.science_products == self.science_products + and other.ancillary_products == self.ancillary_products + ) + + return False + + def to_json(self) -> JSON: + """ + Turn me into a json-ifiable dict + + :return: dicty-me + """ + me_dict = dict(self.__dict__) + + sps = me_dict[IngestionManifestKey.SCIENCE_PRODUCTS.value] + sps = [sp.to_json() for sp in sps] + me_dict[IngestionManifestKey.SCIENCE_PRODUCTS.value] = sps + + aps = me_dict[IngestionManifestKey.ANCILLARY_PRODUCTS.value] + if aps: + aps = [ap.to_json() for ap in aps] + me_dict[IngestionManifestKey.ANCILLARY_PRODUCTS.value] = aps + + return {IngestionManifestKey.OUTPUT_GROUP.value: me_dict} + + +class Weblog: + """Represents a weblog.tgz as an ancillary product""" + + def __init__(self, weblog_path: Path): + self.ancillary_product = {"type": "weblog", "filename": str(weblog_path)} + + def to_json(self) -> JSON: + """ + JSON-ify this object + + :return: json.load()-able string + """ + return dict(self.__dict__) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py index cad2a31dcfe9f645953129235dcf6c1911dd52f7..5dd4ca39be25d9edaf2dc84dfaa6692956eb8bf7 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py @@ -2,27 +2,14 @@ from __future__ import annotations -import json -import re -import tarfile from enum import Enum -from pathlib import Path -from typing import List, Dict -# pylint: disable=E0401, R0903, R1721 -MANIFEST_NAME_BASE = "ingestion_manifest_" -MANIFEST_NAME_EXT = ".json" -ARTIFACT_NAME = "ingestion_artifacts_" -ARTIFACT_EXT = ".tar" - -WEBLOG = "weblog.tgz" - -SCIENCE_PRODUCT_PATTERN = re.compile("[a-zA-Z0-9._\\-+]*\\.tar") +# pylint: disable=E0401, R0903, R1721, W0622 class Telescope(Enum): - """Codifying the names of our telescopes, because Janet can't abide magic strings""" + """Codifying names of telescopes that appear in ingestion manifests""" VLA = "VLA" EVLA = "EVLA" @@ -30,41 +17,7 @@ class Telescope(Enum): VLBA = "VLBA" GBT = "GBT" NONE = "NONE" - - -class IngestionType(Enum): - """Types of ingestion we'll have to do""" - - # ALMA products - ALMA_SDM = Telescope.ALMA - ALMA_CAL = Telescope.ALMA - ALMA_AUDI = Telescope.ALMA - - # EVLA products - EVLA_SDM = Telescope.EVLA - EVLA_BDF = Telescope.EVLA - EVLA_CAL = Telescope.EVLA - - # RealFast projects - REALFAST_SDM = Telescope.EVLA - - # VLASS projects - VLASS_QUICKLOOK = Telescope.EVLA - - # VLBA ingestion. (IDI and UVFITS products are treated the same.) - VLBA_FITS = Telescope.VLBA - - # Coming Real Soon Now: VLBA Mark 4 product ingestion - VLBA_MARK4 = Telescope.VLBA - - # Also coming Real Soon: GBT execution block ingestion - GBT_EB = Telescope.GBT - - # Hot on its heels: LVLA execution block ingestion - LVLA_EB = Telescope.VLA - - # When we just don't know what we're dealing with - UNKNOWN = Telescope.NONE + UNKNOWN = "UNKNOWN" class ScienceProductType(Enum): @@ -75,29 +28,6 @@ class ScienceProductType(Enum): CATALOG = "catalog" IMAGE = "image" - def __str__(self): - return f'"{str(self.value)}"' - - @staticmethod - def from_str(sp_type_in) -> ScienceProductType: - """ - In comes a string; out comes the corresponding ScienceProductType, if any. - Or maybe it's already a ScienceProductType, in which case we can just return it. - - :param sp_type_in: a string that "should" represent a ScienceProductType - :return: - """ - if isinstance(sp_type_in, ScienceProductType): - return sp_type_in - - for spt in ScienceProductType: - if spt.value == sp_type_in: - return spt - - raise ValueError( - f"unrecognized ScienceProductType: {sp_type_in}; it's a {type(sp_type_in)}" - ) - class AncillaryProductType(Enum): """The various types of ancillary products we'll encounter""" @@ -119,205 +49,25 @@ class AncillaryProductType(Enum): CANDIDATE_IMG = "candidate_image" THUMBNAIL_IMG = "thumbnail_image" - def __str__(self): - return f'"{self.value}"' - - -class AncillaryProduct: - """Represents an ancillary product in an ingestion manifest""" - - def __init__( - self, ap_type: AncillaryProductType, filename: str, science_associate: str, group_with: str - ): - self.ap_type = ap_type - self.filename = filename - # make this an ancillary to a particular science product (assumes locator string) - self.science_associate = science_associate # TODO, possibly: enum? - # make this an ancillary to the group of a science product (assumes locator string) - self.group_with = group_with - - -def write_ingestion_artifact_tar(ingestion_location: Path, ingestion_files: List[Path]) -> Path: - """ - Take the list of files and build a tar for inclusion into the archive. - This happens in the staging area for ingestion. - - :param ingestion_location: path to ingestion location (probably the spool directory) - :param ingestion_files: all the files needed for ingestion - :return: a .tar archive of the ingestion artifacts - """ - ingestion_artifacts_tar = ingestion_location / "ingestion_artifacts.tar" - with tarfile.open(ingestion_artifacts_tar, "w") as ingestion_artifacts_tar: - for file in ingestion_files: - ingestion_artifacts_tar.add(file) - - return Path(ingestion_artifacts_tar.name) - -class EvlaCalInputScienceProduct: - """Represents the "science_product" in EvlaCalInputGroup""" +class InvalidLocatorException(Exception): + """Throw this if we're fed a bad science product locator""" - def __init__(self, locator: str): - self.locator = locator - self.type = ScienceProductType.EVLA_CAL - def __json__(self) -> Dict[str, str]: - json_out = self.__dict__ - json_out["type"] = ScienceProductType.EVLA_CAL.value - return json_out +class IngestionManifestException(Exception): + """Throw this if we're unable to construct an ingestion manifest using supplied inputs""" -class EvlaCalInputGroup: +def find_output_science_products(files_found, staging_source_dir): """ - This represents the starting point for processing which generated a science product. - - There is not always an input group for every output group (rawdata, for instance). - - Initial assumption: Input groups consist only of science products. - """ - - def __init__(self, science_product: EvlaCalInputScienceProduct): + Round up the output science products associated with this SP type. - # science product locator - self.science_products = [science_product] - - def __json__(self) -> Dict[str, str]: - """ - Create the "input-group" section of the manifest as a JSON string - - :return: JSONified InputGroup - """ - json_out = self.__dict__ - sps = json_out["science_products"] - sci_prod = sps[0] - sp_str = sci_prod if isinstance(sci_prod, str) else sci_prod.__json__() - json_out["science_products"] = f"[{sp_str}]" - - return json_out - - -class EvlaCalOutputScienceProduct: - """The science product in the output group""" - - def __init__(self, filename: str): - self.filename = filename - self.type = ScienceProductType.EVLA_CAL - - def __json__(self) -> Dict[str, str]: - json_out = self.__dict__ - json_out["type"] = ScienceProductType.EVLA_CAL.value - return json_out - - -class Weblog: - """Represents a weblog.tgz as an ancillary product""" - - def __init__(self, weblog_path: Path): - self.ancillary_product = {"type": "weblog", "filename": str(weblog_path)} - - def __json__(self) -> Dict[str, str]: - return dict(self.__dict__) - - -class EvlaCalOutputGroup: - """Represents result of data processing. Will have a single calibration tar - plus a weblog. + :return: """ + sp_files = [file for file in files_found if file.name.endswith(".tar")] + if len(sp_files) == 0: + raise IngestionManifestException( + f"No output science products found at {staging_source_dir}" + ) - def __init__(self, science_product: EvlaCalOutputScienceProduct, weblog: Path): - self.science_products = [science_product] - self.ancillary_products = [Weblog(weblog)] - - def __json__(self) -> Dict[str, str]: - """ - Create the "output-group" section of the manifest as a JSON string. - __json__() will not work; __repr__() is necessary for json.loads() to succeed. - - :return: JSONified OutputGroup - """ - - json_out = self.__dict__ - anc_prod = self.ancillary_products[0] - ap_str = anc_prod if isinstance(anc_prod, str) else anc_prod.__json__() - json_out[IngestionManifestKey.ANCILLARY_PRODUCTS.value] = f"[{ap_str}]" - sci_prod = self.science_products[0] - sp_str = sci_prod if isinstance(sci_prod, str) else sci_prod.__json__() - json_out[IngestionManifestKey.SCIENCE_PRODUCTS.value] = f"[{sp_str}]" - - return json_out - - -class EvlaCalIngestionManifest: - """TODO: this is JUST the ingestion manifest JSON, not a bespoke object""" - - def __init__(self, ingestion_path: Path, spl: str): - """ - - :param ingestion_path: staging directory - :param spl: science product locator - """ - self.ingestion_path = ingestion_path - self.locator = spl - self.sp_type = ScienceProductType.EVLA_CAL - - self.infiles = [file for file in self.ingestion_path.iterdir()] - self.input_group = EvlaCalInputGroup(EvlaCalInputScienceProduct(self.locator)) - self.output_group = self._make_output_group() - - def __str__(self): - params = self._make_params_section() - input_group = self.input_group.__json__() - output_group = self.output_group.__json__() - return f"{params}\n{input_group}\n{output_group}" - - def _make_params_section(self) -> str: - """ - Create the JSON for the "parameters" section of the manifest. - It's always the same for any EVLA CAL ingestion manifest, except for the ingestion path. - - :return: stringified JSON - """ - params = { - "parameters": { - "reingest": "false", - "ngas-ingest": "false", - "calibrate": "false", - "ingestion_path": str(self.ingestion_path), - } - } - return json.dumps(params) - - def _make_output_group(self) -> EvlaCalOutputGroup: - """ - Create the JSON for the "output-group" section of the manifest. - An EVLA CAL OutputGroup contains a science product(s) and an ancillary product - (the weblog) - - :return: manifest output group - """ - sp_tar = self._find_science_product_tar() - science_product = EvlaCalOutputScienceProduct(str(sp_tar)) - weblog = Path(self.ingestion_path / WEBLOG) - if weblog.exists(): - return EvlaCalOutputGroup(science_product, weblog) - - return EvlaCalOutputGroup(science_product, None) - - def _find_science_product_tar(self) -> Path: - """ - A calibration ingestion staging dir should have ONE science product tar; ignore any others - :return: - """ - for file in self.infiles: - if re.match(SCIENCE_PRODUCT_PATTERN, file.name): - return file - - -class IngestionManifestKey(Enum): - """Sections we expect to see in a manifest""" - - INPUT_GROUP = "input_group" - OUTPUT_GROUP = "output_group" - INGESTION_PATH = "ingestion_path" - SCIENCE_PRODUCTS = "science_products" - ANCILLARY_PRODUCTS = "ancillary_products" + return sp_files diff --git a/apps/cli/executables/pexable/ingest_envoy/test/conftest.py b/apps/cli/executables/pexable/ingest_envoy/test/conftest.py index 77f21016eea84a4a09a42ba968916d9e3fd38a20..800194aedd12bf16faf860c0adc0ebf8aa5703cd 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/conftest.py +++ b/apps/cli/executables/pexable/ingest_envoy/test/conftest.py @@ -7,14 +7,14 @@ from typing import List import pytest -from ingest_envoy.utilities import WEBLOG +from ingest_envoy.manifest_components import WEBLOG WANTED_FILENAMES = ["my_science_products.tar", WEBLOG] UNWANTED = ["ignore_me.fits", "just_a_lotta_nothing", "uninteresting_metadata.xml"] @pytest.fixture(scope="function") -def ingest_path(tmpdir: Path): +def ingest_path(tmpdir: Path) -> Path: """ Make an "ingestion path" for tests @@ -35,14 +35,14 @@ def find_example_manifest(manifest_name: str) -> Path: :param manifest_name: unique file identifier :return: full path to the manifest file """ - filename = manifest_name + "_manifest.json" + filename = manifest_name + ".json" for file in Path.cwd().rglob(filename): return file raise FileNotFoundError(filename) -def populate_fake_ingest_path(staging_dir: Path) -> List[Path]: +def populate_fake_evla_cal_ingest_path(staging_dir: Path) -> List[Path]: """ Create a directory containing fake calibration products, plus other stuff that we -don't- want to ingest. diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_eb_manifest.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/MANIFEST_EVLA_EB.json similarity index 59% rename from apps/cli/executables/pexable/ingest_envoy/test/examples/evla_eb_manifest.json rename to apps/cli/executables/pexable/ingest_envoy/test/examples/MANIFEST_EVLA_EB.json index 0a2ad890c042fabf420ee9e14fcb142f76000c33..bc88b3bcd97bd197625a797ed88f6e8a5dba5d4b 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_eb_manifest.json +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/MANIFEST_EVLA_EB.json @@ -3,19 +3,19 @@ "reingest": false, "ngas_ingest": false, "telescope": "EVLA", - "ingestion_path": "/home/mchammer/evla/parallel-prod" + "ingestion_path": "/home/mchammer/evla/parallel-dev" }, "output_group": { "science_products": [ { "type": "execution_block", - "filename": "X_osro_000.59368.65423814815" + "filename": "19A-001.sb1234567890.eb233423545632.54321.894327984569" } ], "ancillary_products": [ { "type": "ingestion_artifacts", - "filename": "ingestion_artifacts_2021_06_03_T15_52_35.031.tar" + "filename": "ingestion_artifacts_2019_07_26_T10_49_44.890.tar" } ] } diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_ALMA_AOI.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_ALMA_AOI.json new file mode 100644 index 0000000000000000000000000000000000000000..717007a81c8cab6aefdbf412bbd5eaa15fce87f8 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_ALMA_AOI.json @@ -0,0 +1,122 @@ +{ + "parameters": { + "reingest": false, + "ngas_ingest": false, + "telescope": "ALMA", + "ingestion_path": "/lustre/naasc/web/almapipe/pipeline/vatest/image_qa/NGC1333_IRAS_4A", + "additional_metadata": "AOI.json" + }, + "input_group": { + "science_products": [ + { + "locator": "uid://alma/calibration/d5d44d72-113f-48e0-9aa3-4b7b2646ea73" + } + ] + }, + "output_group": { + "science_products": [ + { + "type": "continuum_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw19_21_23_25.cont.I.tt0.pbcor.fits", + "ancillary_products": [ + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw19_21_23_25.cont.I.alpha.error.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw19_21_23_25.cont.I.alpha.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw19_21_23_25.cont.I.mask.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw19_21_23_25.cont.I.pb.tt0.fits" + } + ] + }, + { + "type": "continuum_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw19_21_23_25.cont.I.tt1.pbcor.fits" + }, + { + "type": "image_cube", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw25.cube.I.pbcor.fits", + "ancillary_products": [ + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw25.cube.I.mask.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw25.cube.I.pb.fits" + } + ] + } + ], + "ancillary_products": [ + { + "type": "pipeline_auxillary_tar", + "filename": "uid___A001_X133d_X1e64_pipeline_artifacts.tar" + }, + { + "type": "pipeline_weblog", + "filename": "uid___A001_X133d_X1e64.hifa_cubeimage.weblog.tgz" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw19.mfs.I.mask.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw19.mfs.I.pbcor.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw19.mfs.I.pb.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw21.mfs.I.mask.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw21.mfs.I.pbcor.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw21.mfs.I.pb.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw23.mfs.I.mask.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw23.mfs.I.pbcor.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw23.mfs.I.pb.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw25.mfs.I.mask.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw25.mfs.I.pbcor.fits" + }, + { + "type": "fits_image", + "filename": "uid___A001_X133d_X1e64.NGC1333_IRAS_4A_sci.spw25.mfs.I.pb.fits" + }, + { + "type": "ingestion_artifacts", + "filename": "ingestion_artifacts_2019_07_26_T08_57_32.535.tar" + } + ] + } +} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_ALMA_Cal.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_ALMA_Cal.json new file mode 100644 index 0000000000000000000000000000000000000000..114ed9b8c3b32c41da30e3b64d50438859a49fb1 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_ALMA_Cal.json @@ -0,0 +1,38 @@ +{ + "parameters": { + "reingest": true, + "ngas_ingest": false, + "telescope": "ALMA", + "ingestion_path": "." + }, + "input_group": { + "science_products": [ + { + "locator": "uid://alma/execblock/abcdefg" + }, + { + "locator": "uid://alma/execblock/hijklmno" + }, + { + "locator": "uid://alma/execblock/pqrstuvw" + }, + { + "locator": "uid://alma/execblock/xyz12345" + } + ] + }, + "output_group": { + "science_products": [ + { + "type": "naasc_calibration", + "filename": "uid://A001/X1234/X678" + } + ], + "ancillary_products": [ + { + "type": "ingestion_artifacts", + "filename": "ingestion_artifacts_2019_07_30_T13_07_52.515.tar" + } + ] + } +} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_ALMA_EB.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_ALMA_EB.json new file mode 100644 index 0000000000000000000000000000000000000000..473427f838494cb28b5e067ca136e61d067ceb99 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_ALMA_EB.json @@ -0,0 +1,22 @@ +{ + "parameters": { + "reingest": false, + "ngas_ingest": false, + "telescope": "ALMA", + "ingestion_path": "/users/jsheckar/projects/archive/workflow-all/workflow-server/./rawdata" + }, + "output_group": { + "science_products": [ + { + "type": "execution_block", + "filename": "uid___A032_Xffff_Xa01" + } + ], + "ancillary_products": [ + { + "type": "ingestion_artifacts", + "filename": "ingestion_artifacts_2019_07_26_T10_50_26.851.tar" + } + ] + } +} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_EVLA_Cal.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_EVLA_Cal.json new file mode 100644 index 0000000000000000000000000000000000000000..43d5150387c77361a31ef58fdad34695ba98c955 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_EVLA_Cal.json @@ -0,0 +1,33 @@ +{ + "parameters": { + "reingest": false, + "ngas_ingest": false, + "telescope": "EVLA", + "ingestion_path": "/lustre/yea/and/here/we/go" + }, + "input_group": { + "science_products": [ + { + "locator": "uid://evla/execblock/fjdsakljfkdlsajfkldsa" + } + ] + }, + "output_group": { + "science_products": [ + { + "type": "calibration", + "filename": "XYZ-abc+TMN.O00.tar" + } + ], + "ancillary_products": [ + { + "type": "pipeline_weblog", + "filename": "qrs.weblog.tgz" + }, + { + "type": "ingestion_artifacts", + "filename": "ingestion_artifacts_2019_07_25_T15_43_33.144.tar" + } + ] + } +} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_Realfast_SDM.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_Realfast_SDM.json new file mode 100644 index 0000000000000000000000000000000000000000..b7ef437c6f6da713859a1342f92b274a427e33a7 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_Realfast_SDM.json @@ -0,0 +1,40 @@ +{ + "parameters": { + "reingest": false, + "ngas_ingest": false, + "telescope": "EVLA", + "ingestion_path": "/Need/To/Decide/About/This", + "collection_metadata": "realfast.json" + }, + "output_group": { + "science_products": [ + { + "type": "execution_block", + "filename": "realfast_20A-333.place.holder.here", + "ancillary_products": [ + { + "type": "candidate_image", + "filename": "aaaa.png" + }, + { + "type": "candidate_image", + "filename": "bbbb.png" + } + ] + } + ], + "ancillary_products": [ + { + "type": "ingestion_artifacts", + "filename": "ingestion_artifacts_2020_07_17_T13_45_39.902.tar" + } + ] + }, + "associate_group": { + "science_products": [ + { + "locator": "uid://evla/execblock/abcd-efgh-ijkl-mnop" + } + ] + } +} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_VLASS_QUICKLOOK.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_VLASS_QUICKLOOK.json new file mode 100644 index 0000000000000000000000000000000000000000..2e2dd550aa06b94867b7e84a06887d865779d757 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_VLASS_QUICKLOOK.json @@ -0,0 +1,44 @@ +{ + "parameters": { + "reingest": false, + "ngas_ingest": false, + "telescope": "EVLA", + "ingestion_path": "/this/is/my/fake/path", + "additional_metadata": "T00t99.json" + }, + "input_group": { + "science_products": [ + { + "locator": "uid://evla/calibration/ABCDE-FGHIJKLMN-OPQRSTUVW-XYZ" + } + ] + }, + "output_group": { + "science_products": [ + { + "type": "quicklook_image", + "filename": "foo_bar-car.pbcor.tt5.subim.fits", + "ancillary_products": [ + { + "type": "quicklook_rms_image", + "filename": "foo_bar-car.pbcor.tt5.rms.subim.fits" + } + ] + } + ], + "ancillary_products": [ + { + "type": "pipeline_weblog", + "filename": "weblog.tgz" + }, + { + "type": "pipeline_auxillary_tar", + "filename": "ABC+DEFG_HIJK-LMNO.tar" + }, + { + "type": "ingestion_artifacts", + "filename": "ingestion_artifacts_2019_07_30_T13_03_00.936.tar" + } + ] + } +} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_calibration_manifest.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_calibration_manifest.json deleted file mode 100644 index 22c06187772f13ccdd712556ccfcc2287f37cf52..0000000000000000000000000000000000000000 --- a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_calibration_manifest.json +++ /dev/null @@ -1,30 +0,0 @@ -{ - "parameters": { - "reingest": "false", - "ngas-ingest": "false", - "calibrate": "false", - "ingestion_path": "/home/mchammer/evla/parallel-prod" - }, - "input-group": { - "science_products": [ - { - "type": "calibration", - "locator": "uid://evla/calibration/long-freakin-uuid-22" - } - ] - }, - "output-group": { - "science_products": [ - { - "type": "calibration", - "filename": "19A-321_2019_more_stuff.tar" - } - ], - "ancillary_products": [ - { - "type": "weblog", - "filename": "weblog.tgz" - } - ] - } -} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/quicklook_manifest.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/quicklook_manifest.json deleted file mode 100644 index 72a30aec21e9b04a7f3008e7969e8dd78ab554be..0000000000000000000000000000000000000000 --- a/apps/cli/executables/pexable/ingest_envoy/test/examples/quicklook_manifest.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "parameters": { - "reingest": "false", - "ngas-ingest": "false", - "calibrate": "false", - "ingestion_path": "/lustre/aoc/cluster/pipeline/vlass_auto/cache/quicklook/VLASS1.1/T29t04/VLASS1.1.ql.T29t04.J094850+743000.10.2048.v5" - }, - "input-group": { - "science_products": [ - { - "type": "calibration", - "locator": "uid://evla/calibration/a47c2e78-4f4e-4516-ab95-8bbb4057e9bb" - }, - { - "type": "execution_block", - "locator": "uid://evla/execblock/52dd9e10-63fb-4fa8-b6ff-fcf6240b97f4" - } - ] - }, - "output-group": { - "science_products": [ - { - "type": "quicklook_image", - "filename": "VLASS1.1.ql.T29t04.J094850+743000.10.2048.v5.I.iter1.image.pbcor.tt0.subim.fits", - "ancillary-products": [ - { - "type": "quicklook_rms", - "filename": "VLASS1.1.ql.T29t04.J094850+743000.10.2048.v5.I.iter1.image.pbcor.tt0.rms.subim.fits" - } - ] - } - ], - "ancillary-products": [ - { - "type": "tar", - "filename": "VLASS1.1.ql.T29t04.J094850+743000.10.2048.v5.tar" - } - ] - } -} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_evla_cal_manifest.py b/apps/cli/executables/pexable/ingest_envoy/test/test_evla_cal_manifest.py deleted file mode 100644 index 4c27e1c60f4e0ca6b244345d08644e0439df31ac..0000000000000000000000000000000000000000 --- a/apps/cli/executables/pexable/ingest_envoy/test/test_evla_cal_manifest.py +++ /dev/null @@ -1,268 +0,0 @@ -""" Tests for EVLA calibration ingestion manifest generation ONLY """ - -import json -import logging -import re -import shutil -import sys -from pathlib import Path - -# pylint: disable=E0401, E0402, R0914 - -from ingest_envoy.ingestion_manifest import IngestionManifest -from ingest_envoy.ingestion_manifest_writer import ( - EvlaCalIngestionManifestWriter, -) -from ingest_envoy.utilities import ( - WEBLOG, - ScienceProductType, - MANIFEST_NAME_BASE, - MANIFEST_NAME_EXT, - EvlaCalOutputScienceProduct, - Weblog, - EvlaCalIngestionManifest, - IngestionManifestKey, -) -from .conftest import ( - find_example_manifest, - populate_fake_ingest_path, - WANTED_FILENAMES, - UNWANTED, -) - -logger = logging.getLogger(IngestionManifest.__name__) -logger.setLevel(logging.INFO) -logger.addHandler(logging.StreamHandler(sys.stdout)) - - -# pylint: disable=E0402, R1721 - - -def test_creates_empty_evla_cal_manifest(ingest_path: Path): - """ - Make sure we create the manifest we expect. - - :param ingest_path: ingestion location - :return: - """ - manifest = EvlaCalIngestionManifest(ingest_path, "") - - sps_in = manifest.input_group.science_products - assert len(sps_in) == 1 - sp_in = sps_in[0] - assert sp_in.type == ScienceProductType.EVLA_CAL - assert sp_in.locator == "" - - outgrp = manifest.output_group - sps_out = outgrp.science_products - assert len(sps_out) == 1 - sp_out = sps_out[0] - assert sp_out.type == sp_in.type - - weblogs = outgrp.ancillary_products - assert len(weblogs) == 1 - weblog = weblogs[0] - assert isinstance(weblog, Weblog) - ap_out = weblog.ancillary_product - assert ap_out["type"] == "weblog" - assert ap_out["filename"] == "None" - - ingest_path.rmdir() - - -def test_creates_expected_manifest(ingest_path: Path): - """ - Make sure we create the manifest we expect. - - :param ingest_path: ingestion location - :return: - """ - - locator = "uid://evla/calibration/abcdefg_2_bubble_bubble" - all_files = populate_fake_ingest_path(ingest_path) - manifest = EvlaCalIngestionManifest(ingest_path, locator) - sps_in = manifest.input_group.science_products - assert len(sps_in) == 1 - sp_in = sps_in[0] - assert sp_in.type == ScienceProductType.EVLA_CAL - assert sp_in.locator == locator - - outgrp = manifest.output_group - sps_out = outgrp.science_products - assert len(sps_out) == 1 - sp_out = sps_out[0] - assert isinstance(sp_out, EvlaCalOutputScienceProduct) - assert sp_out.type == sp_in.type - - files_after = [file for file in ingest_path.iterdir()] - # nothing should have been removed from staging dir - assert len(files_after) == len(all_files) - assert Path(sp_out.filename) in files_after - - aps = outgrp.ancillary_products - assert len(aps) == 1 - anc_prod = aps[0] - assert isinstance(anc_prod, Weblog) - assert Path(ingest_path / WEBLOG).exists() - - shutil.rmtree(ingest_path) - - -def test_writes_real_manifest_to_file(ingest_path: Path): - """ - We should get a pretty, formatted, human-readable JSON text file - - :param ingest_path: the staging dir - :return: - """ - - populate_fake_ingest_path(ingest_path) - - writer = EvlaCalIngestionManifestWriter(ingest_path=ingest_path) - locator = "uid://evla/calibration/my_devastating_observation" - manifest_file, _ = writer.write_evla_cal_manifest(locator) - - with open(manifest_file, "r") as mf_in: - manifest_content = dict(json.load(mf_in).items()) - - assert len(manifest_content.keys()) >= len(IngestionManifestKey) - 1 - for key in ["parameters", "input_group", "output_group"]: - assert key in manifest_content.keys() - - shutil.rmtree(ingest_path) - - -def test_builds_expected_manifest_filename(ingest_path: Path): - """ - We expect the manifest to be named like - - ingestion_manifest_2019_07_30_T13_03_00.936.json - - :param ingest_path: ingestion location - :return: - """ - - filename = EvlaCalIngestionManifestWriter.manifest_filename() - - assert filename.startswith(MANIFEST_NAME_BASE) - assert filename.endswith(MANIFEST_NAME_EXT) - - filename_parts = filename.split("_") - assert len(filename_parts) == 7 - - # get just the timestamp - timestamp = filename.replace(MANIFEST_NAME_BASE, "").replace(MANIFEST_NAME_EXT, "") - - # we should have gotten year, month, day, hours, minutes, seconds to 3 decimal places - assert re.match(r"\d{4}_\d{2}_\d{2}T\d{2}_\d{2}_\d{2}\.\d{0,3}", timestamp) - - shutil.rmtree(ingest_path) - - -def test_writes_expected_output_files(ingest_path: Path): - """ - We expect to get the manifest file, the weblog, and the science product tar. - - :param ingest_path: ingestion location - :return: - """ - - populate_fake_ingest_path(ingest_path) - - example_manifest = find_example_manifest("evla_calibration") - content = "" - with open(example_manifest, "r") as infile: - for line in infile.readlines(): - content += line - mf_json = json.loads(content) - - ig_in = mf_json["input-group"] - ig_sp = ig_in["science_products"][0] - - ip_in = mf_json["parameters"]["ingestion_path"] - assert ip_in == "/home/mchammer/evla/parallel-prod" - - writer = EvlaCalIngestionManifestWriter(ingest_path=ingest_path) - manifest_file, more_ingestion_files = writer.write_evla_cal_manifest(ig_sp["locator"]) - assert manifest_file.exists() - for file in more_ingestion_files: - assert file.exists() - assert len(more_ingestion_files) == 1 - - # make sure that one file is the artifacts tar - file = more_ingestion_files[0] - assert file.exists() - assert file.name.startswith("ingestion_artifacts") and file.name.endswith(".tar") - - shutil.rmtree(ingest_path) - - -def test_filters_cal_input_files(ingest_path: Path): - """ - We'll be getting calibration products from a directory under - /lustre/aoc/cluster/pipeline/{CAPO_PROFILE}/workspaces/staging - - Make sure we take -only- the files to be ingested. - - :param ingest_path: our temporary dir - :return: - """ - populate_fake_ingest_path(ingest_path) - - writer = EvlaCalIngestionManifestWriter(ingest_path=ingest_path) - locator = "uid://evla/calibration/im_a_one-touch_espresso_machine" - manifest_file, _ = writer.write_evla_cal_manifest(locator) - - with open(manifest_file, "r") as mf_in: - manifest_content = dict(json.load(mf_in).items()) - input_group = manifest_content[IngestionManifestKey.INPUT_GROUP.value] - assert isinstance(input_group, dict) - assert len(input_group) == 1 - - for val in input_group.values(): - sci_prod = val.replace("'", '"', len(val)) - sps = json.loads(sci_prod) - - assert isinstance(sps, list) - assert len(sps) == 1 - sci_prod = sps[0] - assert sci_prod["type"] == ScienceProductType.EVLA_CAL.value - assert sci_prod["locator"] == locator - - output_group = manifest_content[IngestionManifestKey.OUTPUT_GROUP.value] - assert isinstance(output_group, dict) - assert len(output_group) == 2 - - for key, val in output_group.items(): - if key == IngestionManifestKey.SCIENCE_PRODUCTS.value: - sci_prod = val.replace("'", '"', len(val)) - sps = json.loads(sci_prod) - assert len(sps) == 1 - else: - assert key == IngestionManifestKey.ANCILLARY_PRODUCTS.value - anc_prod = val.replace("'", '"', len(val)) - aps = json.loads(anc_prod) - assert len(aps) == 1 - - for sci_prod in sps: - filename = Path(sci_prod["filename"]).name - file = Path(ingest_path / filename) - assert file.exists() - assert file.parent == ingest_path - assert filename in WANTED_FILENAMES - assert filename not in UNWANTED - - anc_prod_dict1 = aps[0] - for key, val in anc_prod_dict1.items(): - anc_prod_dict2 = val - - assert isinstance(anc_prod_dict2, dict) - for key, val in anc_prod_dict2.items(): - if key == "type": - assert val == "weblog" - else: - assert key == "filename" - file = Path(ingest_path / val) - assert file.exists() - - shutil.rmtree(ingest_path) diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_manifest_if.py b/apps/cli/executables/pexable/ingest_envoy/test/test_manifest_if.py new file mode 100644 index 0000000000000000000000000000000000000000..b7dff537a6cf4d56024a9743e6daec8c4e397081 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_manifest_if.py @@ -0,0 +1,454 @@ +""" Tests for generic and EVLA CAL manifests """ + +# TODO when the time comes: tests of ingestion manifests for each remaining ingestion type +# as they're implemented, in separate modules so they don't get too unwieldy + +import json +import logging +import re +import shutil +import sys +from pathlib import Path + +# pylint: disable=E0401, E0402, R1721, W0621 + +import pytest + +from ingest_envoy.ingestion_manifest import ( + IngestionManifestBuilder, + build_manifest_filename, + find_manifest, +) +from ingest_envoy.manifest_components import ( + MANIFEST_NAME_BASE, + MANIFEST_NAME_EXT, + IngestionManifestKey, + ParamsKey, + InputScienceProduct, + InputGroup, + ManifestParameters, + OutputScienceProduct, + AncillaryProduct, + OutputGroup, + ARTIFACT_EXT, + ARTIFACT_NAME, +) +from ingest_envoy.utilities import ( + ScienceProductType, + AncillaryProductType, + Telescope, +) + +# pylint: disable=E0401, E1120 + +# ingest_path is NOT unused! Don't let IJ remove the import! +from .conftest import ingest_path, populate_fake_evla_cal_ingest_path, WANTED_FILENAMES, UNWANTED + +logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) +logger.addHandler(logging.StreamHandler(sys.stdout)) + +FAKE_LOCATOR = "uid://evla/calibration/doo-wah-ditty-ditty-af123" + + +def test_manifest_is_complete(ingest_path: Path): + """ + Most ingestion manifests should have parameters, an input group, and an output group. + An output group will contain one or more science products, and sometimes ancillary products. + + :return: + """ + + populate_fake_evla_cal_ingest_path(ingest_path) + assert isinstance(ingest_path, Path) + params_expected = ManifestParameters( + telescope=Telescope.EVLA, + ngas_ingest=False, + reingest=False, + calibrate=False, + staging_source_dir=ingest_path, + ) + + sp1 = InputScienceProduct( + sp_type=ScienceProductType.EVLA_CAL, + locator=FAKE_LOCATOR, + ) + + ig_in = InputGroup(science_products=[sp1]) + osp_in = OutputScienceProduct( + type=ScienceProductType.EVLA_CAL, filename="my_science_products.tar" + ) + ap_in = AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG, filename="weblog.tgz") + + manifest, _ = IngestionManifestBuilder( + staging_source_dir=ingest_path, + telescope=Telescope.EVLA, + sp_type=ScienceProductType.EVLA_CAL, + locator=FAKE_LOCATOR, + ).build() + + assert manifest.parameters == params_expected + assert manifest.input_group == ig_in + assert manifest.output_group.science_products[0] == osp_in + assert ap_in in manifest.output_group.ancillary_products + + af_tar_candidates = [ + file + for file in ingest_path.iterdir() + if file.name.startswith(ARTIFACT_NAME) and file.name.endswith(ARTIFACT_EXT) + ] + assert len(af_tar_candidates) == 1 + + shutil.rmtree(ingest_path) + + +def test_builds_expected_manifest_filename(): + """ + We expect the manifest to be named like + + ingestion_manifest_2019_07_30_T13_03_00.936.json + + :return: + """ + filename = build_manifest_filename() + + assert filename.startswith(MANIFEST_NAME_BASE) + assert filename.endswith(MANIFEST_NAME_EXT) + + filename_parts = filename.split("_") + assert len(filename_parts) == 7 + + # get just the timestamp + timestamp = filename.replace(MANIFEST_NAME_BASE, "").replace(MANIFEST_NAME_EXT, "") + + # we should have gotten year, month, day, hours, minutes, seconds to 3 decimal places + assert re.match(r"\d{4}_\d{2}_\d{2}T\d{2}_\d{2}_\d{2}\.\d{0,3}", timestamp) + + +def test_filters_cal_input_files(ingest_path: Path): + """ + We'll be getting calibration/image/eb, etc. science products from a directory under + /lustre/aoc/cluster/pipeline/{CAPO_PROFILE}/workspaces/staging + + Make sure we take -only- the files to be ingested. + + :param ingest_path: our temporary dir + """ + + populate_fake_evla_cal_ingest_path(ingest_path) + locator = "uid://evla/calibration/twinkle-twinkle-little-quasar" + manifest, _ = IngestionManifestBuilder( + telescope=Telescope.EVLA, + staging_source_dir=ingest_path, + sp_type=ScienceProductType.EVLA_CAL, + locator=locator, + ).build() + + # does manifest contain all the data it should, and none of the data it shouldn't? + + assert manifest.locator == locator + params = manifest.parameters + assert not params.reingest and not params.ngas_ingest and not params.calibrate + + input_group = manifest.input_group + assert len(input_group.science_products) == 1 + sp_in = input_group.science_products[0] + assert sp_in.type == ScienceProductType.EVLA_CAL + + output_group = manifest.output_group + assert len(output_group.science_products) == 1 + assert len(output_group.ancillary_products) == 2 + for product in output_group.ancillary_products: + if product.filename not in WANTED_FILENAMES: + assert product.filename.startswith(ARTIFACT_NAME) and product.filename.endswith( + ARTIFACT_EXT + ) + assert product.filename not in UNWANTED + + sp_out = output_group.science_products[0] + assert sp_out.type == ScienceProductType.EVLA_CAL + + assert sp_out.filename in WANTED_FILENAMES + assert sp_out.filename not in UNWANTED + + shutil.rmtree(ingest_path) + + +def test_writes_expected_output_files(ingest_path: Path): + """ + Did the manifest builder produce the manifest file, the weblog, and the science product tar? + + :param ingest_path: ingestion location + :return: + """ + populate_fake_evla_cal_ingest_path(ingest_path) + manifest_file, manifest = IngestionManifestBuilder( + telescope=Telescope.EVLA, + staging_source_dir=ingest_path, + locator="uid://evla/calibration/fee-fi-fo-fum-acdf23", + sp_type=ScienceProductType.EVLA_CAL, + ).build() + assert manifest_file + assert manifest + + ingestion_files = [file for file in ingest_path.iterdir()] + + # at a minimum, we expect the manifest, the ingestion artifact, and the science products tar + assert len(ingestion_files) >= 3 + mf_json = [ + file + for file in ingestion_files + if file.name.startswith(MANIFEST_NAME_BASE) and file.name.endswith(MANIFEST_NAME_EXT) + ][0] + assert mf_json + tars = [file for file in ingestion_files if file.name.endswith(".tar")] + assert len(tars) >= 2 + + shutil.rmtree(ingest_path) + + +def test_params_json_well_formed(): + """ + Make sure our ManifestParameters makes nice JSON + + :return: + """ + telescope = Telescope.EVLA + + params_dict = { + ParamsKey.PARAMETERS.value: { + ParamsKey.TELESCOPE.value: telescope, + ParamsKey.REINGEST.value: False, + ParamsKey.NGAS_INGEST.value: False, + ParamsKey.CALIBRATE.value: False, + ParamsKey.INGESTION_PATH.value: "/home/mchammer/evla/parallel-prod", + } + } + param_values_dict = params_dict[ParamsKey.PARAMETERS.value] + + params = ManifestParameters( + telescope=param_values_dict[ParamsKey.TELESCOPE.value], + reingest=param_values_dict[ParamsKey.REINGEST.value], + ngas_ingest=param_values_dict[ParamsKey.NGAS_INGEST.value], + calibrate=param_values_dict[ParamsKey.CALIBRATE.value], + staging_source_dir=param_values_dict[ParamsKey.INGESTION_PATH.value], + ) + + params_json = params.to_json() + for key, val in params_json.items(): + assert ( + val == params_dict[key] if isinstance(params_dict[key], bool) else str(params_dict[key]) + ) + + +@pytest.mark.skip("TODO") +def test_params_properly_formatted(): + """ + TODO + :return: + """ + raise NotImplementedError + + +def test_input_sp_well_formed(): + """ + Make sure our InputScienceProduct makes nice JSON + + :return: + """ + locator = "uid://evla/calibration/vanilla_heath_bar_crunch_1a23e" + # single science product + sp_dict = { + "type": ScienceProductType.EVLA_CAL.value, + "locator": locator, + } + + sp_in = InputScienceProduct(sp_type=ScienceProductType.EVLA_CAL.value, locator=locator) + assert sp_in.to_json() == sp_dict + + +def test_input_group_well_formed(): + """ + Make sure our InputGroup makes nice JSON + + :return: + """ + sp1 = InputScienceProduct( + sp_type=ScienceProductType.EXEC_BLOCK.value, + locator="uid://evla/execblock/coffee_heath_bar_crunch_7a23f", + ) + sp1_json = sp1.to_json() + + sp2 = InputScienceProduct( + sp_type=ScienceProductType.EVLA_CAL.value, locator="uid://evla/execblock/mint_oreo_omg_omg" + ) + sp2_json = sp2.to_json() + + expected = { + IngestionManifestKey.INPUT_GROUP.value: { + IngestionManifestKey.SCIENCE_PRODUCTS.value: [sp1_json, sp2_json] + } + } + + ingroup = InputGroup(science_products=[sp1, sp2]) + actual = ingroup.to_json() + assert actual.keys() == expected.keys() + + actual = actual[IngestionManifestKey.INPUT_GROUP.value] + expected = expected[IngestionManifestKey.INPUT_GROUP.value] + assert actual.keys() == expected.keys() + + actual = actual[IngestionManifestKey.SCIENCE_PRODUCTS.value] + expected = expected[IngestionManifestKey.SCIENCE_PRODUCTS.value] + assert len(actual) == len(expected) == 2 + marvin = actual[0] + trillian = expected[0] + for key, _ in marvin.items(): + assert trillian[key] == marvin[key] + + marvin = actual[1] + trillian = expected[1] + for key, _ in marvin.items(): + assert trillian[key] == marvin[key] + + +def test_ancillary_product_well_formed(): + """ + The JSON shouldn't contain empty fields + + :return: + """ + ap1 = AncillaryProduct(type=AncillaryProductType.LOG, filename="without_feathers.tar") + expected = {"type": AncillaryProductType.LOG.value, "filename": "without_feathers.tar"} + actual = ap1.to_json() + + assert actual == expected + + +def test_output_group_well_formed(): + """ + Make sure our OutputScienceProduct makes nice JSON + + :return: + """ + sp1 = OutputScienceProduct(type=ScienceProductType.IMAGE, filename="see_no_evil.fits") + sp2 = OutputScienceProduct(type=ScienceProductType.IMAGE, filename="hear_no_evil.fits") + ap1 = AncillaryProduct( + type=AncillaryProductType.PIPELINE_ARTIFACTS, filename="without_feathers.tar" + ) + ap2 = AncillaryProduct( + type=AncillaryProductType.PIPELINE_ARTIFACTS, filename="with_feathers.tar" + ) + opg = OutputGroup(science_products=[sp1, sp2], ancillary_products=[ap1, ap2]) + opg_json = opg.to_json() + dumped = json.dumps(opg_json) + + assert ( + dumped == '{"output_group": ' + '{"science_products": ' + '[{"type": "image", "filename": "see_no_evil.fits"}, ' + '{"type": "image", "filename": "hear_no_evil.fits"}], ' + '"ancillary_products": [{"type": "pipeline_artifacts", ' + '"filename": "without_feathers.tar"}, ' + '{"type": "pipeline_artifacts", "filename": "with_feathers.tar"}]}}' + ) + + +@pytest.mark.skip("TODO, or maybe not") +def test_input_group_properly_formatted(): + """ + Does the InputGroup get written to the file in the expected format? + + :return: + """ + sp1 = InputScienceProduct( + sp_type=ScienceProductType.EXEC_BLOCK.value, + locator="uid://evla/execblock/coffee_heath_bar_crunch_7a23f", + ) + + ingroup = InputGroup(science_products=[sp1]) + ig_dict = json.loads(ingroup.to_json()) + ig_text = json.dumps(ig_dict, indent=4) + expected = """ + "input-group": { + "science_products": [ + { + "type": "calibration", + "locator": "uid://evla/execblock/coffee_heath_bar_crunch_7a23f" + } + ] + }, + """ + assert ig_text == expected + + +@pytest.mark.skip("TODO") +def test_output_group_properly_formatted(): + """ + TODO + Does the OutputGroup get written to the file in the expected format? + + :return: + """ + raise NotImplementedError + + +def test_builds_cal_manifest_as_expected(ingest_path: Path): + """ + When we create an EVLA calibration ingestion manifest, does it contain all it should? + We'll make a manifest that should look like our example and make sure it does. + + :return: + """ + + populate_ingest_path_for_manifest_evla_cal_example(ingest_path) + + locator = "uid://evla/execblock/fjdsakljfkdlsajfkldsa" + IngestionManifestBuilder( + telescope=Telescope.EVLA, + staging_source_dir=ingest_path, + sp_type=ScienceProductType.EVLA_CAL, + locator=locator, + ).build() + + manifest_file = find_manifest(ingest_path) + with open(manifest_file, "r") as mf_in: + manifest_content = dict(json.load(mf_in).items()) + + # check parameters + parameters = manifest_content["parameters"]["parameters"] + for param in ["reingest", "ngas_ingest", "calibrate"]: + assert parameters[param] is False + assert parameters[ParamsKey.INGESTION_PATH.value] == str(ingest_path) + + # check input group + input_group = manifest_content[IngestionManifestKey.INPUT_GROUP.value][ + IngestionManifestKey.INPUT_GROUP.value + ] + assert len(input_group[IngestionManifestKey.SCIENCE_PRODUCTS.value]) == 1 + science_product = input_group[IngestionManifestKey.SCIENCE_PRODUCTS.value][0] + assert science_product["locator"] == locator + + # check output group + output_group = manifest_content[IngestionManifestKey.OUTPUT_GROUP.value][ + IngestionManifestKey.OUTPUT_GROUP.value + ] + science_products = output_group[IngestionManifestKey.SCIENCE_PRODUCTS.value] + assert len(science_products) == 1 + ancillary_products = output_group[IngestionManifestKey.ANCILLARY_PRODUCTS.value] + assert len(ancillary_products) == 2 + + shutil.rmtree(ingest_path) + + +def populate_ingest_path_for_manifest_evla_cal_example(ingestion_path: Path): + """ + Create fake input files to match EVLA CAL manifest example + + :param ingestion_path: + :return: + """ + weblog_file = ingestion_path / "qrs.weblog.tgz" + weblog_file.touch() + cal_file = ingestion_path / "XYZ-abc+TMN.O00.tar" + cal_file.touch() diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_miscellaneous_manifests.py b/apps/cli/executables/pexable/ingest_envoy/test/test_miscellaneous_manifests.py index fc043286d6880653b80d718174a8c2d6a57c5876..4c85e801e40326dd8ba9b64874be8543d93e8aa5 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/test_miscellaneous_manifests.py +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_miscellaneous_manifests.py @@ -1,18 +1,25 @@ """ Miscellaneous manifest-building tests """ -import json import logging -import shutil import sys from pathlib import Path -# pylint: disable=E0401, E0402, R1721 +# pylint: disable=E0401, E0402, R1721, W0611, W0621 import pytest -from ingest_envoy.ingestion_manifest import IngestionManifest -from ingest_envoy.utilities import ScienceProductType, IngestionManifestKey -from .conftest import populate_fake_ingest_path, WANTED_FILENAMES, UNWANTED +from ingest_envoy.ingestion_manifest import ( + IngestionManifest, + IngestionManifestBuilder, +) +from ingest_envoy.manifest_components import ( + MANIFEST_NAME_BASE, + MANIFEST_NAME_EXT, + ARTIFACT_NAME, + ARTIFACT_EXT, +) +from ingest_envoy.utilities import ScienceProductType, Telescope +from .conftest import ingest_path, populate_fake_evla_cal_ingest_path logger = logging.getLogger(IngestionManifest.__name__) logger.setLevel(logging.INFO) @@ -21,60 +28,41 @@ logger.addHandler(logging.StreamHandler(sys.stdout)) def test_entry_point_for_evla_cal(ingest_path: Path): """ - Confirm that the ingestion launcher entrypoint functions as expected. + Confirm that the ingestion launcher entrypoint kicks off production of ingestion manifest :param ingest_path: fake tmp ingestion path :return: """ - populate_fake_ingest_path(ingest_path) - manifest = IngestionManifest( - str(ingest_path), ScienceProductType.EVLA_CAL.value, "uid://evla/calibration/meeniemyniemoe" + locator = "uid://evla/calibration/but-we-had-this-for-lunch-yesterday" + populate_fake_evla_cal_ingest_path(ingest_path) + sp_tar = [file for file in ingest_path.iterdir() if file.name.endswith(".tar")][0] + assert sp_tar + + builder = IngestionManifestBuilder( + telescope=Telescope.EVLA, + locator=locator, + sp_type=ScienceProductType.EVLA_CAL, + staging_source_dir=ingest_path, ) - manifest.create() - - files = [file for file in ingest_path.iterdir()] - manifest_file = [file for file in files if file.name.endswith(".json")][0] - assert manifest_file.exists() - - assert len(files) == len(WANTED_FILENAMES) + len(UNWANTED) + 2 - - # make sure manifest_file contains an IngestionManifest - with open(manifest_file, "r") as out: - manifest_content = dict(json.load(out).items()) - - for key in ["parameters", "input_group", "output_group", "ingestion_path"]: - assert key in manifest_content.keys() - - input_group = manifest_content[IngestionManifestKey.INPUT_GROUP.value] - assert isinstance(input_group, dict) - assert len(input_group) == 1 - - output_group = manifest_content[IngestionManifestKey.OUTPUT_GROUP.value] - assert isinstance(output_group, dict) - for key, val in output_group.items(): - val = val.replace("'", '"', len(val)) - sci_prods = json.loads(val) - - assert isinstance(sci_prods, list) - assert len(sci_prods) == 1 - - ancillary_products = output_group[IngestionManifestKey.ANCILLARY_PRODUCTS.value] - assert ancillary_products[0] == "[" - assert ancillary_products[-1] == "]" - assert "weblog" in ancillary_products - assert "type" in ancillary_products - assert "filename" in ancillary_products - - a_prods = json.loads(ancillary_products.replace("'", '"', len(ancillary_products))) - assert isinstance(a_prods, list) - a_prods = a_prods[0] - assert isinstance(a_prods, dict) - for key, val in a_prods.items(): - assert isinstance(val, dict) - a_prods = val - assert len(a_prods) == 2 - - shutil.rmtree(ingest_path) + builder.build() + ingestion_files = [file for file in ingest_path.iterdir()] + + # there should be one ingestion manifest.... + mf_jsons = [ + file + for file in ingestion_files + if file.name.startswith(MANIFEST_NAME_BASE) and file.name.endswith(MANIFEST_NAME_EXT) + ] + assert len(mf_jsons) == 1 + + # ...and an artifacts tar, and the science products tar we started with + assert sp_tar in ingestion_files + artifact_tars = [ + file + for file in ingestion_files + if file.name.startswith(ARTIFACT_NAME) and file.name.endswith(ARTIFACT_EXT) + ] + assert len(artifact_tars) == 1 @pytest.mark.skip("TODO: test_builds_realfast_sdm_manifest")