Skip to content
Snippets Groups Projects

WS-543: Addressed issues found in calibration ingestion testing

Merged Janet Goldstein requested to merge WS-543-fix-manifest-issues-2021-07-22 into main
Files
11
@@ -14,11 +14,9 @@ import pendulum
from pendulum import DateTime
from ingest_envoy.manifest_components import (
MANIFEST_NAME_BASE,
MANIFEST_NAME_EXT,
ARTIFACT_NAME,
ARTIFACT_EXT,
WEBLOG,
TARFILE_EXT,
WEBLOG_FILENAME,
JSON,
IngestionManifestKey,
ManifestComponentIF,
@@ -29,6 +27,7 @@ from ingest_envoy.manifest_components import (
AncillaryProduct,
OutputGroup,
SCIENCE_PRODUCT_PATTERN,
MANIFEST_FILENAME,
)
from ingest_envoy.utilities import (
ScienceProductType,
@@ -152,7 +151,7 @@ class IngestionManifestBuilder:
# N.B. this is sufficient for most types of ingestion,
# but ALMA CALs will have multiple EB SPs, identified only by locator,
# and VLBAs have no input group at all.
sp_in = InputScienceProduct(sp_type=self.sp_type, locator=self.locator)
sp_in = InputScienceProduct(locator=self.locator)
return InputGroup([sp_in])
@@ -171,11 +170,12 @@ class IngestionManifestBuilder:
# find ancillary products, if any
ancillary_products = self._find_ancillary_products()
tar_filename = self.build_artifacts_filename()
artifacts_ap = AncillaryProduct(
type=AncillaryProductType.PIPELINE_ARTIFACTS, filename=tar_filename
)
ancillary_products.append(artifacts_ap)
# N.B. this is NOT done for EVLA CAL manifest, but keep code for future use
# tar_filename = self.build_artifacts_filename()
# artifacts_ap = AncillaryProduct(
# type=AncillaryProductType.PIPELINE_ARTIFACTS, filename=tar_filename
# )
# ancillary_products.append(artifacts_ap)
return OutputGroup(self._define_output_science_products(), ancillary_products)
@@ -188,7 +188,7 @@ class IngestionManifestBuilder:
"""
current_time = pendulum.now()
timestamp = format_timestamp(current_time)
return f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}"
return f"{ARTIFACT_NAME}{timestamp}{TARFILE_EXT}"
def write_ingestion_artifacts_tar(self) -> Path:
"""
@@ -220,11 +220,11 @@ class IngestionManifestBuilder:
ancillary_products = []
# if there's a weblog in here, grab it
maybe_weblogs = [file for file in self.files_found if file.name.endswith(WEBLOG)]
maybe_weblogs = [file for file in self.files_found if file.name == WEBLOG_FILENAME]
if len(maybe_weblogs) > 0:
weblog = maybe_weblogs[0]
weblog_ap = AncillaryProduct(
type=AncillaryProductType.PIPELINE_WEBLOG, filename=weblog.name
type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=weblog.name
)
ancillary_products.append(weblog_ap)
@@ -281,9 +281,10 @@ class IngestionManifest(ManifestIF):
:return:
"""
output_path = self.staging_source_dir / build_manifest_filename()
me_dict = self.to_json()
output_path = self.staging_source_dir / MANIFEST_FILENAME
to_write = json.dumps(self.to_json(), indent=4)
to_write = json.dumps(me_dict, indent=4)
with open(output_path, "w") as out:
out.write(to_write)
@@ -296,20 +297,20 @@ class IngestionManifest(ManifestIF):
:return:
"""
to_return = dict(self.__dict__)
me_dict = dict(self.__dict__)
return {
"locator": to_return["locator"],
to_return = {
IngestionManifestKey.PARAMETERS.value: self.build_ingest_parameters().to_json(),
IngestionManifestKey.INGESTION_PATH.value: str(self.ingestion_path),
IngestionManifestKey.INPUT_GROUP.value: to_return[
IngestionManifestKey.INPUT_GROUP.value: me_dict[
IngestionManifestKey.INPUT_GROUP.value
].to_json(),
IngestionManifestKey.OUTPUT_GROUP.value: to_return[
IngestionManifestKey.OUTPUT_GROUP.value: me_dict[
IngestionManifestKey.OUTPUT_GROUP.value
].to_json(),
}
return to_return
def _find_science_product_tar(self) -> Path:
"""
A calibration ingestion staging dir should have ONE science product tar; ignore any others
@@ -336,17 +337,6 @@ def format_timestamp(datetime: DateTime) -> str:
return datetime.format("YYYY_MM_DDThh_mm_ss.SSS")
def build_manifest_filename() -> str:
"""
Build unique manifest filename in standard format.
:return: the filename
"""
current_time = pendulum.now()
timestamp = format_timestamp(current_time)
return f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}"
def find_manifest(ingestion_path: Path) -> Path:
"""
Find the ingestion manifest at this ingestion path.
@@ -354,8 +344,7 @@ def find_manifest(ingestion_path: Path) -> Path:
:param ingestion_path: home of ingestion files
:return:
"""
for file in ingestion_path.iterdir():
if file.name.startswith(MANIFEST_NAME_BASE) and file.name.endswith(MANIFEST_NAME_EXT):
return file
for json_file in ingestion_path.glob(MANIFEST_FILENAME):
return json_file
raise FileNotFoundError(f"No ingestion manifest found at {ingestion_path}")
Loading