Skip to content
Snippets Groups Projects

Implemented manifest generation for EB full curation in ingest_envoy

Merged Sam Kagan requested to merge teach-ingest_envoy-full-curation into 2.8.2.1-DEVELOMENT
Files
5
@@ -21,13 +21,15 @@ import logging
import sys
import tarfile
from pathlib import Path
from pycapo import CapoConfig
# pylint: disable=C0301, E0401, R0903, R1721
from typing import Tuple, List
from typing import List, Tuple
import arrow
from ingest_envoy.manifest_components import (
CURATOR_MANIFEST_FILENAME,
INGESTION_ARTIFACTS_NAME,
INIT_WEBLOG_FILENAME,
JSON,
@@ -43,20 +45,19 @@ from ingest_envoy.manifest_components import (
OutputGroup,
OutputScienceProduct,
ReingestGroup,
CURATOR_MANIFEST_FILENAME,
)
from ingest_envoy.schema import AbstractTextFile
from ingest_envoy.std_img_manifest_utils import ImageIngestionProductsFinder
from ingest_envoy.std_obs_manifest_utils import ObservationIngestionProductsFinder
from ingest_envoy.utilities import (
AncillaryProductType,
CuratorType,
IngestionManifestException,
NoScienceProductException,
ScienceProductType,
Telescope,
find_output_tars,
find_weblogs,
CuratorType,
)
logger = logging.getLogger(__name__)
@@ -189,8 +190,10 @@ class IngestionManifest(ManifestComponentIF):
:return:
"""
me_dict = self.to_dict()
if self.reingest is not None:
if self.reingest is not None and self.reingest.targets is not None:
output_path = Path.cwd() / CURATOR_MANIFEST_FILENAME
elif self.reingest is not None:
output_path = self.staging_source_dir / CURATOR_MANIFEST_FILENAME
else:
output_path = self.staging_source_dir / MANIFEST_FILENAME
@@ -216,7 +219,7 @@ class IngestionManifest(ManifestComponentIF):
if IngestionManifestKey.REINGEST.value in me_dict and me_dict[IngestionManifestKey.REINGEST.value] is not None:
to_return[IngestionManifestKey.REINGEST.value] = me_dict[IngestionManifestKey.REINGEST.value].to_dict()
# curator manifests have no output groups
# partial-curation manifests have no output groups
if (
IngestionManifestKey.OUTPUT_GROUP.value in me_dict
and me_dict[IngestionManifestKey.OUTPUT_GROUP.value] is not None
@@ -278,12 +281,16 @@ class IngestionManifestBuilder:
raise NotImplementedError(f"Don't know yet how to build a {self.sp_type.value} manifest")
if self.curation_type is not None:
if self.curation_type not in [CuratorType.PARTIAL]:
if self.curation_type not in [CuratorType.PARTIAL, CuratorType.FULL]:
raise NotImplementedError(
f"Don't know how to build a {self.curation_type.value} curation {self.sp_type} manifest"
)
self.curation_source = Path(curate[1]) if curate and curate[1] else None
self.target_list = curate[2]
if self.curation_type == CuratorType.FULL and self.target_list != None:
raise IngestionManifestException(
f"Expected targets=None for full curation, got targets={self.target_list}"
)
if locator is not None:
# we are not running observation ingestion, use a locator
@@ -306,7 +313,9 @@ class IngestionManifestBuilder:
:return: the ingestion manifest and the file containing its JSON
"""
if self.curation_type == CuratorType.PARTIAL:
return self._build_curation_manifest()
return self._build_partial_curation_manifest()
elif self.curation_type == CuratorType.FULL:
return self._build_full_curation_manifest()
if self.sp_type == ScienceProductType.EVLA_CAL:
return self._build_evla_cal_manifest()
@@ -315,9 +324,9 @@ class IngestionManifestBuilder:
return self._build_image_manifest()
def _build_curation_manifest(self) -> (IngestionManifest, Path):
def _build_partial_curation_manifest(self) -> (IngestionManifest, Path):
"""
Build a manifest for curator
Build a manifest for partial curator
Partial curation is simple, only parameters and reingest groups are needed
:return: the manifest file object and path
@@ -336,6 +345,32 @@ class IngestionManifestBuilder:
return manifest, manifest_file
def _build_full_curation_manifest(self) -> (IngestionManifest, Path):
    """
    Build a manifest for full curation

    The manifest gets a reingest group and an observation output group, but no input group.
    An ingestion-artifacts ancillary product is appended to the output group, the manifest
    is written, and then the artifacts tar is written under the curation source directory.

    :return: the manifest file object and path
    """
    reingest_group = self._build_reingest_group()
    observation_output_group = self._build_observation_output_group()

    manifest = IngestionManifest(
        telescope=self.telescope,
        staging_source_dir=self.curation_source,
        locator=self.locator,
        sp_type=self.sp_type,
        reingest_group=reingest_group,
        input_group=None,
        output_group=observation_output_group,
    )

    # Swap a missing (or empty) ancillary product list for a fresh list
    # so that the append below always has something to append to.
    if not manifest.output_group.ancillary_products:
        manifest.output_group.ancillary_products = []

    artifacts_product = self._build_artifacts_product()
    manifest.output_group.ancillary_products.append(artifacts_product)

    # The manifest has to be written before the artifacts tar is created:
    # the tar writer adds the manifest file itself to the archive.
    manifest_path = manifest.write()
    artifacts_tar_path = self.curation_source / artifacts_product.filename
    self.write_ingestion_artifacts_tar(artifacts_tar_path)

    return manifest, manifest_path
def _build_evla_cal_manifest(self) -> (IngestionManifest, Path):
# create the manifest
manifest = IngestionManifest(
@@ -351,8 +386,7 @@ class IngestionManifestBuilder:
# We can't create the ingestion artifacts tar quite yet,
# because it will contain the yet-to-be-written manifest itself
# (required for ingestion, evidently)
artifacts_filename = self._build_artifacts_filename()
artifacts_ap = AncillaryProduct(AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_filename)
artifacts_ap = self._build_artifacts_product()
if artifacts_ap not in manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products.append(artifacts_ap)
@@ -363,8 +397,7 @@ class IngestionManifestBuilder:
manifest.output_group.ancillary_products.append(weblog_ap)
manifest_file = manifest.write()
artifacts_file = self.staging_source_dir / artifacts_filename
self.write_ingestion_artifacts_tar(artifacts_file)
self.write_ingestion_artifacts_tar(self.staging_source_dir / artifacts_ap.filename)
return manifest, manifest_file
@@ -381,8 +414,7 @@ class IngestionManifestBuilder:
filename=self.filename,
)
artifacts_filename = self._build_artifacts_filename()
artifacts_ap = AncillaryProduct(AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_filename)
artifacts_ap = self._build_artifacts_product()
if artifacts_ap not in manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products.append(artifacts_ap)
@@ -390,8 +422,7 @@ class IngestionManifestBuilder:
manifest.output_group.ancillary_products = []
manifest_file = manifest.write()
artifacts_file = self.staging_source_dir / artifacts_filename
self.write_ingestion_artifacts_tar(artifacts_file)
self.write_ingestion_artifacts_tar(self.staging_source_dir / artifacts_ap.filename)
return manifest, manifest_file
@@ -426,12 +457,11 @@ class IngestionManifestBuilder:
output_group=self._build_imaging_output_group(),
)
artifacts_file = self.staging_source_dir / self._build_artifacts_filename()
artifacts_ap = AncillaryProduct(type=AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_file.name)
artifacts_ap = self._build_artifacts_product()
if artifacts_ap not in manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products.append(artifacts_ap)
manifest_file = manifest.write()
self.write_ingestion_artifacts_tar(artifacts_file)
self.write_ingestion_artifacts_tar(self.staging_source_dir / artifacts_ap.filename)
return manifest, manifest_file
@@ -511,7 +541,7 @@ class IngestionManifestBuilder:
:return:
"""
products_finder = ObservationIngestionProductsFinder(self.staging_source_dir, self.sp_type)
products_finder = ObservationIngestionProductsFinder(self.source_dir, self.sp_type)
science_products = products_finder.output_science_products
ancillary_products = products_finder.ancillary_products
@@ -528,20 +558,27 @@ class IngestionManifestBuilder:
timestamp = format_timestamp(current_time)
return f"{INGESTION_ARTIFACTS_NAME}{timestamp}{TARFILE_EXT}"
@staticmethod
def _build_artifacts_product() -> AncillaryProduct:
    """
    Create the ancillary product entry for the ingestion artifacts tar

    :return: an INGESTION_ARTIFACTS ancillary product carrying a newly built artifacts filename
    """
    artifacts_filename = IngestionManifestBuilder._build_artifacts_filename()
    return AncillaryProduct(AncillaryProductType.INGESTION_ARTIFACTS, artifacts_filename)
def write_ingestion_artifacts_tar(self, artifacts_path: Path) -> tarfile.TarFile:
"""
Take the list of files and build a tar for inclusion into the archive.
Take the list of files and write a tar file for inclusion in the archive.
This happens in the staging area for ingestion.
The EVLA CAL tar will contain just the manifest.
:return: a .tar archive of the ingestion artifacts
:param artifacts_path: Path to create the resulting tar file at
:return: tar file of the ingestion artifacts
"""
addl_md_file = None
if self.additional_metadata:
# find the additional metadata
addl_md_filename = self.additional_metadata.filename
addl_md_file = self.staging_source_dir / addl_md_filename
addl_md_file = self.source_dir / addl_md_filename
with tarfile.open(artifacts_path, "w") as ingestion_artifacts_tar:
if addl_md_file:
@@ -549,11 +586,26 @@ class IngestionManifestBuilder:
# The manifest file itself is considered an ingestion artifact.
# (It's turtles all the way down.)
manifest_file = self.staging_source_dir / MANIFEST_FILENAME
manifest_file = self.source_dir / MANIFEST_FILENAME
if not manifest_file.exists():
manifest_file = self.source_dir / CURATOR_MANIFEST_FILENAME
if not manifest_file.exists():
raise FileNotFoundError(
f"No manifest (i.e. {MANIFEST_FILENAME} or {CURATOR_MANIFEST_FILENAME}) found in {self.source_dir}"
)
ingestion_artifacts_tar.add(manifest_file)
return ingestion_artifacts_tar
@property
def source_dir(self) -> Path:
    """
    Directory this builder should read from and write to

    The staging source directory wins when it is set; otherwise the curation source is used.

    :return: staging_source_dir if it is not None, else curation_source
    :raises IngestionManifestException: if both staging_source_dir and curation_source are None
    """
    if self.staging_source_dir is not None:
        return self.staging_source_dir

    if self.curation_source is not None:
        return self.curation_source

    raise IngestionManifestException(
        "IngestionManifestBuilder needs a value for either staging_source_dir or curation_source, found neither"
    )
def format_timestamp(datetime: arrow.Arrow) -> str:
"""
Loading