Skip to content
Snippets Groups Projects

WS-543: Addressed issues found in calibration ingestion testing

Merged Janet Goldstein requested to merge WS-543-fix-manifest-issues-2021-07-22 into main
Files
9
@@ -14,11 +14,9 @@ import pendulum
from pendulum import DateTime
from ingest_envoy.manifest_components import (
MANIFEST_NAME_BASE,
MANIFEST_NAME_EXT,
ARTIFACT_NAME,
ARTIFACT_EXT,
WEBLOG,
TARFILE_EXT,
WEBLOG_FILENAME,
JSON,
IngestionManifestKey,
ManifestComponentIF,
@@ -29,6 +27,7 @@ from ingest_envoy.manifest_components import (
AncillaryProduct,
OutputGroup,
SCIENCE_PRODUCT_PATTERN,
MANIFEST_FILENAME,
)
from ingest_envoy.utilities import (
ScienceProductType,
@@ -70,6 +69,15 @@ class ManifestIF(ManifestComponentIF):
self.files_found = [file for file in self.staging_source_dir.iterdir()]
@abc.abstractmethod
def create(self):
"""
Build and write the manifest, which includes gathering various items in
ingestion_path to get info for the manifest.
:return:
"""
@abc.abstractmethod
def write(self):
"""
@@ -103,13 +111,13 @@ class IngestionManifestBuilder:
def __init__(
self,
staging_source_dir: Path,
sp_type: str,
sp_type: ScienceProductType,
locator: str,
telescope: Telescope,
):
self.telescope = telescope
self.staging_source_dir = staging_source_dir
self.sp_type = ScienceProductType(sp_type)
self.sp_type = sp_type
self.locator = locator
self.files_found = [file for file in staging_source_dir.iterdir()]
if len(self.files_found) == 0:
@@ -152,7 +160,7 @@ class IngestionManifestBuilder:
# N.B. this is sufficient for most types of ingestion,
# but ALMA CALs will have multiple EB SPs, identified only by locator,
# and VLBAs have no input group at all.
sp_in = InputScienceProduct(sp_type=self.sp_type, locator=self.locator)
sp_in = InputScienceProduct(locator=self.locator)
return InputGroup([sp_in])
@@ -171,11 +179,12 @@ class IngestionManifestBuilder:
# find ancillary products, if any
ancillary_products = self._find_ancillary_products()
tar_filename = self.build_artifacts_filename()
artifacts_ap = AncillaryProduct(
type=AncillaryProductType.PIPELINE_ARTIFACTS, filename=tar_filename
)
ancillary_products.append(artifacts_ap)
# N.B. this is NOT done for EVLA CAL manifest, but keep code for future use
# tar_filename = self.build_artifacts_filename()
# artifacts_ap = AncillaryProduct(
# type=AncillaryProductType.PIPELINE_ARTIFACTS, filename=tar_filename
# )
# ancillary_products.append(artifacts_ap)
return OutputGroup(self._define_output_science_products(), ancillary_products)
@@ -188,7 +197,7 @@ class IngestionManifestBuilder:
"""
current_time = pendulum.now()
timestamp = format_timestamp(current_time)
return f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}"
return f"{ARTIFACT_NAME}{timestamp}{TARFILE_EXT}"
def write_ingestion_artifacts_tar(self) -> Path:
"""
@@ -220,11 +229,11 @@ class IngestionManifestBuilder:
ancillary_products = []
# if there's a weblog in here, grab it
maybe_weblogs = [file for file in self.files_found if file.name.endswith(WEBLOG)]
maybe_weblogs = [file for file in self.files_found if file.name == WEBLOG_FILENAME]
if len(maybe_weblogs) > 0:
weblog = maybe_weblogs[0]
weblog_ap = AncillaryProduct(
type=AncillaryProductType.PIPELINE_WEBLOG, filename=weblog.name
type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=weblog.name
)
ancillary_products.append(weblog_ap)
@@ -251,7 +260,7 @@ class IngestionManifestBuilder:
class IngestionManifest(ManifestIF):
"""write ingestion manifest to file"""
"""needed for ingestion-launching interface"""
def build_ingest_parameters(self):
"""
@@ -270,7 +279,34 @@ class IngestionManifest(ManifestIF):
staging_source_dir=self.staging_source_dir,
)
# @property
def _build_input_group(self):
"""
Create the input group using the parameters.
:return:
"""
# N.B. this is sufficient for most types of ingestion,
# but ALMA CALs will have multiple EB SPs, identified only by locator,
# and VLBAs have no input group at all.
sp_in = InputScienceProduct(locator=self.locator)
return InputGroup([sp_in])
def _build_output_group(self) -> OutputGroup:
"""
Create the output group using the parameters.
:return:
"""
sp_tar = self._find_science_product_tar()
find_output_science_products(self.files_found, self.staging_source_dir)
sps_out = [OutputScienceProduct(self.sp_type, sp_tar.name)]
# find ancillary products, if any
ancillary_products = self._find_ancillary_products()
weblog = Path(self.ingestion_path / WEBLOG_FILENAME)
if weblog.exists():
ancillary_products.append(AncillaryProduct(type=WEBLOG_FILENAME, filename=str(weblog)))
return OutputGroup(sps_out)
def ingestion_path(self) -> Path:
return self.parameters.ingestion_path
@@ -281,14 +317,36 @@ class IngestionManifest(ManifestIF):
:return:
"""
output_path = self.staging_source_dir / build_manifest_filename()
me_dict = self.to_json()
output_path = self.staging_source_dir / MANIFEST_FILENAME
to_write = json.dumps(self.to_json(), indent=4)
to_write = json.dumps(me_dict, indent=4)
with open(output_path, "w") as out:
out.write(to_write)
return output_path
def create(self):
"""
Create the ingestion manifest in this directory for a product of this type,
identified by this locator.
:return:
"""
if self.sp_type != ScienceProductType.EVLA_CAL:
raise NotImplementedError(
f"Don't yet know how to handle {self.sp_type.value} ingestion"
)
builder = IngestionManifestBuilder(
staging_source_dir=Path(self.staging_source_dir),
sp_type=self.sp_type,
locator=self.locator,
telescope=self.telescope,
)
builder.build()
def to_json(self) -> JSON:
"""
Turn this object into a JSON string suitable for writing to a file
@@ -296,20 +354,20 @@ class IngestionManifest(ManifestIF):
:return:
"""
to_return = dict(self.__dict__)
me_dict = dict(self.__dict__)
return {
"locator": to_return["locator"],
to_return = {
IngestionManifestKey.PARAMETERS.value: self.build_ingest_parameters().to_json(),
IngestionManifestKey.INGESTION_PATH.value: str(self.ingestion_path),
IngestionManifestKey.INPUT_GROUP.value: to_return[
IngestionManifestKey.INPUT_GROUP.value: me_dict[
IngestionManifestKey.INPUT_GROUP.value
].to_json(),
IngestionManifestKey.OUTPUT_GROUP.value: to_return[
IngestionManifestKey.OUTPUT_GROUP.value: me_dict[
IngestionManifestKey.OUTPUT_GROUP.value
].to_json(),
}
return to_return
def _find_science_product_tar(self) -> Path:
"""
A calibration ingestion staging dir should have ONE science product tar; ignore any others
@@ -336,17 +394,6 @@ def format_timestamp(datetime: DateTime) -> str:
return datetime.format("YYYY_MM_DDThh_mm_ss.SSS")
def build_manifest_filename() -> str:
"""
Build unique manifest filename in standard format.
:return: the filename
"""
current_time = pendulum.now()
timestamp = format_timestamp(current_time)
return f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}"
def find_manifest(ingestion_path: Path) -> Path:
"""
Find the ingestion manifest at this ingestion path.
@@ -354,8 +401,7 @@ def find_manifest(ingestion_path: Path) -> Path:
:param ingestion_path: home of ingestion files
:return:
"""
for file in ingestion_path.iterdir():
if file.name.startswith(MANIFEST_NAME_BASE) and file.name.endswith(MANIFEST_NAME_EXT):
return file
for json_file in ingestion_path.glob(MANIFEST_FILENAME):
return json_file
raise FileNotFoundError(f"No ingestion manifest found at {ingestion_path}")
Loading