diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py index dcb23d33766df6e61812e343787ef5da0ebbafd2..7fed83714c4efc7448507379a5dc9439073d2449 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py @@ -21,13 +21,15 @@ import logging import sys import tarfile from pathlib import Path + from pycapo import CapoConfig # pylint: disable=C0301, E0401, R0903, R1721 -from typing import Tuple, List +from typing import List, Tuple import arrow from ingest_envoy.manifest_components import ( + CURATOR_MANIFEST_FILENAME, INGESTION_ARTIFACTS_NAME, INIT_WEBLOG_FILENAME, JSON, @@ -43,20 +45,19 @@ from ingest_envoy.manifest_components import ( OutputGroup, OutputScienceProduct, ReingestGroup, - CURATOR_MANIFEST_FILENAME, ) from ingest_envoy.schema import AbstractTextFile from ingest_envoy.std_img_manifest_utils import ImageIngestionProductsFinder from ingest_envoy.std_obs_manifest_utils import ObservationIngestionProductsFinder from ingest_envoy.utilities import ( AncillaryProductType, + CuratorType, IngestionManifestException, NoScienceProductException, ScienceProductType, Telescope, find_output_tars, find_weblogs, - CuratorType, ) logger = logging.getLogger(__name__) @@ -189,8 +190,10 @@ class IngestionManifest(ManifestComponentIF): :return: """ me_dict = self.to_dict() - if self.reingest is not None: + if self.reingest is not None and self.reingest.targets is not None: output_path = Path.cwd() / CURATOR_MANIFEST_FILENAME + elif self.reingest is not None: + output_path = self.staging_source_dir / CURATOR_MANIFEST_FILENAME else: output_path = self.staging_source_dir / MANIFEST_FILENAME @@ -216,7 +219,7 @@ class IngestionManifest(ManifestComponentIF): if IngestionManifestKey.REINGEST.value in me_dict and me_dict[IngestionManifestKey.REINGEST.value] is not None: to_return[IngestionManifestKey.REINGEST.value] = me_dict[IngestionManifestKey.REINGEST.value].to_dict() - # curator manifests have no output groups + # partial-curation manifests have no output groups if ( IngestionManifestKey.OUTPUT_GROUP.value in me_dict and me_dict[IngestionManifestKey.OUTPUT_GROUP.value] is not None @@ -278,12 +281,16 @@ class IngestionManifestBuilder: raise NotImplementedError(f"Don't know yet how to build a {self.sp_type.value} manifest") if self.curation_type is not None: - if self.curation_type not in [CuratorType.PARTIAL]: + if self.curation_type not in [CuratorType.PARTIAL, CuratorType.FULL]: raise NotImplementedError( f"Don't know how to build a {self.curation_type.value} curation {self.sp_type} manifest" ) self.curation_source = Path(curate[1]) if curate and curate[1] else None self.target_list = curate[2] + if self.curation_type == CuratorType.FULL and self.target_list != None: + raise IngestionManifestException( + f"Expected targets=None for full curation, got targets={self.target_list}" + ) if locator is not None: # we are not running observation ingestion, use a locator @@ -306,18 +313,18 @@ class IngestionManifestBuilder: :return: the ingestion manifest and the file containing its JSON """ if self.curation_type == CuratorType.PARTIAL: - return self._build_curation_manifest() + return self._build_partial_curation_manifest() if self.sp_type == ScienceProductType.EVLA_CAL: return self._build_evla_cal_manifest() elif self.sp_type == ScienceProductType.EXEC_BLOCK: - return self._build_observation_manifest() + return self._build_observation_manifest(self.curation_type == CuratorType.FULL) return self._build_image_manifest() - def _build_curation_manifest(self) -> (IngestionManifest, Path): + def _build_partial_curation_manifest(self) -> (IngestionManifest, Path): """ - Build a manifest for curator + Build a manifest for partial curator Partial curation is simple, only parameters and reingest groups are needed :return: the manifest file object and path @@ -351,8 +358,7 @@ class IngestionManifestBuilder: # We can't create the ingestion artifacts tar quite yet, # because it will contain the yet-to-be-written manifest itself # (required for ingestion, evidently) - artifacts_filename = self._build_artifacts_filename() - artifacts_ap = AncillaryProduct(AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_filename) + artifacts_ap = self._build_artifacts_product() if artifacts_ap not in manifest.output_group.ancillary_products: manifest.output_group.ancillary_products.append(artifacts_ap) @@ -363,35 +369,34 @@ class IngestionManifestBuilder: manifest.output_group.ancillary_products.append(weblog_ap) manifest_file = manifest.write() - artifacts_file = self.staging_source_dir / artifacts_filename - self.write_ingestion_artifacts_tar(artifacts_file) + self.write_ingestion_artifacts_tar(self.staging_source_dir / artifacts_ap.filename) return manifest, manifest_file - def _build_observation_manifest(self) -> (IngestionManifest, Path): + def _build_observation_manifest(self, is_full_curation: bool = False) -> tuple[IngestionManifest, Path]: + reingest_group = None + if is_full_curation: + reingest_group = self._build_reingest_group() # create the manifest manifest = IngestionManifest( telescope=self.telescope, locator=None, sp_type=self.sp_type, - staging_source_dir=self.staging_source_dir, - reingest_group=None, + staging_source_dir=self.source_dir, + reingest_group=reingest_group, input_group=InputGroup([]), output_group=self._build_observation_output_group(), filename=self.filename, ) - artifacts_filename = self._build_artifacts_filename() - artifacts_ap = AncillaryProduct(AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_filename) - if artifacts_ap not in manifest.output_group.ancillary_products: - manifest.output_group.ancillary_products.append(artifacts_ap) - if not manifest.output_group.ancillary_products: manifest.output_group.ancillary_products = [] + artifacts_ap = self._build_artifacts_product() + if artifacts_ap not in manifest.output_group.ancillary_products: + manifest.output_group.ancillary_products.append(artifacts_ap) manifest_file = manifest.write() - artifacts_file = self.staging_source_dir / artifacts_filename - self.write_ingestion_artifacts_tar(artifacts_file) + self.write_ingestion_artifacts_tar(self.source_dir / artifacts_ap.filename) return manifest, manifest_file @@ -426,12 +431,11 @@ class IngestionManifestBuilder: output_group=self._build_imaging_output_group(), ) - artifacts_file = self.staging_source_dir / self._build_artifacts_filename() - artifacts_ap = AncillaryProduct(type=AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_file.name) + artifacts_ap = self._build_artifacts_product() if artifacts_ap not in manifest.output_group.ancillary_products: manifest.output_group.ancillary_products.append(artifacts_ap) manifest_file = manifest.write() - self.write_ingestion_artifacts_tar(artifacts_file) + self.write_ingestion_artifacts_tar(self.staging_source_dir / artifacts_ap.filename) return manifest, manifest_file @@ -511,7 +515,7 @@ class IngestionManifestBuilder: :return: """ - products_finder = ObservationIngestionProductsFinder(self.staging_source_dir, self.sp_type) + products_finder = ObservationIngestionProductsFinder(self.source_dir, self.sp_type) science_products = products_finder.output_science_products ancillary_products = products_finder.ancillary_products @@ -528,20 +532,27 @@ class IngestionManifestBuilder: timestamp = format_timestamp(current_time) return f"{INGESTION_ARTIFACTS_NAME}{timestamp}{TARFILE_EXT}" + @staticmethod + def _build_artifacts_product() -> AncillaryProduct: + return AncillaryProduct( + AncillaryProductType.INGESTION_ARTIFACTS, IngestionManifestBuilder._build_artifacts_filename() + ) + def write_ingestion_artifacts_tar(self, artifacts_path: Path) -> tarfile.TarFile: """ - Take the list of files and build a tar for inclusion into the archive. + Take the list of files and write a tar file for inclusion in the archive. This happens in the staging area for ingestion. The EVLA CAL tar will contain just the manifest. - :return: a .tar archive of the ingestion artifacts + :param artifacts_path: Path to create the resulting tar file at + :return: tar file of the ingestion artifacts """ addl_md_file = None if self.additional_metadata: # find the additional metadata addl_md_filename = self.additional_metadata.filename - addl_md_file = self.staging_source_dir / addl_md_filename + addl_md_file = self.source_dir / addl_md_filename with tarfile.open(artifacts_path, "w") as ingestion_artifacts_tar: if addl_md_file: @@ -549,11 +560,26 @@ class IngestionManifestBuilder: # The manifest file itself is considered an ingestion artifact. # (It's turtles all the way down.) - manifest_file = self.staging_source_dir / MANIFEST_FILENAME + manifest_file = self.source_dir / MANIFEST_FILENAME + if not manifest_file.exists(): + manifest_file = self.source_dir / CURATOR_MANIFEST_FILENAME + if not manifest_file.exists(): + raise FileNotFoundError( + f"No manifest (i.e. {MANIFEST_FILENAME} or {CURATOR_MANIFEST_FILENAME}) found in {self.source_dir}" + ) ingestion_artifacts_tar.add(manifest_file) return ingestion_artifacts_tar + @property + def source_dir(self) -> Path: + source_dir = self.staging_source_dir if self.staging_source_dir is not None else self.curation_source + if source_dir is None: + raise IngestionManifestException( + "IngestionManifestBuilder needs a value for either staging_source_dir or curation_source, found neither" + ) + return source_dir + def format_timestamp(datetime: arrow.Arrow) -> str: """ diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/std_obs_manifest_utils.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/std_obs_manifest_utils.py index 6d835d337c20cfa7bd98e42e954a37394ecd4e66..018a57cac0837f53a7b6a773026c9468af34dfcc 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/std_obs_manifest_utils.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/std_obs_manifest_utils.py @@ -44,6 +44,7 @@ PB = "pb" MASK = "mask" ALPHA = "alpha" PBCOR = "pbcor" +BIN = "bin" # pylint: disable=R1721 @@ -68,20 +69,21 @@ class ObservationIngestionProductsFinder: :return: """ - # Currently we only support ingestion of GMVA/VLBI observations. If there - # is not yet implemented - # is an SDM file present we know it's a different type of observation which - for file in self.files_found: - if file.name.endswith(XML): - self.logger.error("Non-VLBA/GMVA observation ingestion is not currently implemented!") - sys.exit(1) - - # Ingest all *fits files as science products - fits_files = [file for file in self.files_found if file.name.endswith(IDIFITS)] - self.logger.info(f"Science Product(s) to ingest: {fits_files}") + # Ingest all *fits files, or directories that contain XML files (assumed to be SDMs), as science products + fits_files_and_sdm_dirs = [ + file + for file in self.files_found + if file.name.endswith(IDIFITS) or + # SDM check + ( + file.is_dir() + and all(inner_file.name.endswith(XML) or inner_file.name.endswith(BIN) for inner_file in file.iterdir()) + ) + ] + self.logger.info(f"Science Product(s) to ingest: {fits_files_and_sdm_dirs}") science_products = [] - for file in fits_files: + for file in fits_files_and_sdm_dirs: # Add all science products and their ancillaries science_products.append( OutputScienceProduct( diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_curator_manifest.py b/apps/cli/executables/pexable/ingest_envoy/test/test_curator_manifest.py index 7f67ad231eb42027935b4176bdc3053a4384f797..8c06e164928ca42e4c8fa163ca609714b4551dc9 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/test_curator_manifest.py +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_curator_manifest.py @@ -66,7 +66,8 @@ def test_manifest_full_curation_eb_manifest_metadata(ingest_path: Path): assert params.staging_source_dir == ingest_path assert not hasattr(params, "calibrate") - assert not actual_manifest.input_group + assert actual_manifest.input_group + assert not actual_manifest.input_group.science_products output_group = actual_manifest.output_group assert output_group @@ -113,7 +114,8 @@ def test_manifest_full_curation_eb_manifest_file(ingest_path: Path): expected_params = expected_manifest["parameters"] actual_params = actual_manifest_deser["parameters"] assert actual_params["ngas_ingest"] == expected_params["ngas_ingest"] - assert not actual_manifest_deser.get("input_group") and not expected_manifest.get("input_group") + assert actual_manifest_deser["reingest"] == expected_manifest["reingest"] + assert actual_manifest_deser.get("input_group") == expected_manifest.get("input_group") expected_outgroup = expected_manifest["output_group"] expected_osp = expected_outgroup["science_products"]