Skip to content
Snippets Groups Projects

Adding ingest

Merged Daniel Lyons requested to merge adding-ingest into main
All threads resolved!
Files
7
@@ -2,7 +2,6 @@
import abc
import json
import logging
import re
import sys
import tarfile
from pathlib import Path
@@ -26,9 +25,10 @@ from ingest_envoy.manifest_components import (
OutputScienceProduct,
AncillaryProduct,
OutputGroup,
SCIENCE_PRODUCT_PATTERN,
MANIFEST_FILENAME,
ParamsKey,
)
from ingest_envoy.schema import AbstractTextFile
from ingest_envoy.utilities import (
ScienceProductType,
Telescope,
@@ -57,6 +57,8 @@ class ManifestIF(ManifestComponentIF):
input_group: InputGroup,
# every manifest has at least one output group
output_group: OutputGroup,
# image manifest has this:
additional_metadata: AbstractTextFile = None,
):
self.staging_source_dir = staging_source_dir
self.sp_type = sp_type
@@ -65,10 +67,39 @@ class ManifestIF(ManifestComponentIF):
self.output_group = output_group
self.telescope = telescope
self.parameters = self.build_ingest_parameters()
self.parameters = self.build_ingest_parameters(additional_metadata)
self.files_found = [file for file in self.staging_source_dir.iterdir()]
def build_ingest_parameters(self, additional_metadata: AbstractTextFile):
    """
    Make the "parameters" section of the manifest

    :param additional_metadata: additional-metadata file; handed to the
        parameters only when it is truthy
    :return: the ManifestParameters built for this manifest
    :raises NotImplementedError: if the science product type is neither
        EVLA_CAL nor IMAGE
    """
    supported_types = [ScienceProductType.EVLA_CAL, ScienceProductType.IMAGE]
    if self.sp_type not in supported_types:
        raise NotImplementedError()

    # arguments shared by both flavors of manifest
    param_kwargs = dict(
        telescope=self.telescope,
        reingest=False,
        ngas_ingest=False,
        calibrate=False,
        staging_source_dir=self.staging_source_dir,
    )
    # the metadata keyword is supplied only when there is metadata to pass
    if additional_metadata:
        param_kwargs["additional_metadata"] = additional_metadata

    return ManifestParameters(**param_kwargs)
@abc.abstractmethod
def write(self):
"""
@@ -82,7 +113,11 @@ class ManifestIF(ManifestComponentIF):
def __eq__(self, other):
    """
    Two manifests are equal when their parameters, input group and
    output group are all equal.

    :param other: object to compare against
    :return: True if other is an IngestionManifest with equal parameters,
        input group and output group; False otherwise
    """
    if isinstance(other, IngestionManifest):
        # a single return: an earlier, narrower return would leave the
        # parameters comparison unreachable
        return (
            other.parameters == self.parameters
            and other.input_group == self.input_group
            and other.output_group == self.output_group
        )

    return False
@@ -102,14 +137,16 @@ class IngestionManifestBuilder:
def __init__(
self,
staging_source_dir: Path,
sp_type: str,
sp_type: ScienceProductType,
locator: str,
telescope: Telescope,
additional_metadata: AbstractTextFile = None,
):
self.telescope = telescope
self.staging_source_dir = staging_source_dir
self.sp_type = ScienceProductType(sp_type)
self.locator = locator
self.additional_metadata = additional_metadata
self.files_found = [file for file in staging_source_dir.iterdir()]
if len(self.files_found) == 0:
raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}")
@@ -134,6 +171,8 @@ class IngestionManifestBuilder:
input_group=self._build_input_group(),
output_group=self._build_output_group(),
)
if self.additional_metadata:
manifest.parameters.additional_metadata = self.additional_metadata
manifest_file = manifest.write()
@@ -170,15 +209,34 @@ class IngestionManifestBuilder:
# find ancillary products, if any
ancillary_products = self._find_ancillary_products()
# N.B. this is NOT done for EVLA CAL manifest, but keep code for future use
# tar_filename = self.build_artifacts_filename()
# artifacts_ap = AncillaryProduct(
# type=AncillaryProductType.PIPELINE_ARTIFACTS, filename=tar_filename
# )
# ancillary_products.append(artifacts_ap)
return OutputGroup(self._define_output_science_products(), ancillary_products)
def _build_ancillary_product(self, file: Path) -> AncillaryProduct:
    """
    Turn a staging-directory file into an ancillary product, if it is one.

    :param file: file found in staging dir
    :return: the ancillary product this file represents, or None when the
        file is neither the weblog nor an artifacts file
    """
    # the weblog is recognized by its exact filename
    if file.name == WEBLOG_FILENAME:
        return AncillaryProduct(
            type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=str(file)
        )

    # artifacts files are recognized by the type's value appearing in the name;
    # pipeline artifacts are checked before ingestion artifacts
    artifact_types = (
        AncillaryProductType.PIPELINE_ARTIFACTS,
        AncillaryProductType.INGESTION_ARTIFACTS,
    )
    for ap_type in artifact_types:
        if ap_type.value in file.name:
            return AncillaryProduct(type=ap_type, filename=str(file))

    # this is not an ancillary product
    return None
@staticmethod
def build_artifacts_filename() -> str:
"""
@@ -219,6 +277,7 @@ class IngestionManifestBuilder:
"""
ancillary_products = []
# if there's a weblog in here, grab it
maybe_weblogs = [file for file in self.files_found if file.name == WEBLOG_FILENAME]
if len(maybe_weblogs) > 0:
@@ -228,9 +287,11 @@ class IngestionManifestBuilder:
)
ancillary_products.append(weblog_ap)
more_aps = self._find_additional_ingestion_files()
if len(more_aps) > 0:
ancillary_products.append(more_aps)
additional_files = self._find_additional_ingestion_files()
for file in additional_files:
maybe_ap = self._build_ancillary_product(file)
if maybe_ap and maybe_ap not in ancillary_products:
ancillary_products.append(maybe_ap)
return ancillary_products
@@ -245,6 +306,9 @@ class IngestionManifestBuilder:
# there won't be any others
return []
if self.sp_type == ScienceProductType.IMAGE:
return [file for file in self.staging_source_dir.iterdir()]
# TODO when the time comes: we'll have extra information for other ingestion types;
# see archive-metaproject
raise NotImplementedError
@@ -253,26 +317,8 @@ class IngestionManifestBuilder:
class IngestionManifest(ManifestIF):
"""write ingestion manifest to file"""
def build_ingest_parameters(self):
    """
    Make the "parameters" section of the manifest

    :return: the ManifestParameters for an EVLA_CAL manifest
    :raises NotImplementedError: for any other science product type
    """
    # NOTE(review): this variant takes no additional_metadata argument;
    # confirm it is still compatible with how the base class calls it.
    if self.sp_type == ScienceProductType.EVLA_CAL:
        return ManifestParameters(
            telescope=self.telescope,
            reingest=False,
            ngas_ingest=False,
            calibrate=False,
            staging_source_dir=self.staging_source_dir,
        )

    raise NotImplementedError()
# @property
def ingestion_path(self) -> Path:
    """
    Report the directory that is being ingested from.

    :return: the staging source directory held by this manifest's parameters
    """
    # Only one return: a preceding `return self.parameters.ingestion_path`
    # would make this line unreachable.
    # NOTE(review): assumes ManifestParameters exposes staging_source_dir
    # as an attribute (it is passed as a keyword when built) -- confirm.
    return self.parameters.staging_source_dir
def write(self) -> Path:
"""
@@ -300,7 +346,7 @@ class IngestionManifest(ManifestIF):
me_dict = dict(self.__dict__)
to_return = {
IngestionManifestKey.PARAMETERS.value: self.build_ingest_parameters().to_json(),
IngestionManifestKey.PARAMETERS.value: self.parameters.to_json(),
IngestionManifestKey.INPUT_GROUP.value: me_dict[
IngestionManifestKey.INPUT_GROUP.value
].to_json(),
@@ -308,22 +354,13 @@ class IngestionManifest(ManifestIF):
IngestionManifestKey.OUTPUT_GROUP.value
].to_json(),
}
if self.parameters.additional_metadata:
to_return[ParamsKey.ADDITIONAL_METADATA.value] = str(
self.parameters.additional_metadata
)
return to_return
def _find_science_product_tar(self) -> Path:
    """
    A calibration ingestion staging dir should have ONE science product tar; ignore any others

    :return: the first regular file in the staging dir whose name matches
        SCIENCE_PRODUCT_PATTERN
    :raises FileNotFoundError: if no such file is found
    """
    for file in self.staging_source_dir.iterdir():
        # is_file has to be *called*: the bare bound method is always truthy,
        # which would let directories through the filter
        if file.is_file() and re.match(SCIENCE_PRODUCT_PATTERN, file.name):
            return file

    raise FileNotFoundError(f"no science product found at {self.staging_source_dir}")
def format_timestamp(datetime: DateTime) -> str:
"""
Loading