Skip to content
Snippets Groups Projects

Implemented manifest generation for EB full curation in ingest_envoy

Merged Sam Kagan requested to merge teach-ingest_envoy-full-curation into 2.8.2.1-DEVELOMENT
5 unresolved threads
Files
7
@@ -25,7 +25,7 @@ from pathlib import Path
from pycapo import CapoConfig
# pylint: disable=C0301, E0401, R0903, R1721
from typing import List, Tuple
from typing import List, Optional, Tuple
import arrow
from ingest_envoy.manifest_components import (
@@ -85,6 +85,7 @@ class IngestionManifest(ManifestComponentIF):
# image manifest has this:
additional_metadata: AbstractTextFile = None,
filename: str = None,
destination_dir: Path | None = None,
):
self.staging_source_dir = staging_source_dir
self.sp_type = sp_type
@@ -94,6 +95,9 @@ class IngestionManifest(ManifestComponentIF):
self.output_group = output_group
self.telescope = telescope
self.filename = filename
self.destination_dir = destination_dir
if destination_dir is None:
self.destination_dir = self.staging_source_dir
# Check if NGAS ingestion should be enabled for all manifests in this environment
self.ngas_ingest = self.get_ngas_flag()
@@ -190,12 +194,10 @@ class IngestionManifest(ManifestComponentIF):
:return:
"""
me_dict = self.to_dict()
if self.reingest is not None and self.reingest.targets is not None:
output_path = Path.cwd() / CURATOR_MANIFEST_FILENAME
elif self.reingest is not None:
output_path = self.staging_source_dir / CURATOR_MANIFEST_FILENAME
if self.reingest is not None:
output_path = self.destination_dir / CURATOR_MANIFEST_FILENAME
else:
output_path = self.staging_source_dir / MANIFEST_FILENAME
output_path = self.destination_dir / MANIFEST_FILENAME
to_write = json.dumps(me_dict, indent=4)
with open(output_path, "w") as out:
@@ -253,6 +255,8 @@ class IngestionManifestBuilder:
additional_metadata: AbstractTextFile = None,
filename: str = None,
curate: (CuratorType, str, List[str]) = None,
# Curation doesn't need a curation_source, but it does need a place to stick the manifest
manifest_destination_dir: Optional[Path] = None,
):
# get the telescope
self.telescope = Telescope(telescope)
@@ -287,7 +291,7 @@ class IngestionManifestBuilder:
)
self.curation_source = Path(curate[1]) if curate and curate[1] else None
self.target_list = curate[2]
if self.curation_type == CuratorType.FULL and self.target_list != None:
if self.curation_type == CuratorType.FULL and not self.target_list:
raise IngestionManifestException(
f"Expected targets=None for full curation, got targets={self.target_list}"
)
@@ -296,6 +300,15 @@ class IngestionManifestBuilder:
# we are not running observation ingestion, use a locator
self.locator = locator
# directory to put the manifest in
self.manifest_destination_dir = manifest_destination_dir
if self.manifest_destination_dir is None:
self.manifest_destination_dir = self.staging_source_dir
if self.manifest_destination_dir is None:
raise IngestionManifestException(
f"IngestionManifestBuilder: Expected a directory to house the manifest, found instead staging_source_dir={self.staging_source_dir}, manifest_destination_dir={self.manifest_destination_dir}"
)
if staging_source_dir is not None:
# we are not running curation, look for files
self.files_found = [file for file in staging_source_dir.iterdir()]
@@ -314,13 +327,11 @@ class IngestionManifestBuilder:
"""
if self.curation_type == CuratorType.PARTIAL:
return self._build_partial_curation_manifest()
elif self.curation_type == CuratorType.FULL:
return self._build_full_curation_manifest()
if self.sp_type == ScienceProductType.EVLA_CAL:
return self._build_evla_cal_manifest()
elif self.sp_type == ScienceProductType.EXEC_BLOCK:
return self._build_observation_manifest()
return self._build_observation_manifest(self.curation_type == CuratorType.FULL)
return self._build_image_manifest()
@@ -339,38 +350,13 @@ class IngestionManifestBuilder:
reingest_group=self._build_reingest_group(),
input_group=None,
output_group=None,
destination_dir=self.manifest_destination_dir,
)
manifest_file = manifest.write()
return manifest, manifest_file
def _build_full_curation_manifest(self) -> (IngestionManifest, Path):
"""
Build a manifest for full curation
:return: the manifest file object and path
"""
# Full curation has no separate staging directory here: the curation source
# is handed to the manifest as its staging_source_dir.
manifest = IngestionManifest(
telescope=self.telescope,
staging_source_dir=self.curation_source,
locator=self.locator,
sp_type=self.sp_type,
# a reingest group is supplied and the input group is left empty (None)
reingest_group=self._build_reingest_group(),
input_group=None,
output_group=self._build_observation_output_group(),
)
# The output group may carry no ancillary-product list (None or empty);
# make sure there is a list to append the artifacts product to.
if not manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products = []
artifacts_ap = self._build_artifacts_product()
manifest.output_group.ancillary_products.append(artifacts_ap)
# Write the manifest before building the tar: write_ingestion_artifacts_tar
# looks for the manifest file on disk and raises FileNotFoundError if it is absent.
manifest_file = manifest.write()
# The artifacts tar is placed in the curation source, under the artifacts product's filename.
self.write_ingestion_artifacts_tar(self.curation_source / artifacts_ap.filename)
return manifest, manifest_file
def _build_evla_cal_manifest(self) -> (IngestionManifest, Path):
# create the manifest
manifest = IngestionManifest(
@@ -381,6 +367,7 @@ class IngestionManifestBuilder:
reingest_group=None,
input_group=self._build_input_group(),
output_group=self._build_evla_cal_output_group(),
destination_dir=self.manifest_destination_dir,
)
# We can't create the ingestion artifacts tar quite yet,
@@ -397,32 +384,35 @@ class IngestionManifestBuilder:
manifest.output_group.ancillary_products.append(weblog_ap)
manifest_file = manifest.write()
self.write_ingestion_artifacts_tar(self.staging_source_dir / artifacts_ap.filename)
self.write_ingestion_artifacts_tar(self.manifest_destination_dir / artifacts_ap.filename)
return manifest, manifest_file
def _build_observation_manifest(self) -> (IngestionManifest, Path):
def _build_observation_manifest(self, is_full_curation: bool = False) -> tuple[IngestionManifest, Path]:
reingest_group = None
if is_full_curation:
reingest_group = self._build_reingest_group()
# create the manifest
manifest = IngestionManifest(
telescope=self.telescope,
locator=None,
sp_type=self.sp_type,
staging_source_dir=self.staging_source_dir,
reingest_group=None,
staging_source_dir=self.manifest_source_dir,
reingest_group=reingest_group,
input_group=InputGroup([]),
output_group=self._build_observation_output_group(),
filename=self.filename,
destination_dir=self.manifest_destination_dir,
)
if not manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products = []
artifacts_ap = self._build_artifacts_product()
if artifacts_ap not in manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products.append(artifacts_ap)
if not manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products = []
manifest_file = manifest.write()
self.write_ingestion_artifacts_tar(self.staging_source_dir / artifacts_ap.filename)
self.write_ingestion_artifacts_tar(self.manifest_destination_dir / artifacts_ap.filename)
return manifest, manifest_file
@@ -455,13 +445,14 @@ class IngestionManifestBuilder:
reingest_group=None,
input_group=self._build_input_group(),
output_group=self._build_imaging_output_group(),
destination_dir=self.manifest_destination_dir,
)
artifacts_ap = self._build_artifacts_product()
if artifacts_ap not in manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products.append(artifacts_ap)
manifest_file = manifest.write()
self.write_ingestion_artifacts_tar(self.staging_source_dir / artifacts_ap.filename)
self.write_ingestion_artifacts_tar(self.manifest_destination_dir / artifacts_ap.filename)
return manifest, manifest_file
@@ -541,7 +532,7 @@ class IngestionManifestBuilder:
:return:
"""
products_finder = ObservationIngestionProductsFinder(self.source_dir, self.sp_type)
products_finder = ObservationIngestionProductsFinder(self.manifest_source_dir, self.sp_type)
science_products = products_finder.output_science_products
ancillary_products = products_finder.ancillary_products
@@ -578,7 +569,7 @@ class IngestionManifestBuilder:
if self.additional_metadata:
# find the additional metadata
addl_md_filename = self.additional_metadata.filename
addl_md_file = self.source_dir / addl_md_filename
addl_md_file = self.manifest_source_dir / addl_md_filename
with tarfile.open(artifacts_path, "w") as ingestion_artifacts_tar:
if addl_md_file:
@@ -586,25 +577,20 @@ class IngestionManifestBuilder:
# The manifest file itself is considered an ingestion artifact.
# (It's turtles all the way down.)
manifest_file = self.source_dir / MANIFEST_FILENAME
manifest_file = self.manifest_destination_dir / MANIFEST_FILENAME
if not manifest_file.exists():
manifest_file = self.source_dir / CURATOR_MANIFEST_FILENAME
manifest_file = self.manifest_destination_dir / CURATOR_MANIFEST_FILENAME
if not manifest_file.exists():
raise FileNotFoundError(
f"No manifest (i.e. {MANIFEST_FILENAME} or {CURATOR_MANIFEST_FILENAME}) found in {self.source_dir}"
f"No manifest (i.e. {MANIFEST_FILENAME} or {CURATOR_MANIFEST_FILENAME}) found in {self.manifest_destination_dir}"
)
ingestion_artifacts_tar.add(manifest_file)
return ingestion_artifacts_tar
@property
def source_dir(self) -> Path:
source_dir = self.staging_source_dir if self.staging_source_dir is not None else self.curation_source
if source_dir is None:
raise IngestionManifestException(
"IngestionManifestBuilder needs a value for either staging_source_dir or curation_source, found neither"
)
return source_dir
def manifest_source_dir(self) -> Path | None:
return self.staging_source_dir if self.staging_source_dir is not None else self.curation_source
def format_timestamp(datetime: arrow.Arrow) -> str:
Loading