Skip to content
Snippets Groups Projects

WS-600: implementing tests for image ingestion manifest output group

Merged Janet Goldstein requested to merge WS-600-image-ingestion-output-group-tests into main
2 unresolved threads
7 files
+ 431
85
Compare changes
  • Side-by-side
  • Inline
Files
7
@@ -13,7 +13,7 @@ import pendulum
from pendulum import DateTime
from ingest_envoy.manifest_components import (
ARTIFACT_NAME,
INGESTION_ARTIFACTS_NAME,
TARFILE_EXT,
WEBLOG_FILENAME,
JSON,
@@ -34,8 +34,9 @@ from ingest_envoy.utilities import (
Telescope,
IngestionManifestException,
AncillaryProductType,
find_output_science_products,
find_output_tars,
)
from ingest_envoy.std_img_manifest_utils import ImageIngestionProductsFinder
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
@@ -144,12 +145,19 @@ class IngestionManifestBuilder:
):
self.telescope = telescope
self.staging_source_dir = staging_source_dir
if sp_type not in [ScienceProductType.EVLA_CAL, ScienceProductType.IMAGE]:
raise NotImplementedError(f"Don't know yet how to build a {sp_type.value} manifest")
self.sp_type = ScienceProductType(sp_type)
self.locator = locator
# (for EVLA CAL, this will be None)
self.additional_metadata = additional_metadata
self.files_found = [file for file in staging_source_dir.iterdir()]
if len(self.files_found) == 0:
raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}")
# self.files_found = [file for file in staging_source_dir.iterdir()]
# if len(self.files_found) == 0:
# raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}")
def build(self) -> Tuple[ManifestIF, Path]:
"""
@@ -162,6 +170,11 @@ class IngestionManifestBuilder:
# # create any other ingestion files needed for this type of ingestion
# self._find_additional_ingestion_files()
if self.sp_type == ScienceProductType.EVLA_CAL:
return self._build_evla_cal_manifest()
return self._build_image_manifest()
# create the manifest
manifest = IngestionManifest(
telescope=self.telescope,
@@ -180,6 +193,28 @@ class IngestionManifestBuilder:
return manifest, manifest_file
def _build_evla_cal_manifest(self):
    """
    Assemble the ingestion manifest from this builder's settings, write it,
    and then write the ingestion artifacts tar.

    :return: the manifest, and whatever manifest.write() returned for it
    """
    # groups are built input-first, the same order in which they are passed below
    inputs = self._build_input_group()
    outputs = self._build_output_group()

    manifest = IngestionManifest(
        telescope=self.telescope,
        locator=self.locator,
        sp_type=self.sp_type,
        staging_source_dir=self.staging_source_dir,
        input_group=inputs,
        output_group=outputs,
    )

    # the manifest is written before the artifacts tar is created
    written = manifest.write()
    self.write_ingestion_artifacts_tar()

    return manifest, written
def _build_image_manifest(self):
    """
    Build the manifest for an image ingestion.

    Image manifest has additional_metadata, and output group is way more complicated

    NOTE(review): not implemented yet. This method contains no statements
    other than this docstring, so calling it implicitly returns None.

    :return: nothing yet (None)
    """
    # TODO: implement; there is currently no manifest-building code here
def _build_input_group(self):
"""
Create the input group using the parameters.
@@ -194,48 +229,129 @@ class IngestionManifestBuilder:
return InputGroup([sp_in])
def _define_output_science_products(self):
    """
    Wrap each science product file found for this ingestion
    in an OutputScienceProduct of this builder's product type.

    :return: list of output science products, one per file found
    """
    candidates = find_output_science_products(self.files_found, self.staging_source_dir)
    # only the file's name (not its full path) goes into the product
    return [OutputScienceProduct(self.sp_type, path.name) for path in candidates]
def _build_output_group(self) -> OutputGroup:
"""
Create the output group using the parameters.
:return:
def _define_output_science_products(self) -> List[OutputScienceProduct]:
"""
Find in the staging dir a science product and, if applicable, its ancillary products.
# find ancillary products, if any
ancillary_products = self._find_ancillary_products()
HEADS UP! ASSUMPTION: only one science product in the staging dir.
This works for EVLA CAL and image ingestion but may need an overhaul
for future ingestion types. We return a list because that's what there is
in our example image ingestion manifest. YMMV, void where prohibited,
professional driver on closed course, not FDIC insured.
return OutputGroup(self._define_output_science_products(), ancillary_products)
def _build_ancillary_product(self, file: Path) -> AncillaryProduct:
:return: output science products found
"""
If this file is required for ingestion manifest creation,
create an ancillary product from it.
:param file: file found in staging dir
:return: ancillary product represented by this file, if any
if self.sp_type == ScienceProductType.EVLA_CAL:
tars_found = find_output_tars(self.files_found, self.staging_source_dir)
for file in tars_found:
sci_prod = OutputScienceProduct(type=self.sp_type, filename=str(file))
return [sci_prod]
elif self.sp_type == ScienceProductType.IMAGE:
products_finder = ImageIngestionProductsFinder(self.staging_source_dir)
science_products = products_finder.science_products
ancillary_products = products_finder.ancillary_products
# image_products = self._find_image_products()
# sps = []
# for ip in image_products:
# if ip.type == AncillaryProductType.QUICKLOOK_IMAGE:
# # this is the science product, a quicklook image
# sp_itself = ip
# elif ip.type == AncillaryProductType.QUICKLOOK_RMS_IMAGE:
# sps.append(ip)
# elif ip.type == AncillaryProductType.PIPELINE_WEBLOG_TYPE:
# sps.append(ip)
# # elif ip.type == AncillaryProductType.
#
# aips = []
# for file in tars_found:
# ap = self._build_ancillary_image_science_product(file)
# if ap:
# aips.append(ap)
#
# maybe_weblogs = [file for file in self.staging_source_dir.glob(WEBLOG_FILENAME)]
# if len(maybe_weblogs) > 0:
# weblog = Path(maybe_weblogs[0])
# weblog_ap = AncillaryProduct(
# type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=str(weblog)
# )
# aips.append(weblog_ap)
#
# sci_prod = OutputScienceProduct(
# type=sp_itself.type, filename=sp_itself.filename, ancillary_products=aips
# )
# return [sci_prod]
# else:
# raise ValueError(f"Don't know yet how to handle a {self.sp_type.value}")
# def _build_evla_cal_output_group(self) -> OutputGroup:
# # find science products
# science_products = self._define_output_science_products()
#
# return OutputGroup(science_products=science_products)
def _build_imaging_output_group(self) -> OutputGroup:
"""
if file.name == WEBLOG_FILENAME:
return AncillaryProduct(
type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=str(file)
)
if AncillaryProductType.PIPELINE_ARTIFACTS.value in file.name:
return AncillaryProduct(
type=AncillaryProductType.PIPELINE_ARTIFACTS, filename=str(file)
)
Create the output group using the parameters.
if AncillaryProductType.INGESTION_ARTIFACTS.value in file.name:
return AncillaryProduct(
type=AncillaryProductType.INGESTION_ARTIFACTS, filename=str(file)
)
:return:
"""
# this is not an ancillary product
return None
products_finder = ImageIngestionProductsFinder(self.staging_source_dir)
science_products = products_finder.science_products
ancillary_products = products_finder.ancillary_products
return OutputGroup(science_products=science_products, ancillary_products=ancillary_products)
# def _build_ancillary_image_science_product(self, file: Path):
# """
# Image science products will have ancillary products of their very own,
# distinct from other ancillary products that might be in the staging dir.
#
# :param file: a possible ancillary image product
# :return: the corresponding AncillaryProduct, if applicable
# """
# filename = str(file)
# if "image" in file.name:
# if file.name.endswith(".png"):
# return AncillaryProduct(type=AncillaryProductType.THUMBNAIL_IMG, filename=filename)
# elif file.name.endswith(".fits"):
# if "rms" in file.name:
# return AncillaryProduct(
# type=AncillaryProductType.QUICKLOOK_RMS_IMAGE, filename=filename
# )
# else:
# return AncillaryProduct(
# type=AncillaryProductType.QUICKLOOK_IMAGE, filename=filename
# )
#
# def _build_ancillary_product(self, file: Path) -> AncillaryProduct:
# """
# If this file is required for ingestion manifest creation,
# create an ancillary product from it.
#
# :param file: file found in staging dir
# :return: ancillary product represented by this file, if any
# """
# if file.name == WEBLOG_FILENAME:
# return AncillaryProduct(
# type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=str(file)
# )
#
# if AncillaryProductType.PIPELINE_ARTIFACTS.value in file.name:
# return AncillaryProduct(
# type=AncillaryProductType.PIPELINE_ARTIFACTS, filename=str(file)
# )
#
# if AncillaryProductType.INGESTION_ARTIFACTS.value in file.name:
# return AncillaryProduct(
# type=AncillaryProductType.INGESTION_ARTIFACTS, filename=str(file)
# )
#
# # this is not an ancillary product
# return None
@staticmethod
def build_artifacts_filename() -> str:
@@ -246,7 +362,7 @@ class IngestionManifestBuilder:
"""
current_time = pendulum.now()
timestamp = format_timestamp(current_time)
return f"{ARTIFACT_NAME}{timestamp}{TARFILE_EXT}"
return f"{INGESTION_ARTIFACTS_NAME}{timestamp}{TARFILE_EXT}"
def write_ingestion_artifacts_tar(self) -> Path:
"""
@@ -269,33 +385,55 @@ class IngestionManifestBuilder:
return ing_tar
def _find_ancillary_products(self) -> List[AncillaryProduct]:
    """
    Collect the ancillary products present among the ingestion files.

    The weblog, if one was found, comes first; it is followed by each
    distinct ancillary product built from the additional ingestion files.

    :return: ancillary product(s) found; empty list if there are none
    """
    products = []

    # a weblog is optional; when several files carry its name, only the first is used
    weblogs = [found for found in self.files_found if found.name == WEBLOG_FILENAME]
    if weblogs:
        products.append(
            AncillaryProduct(
                type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=weblogs[0].name
            )
        )

    for candidate in self._find_additional_ingestion_files():
        product = self._build_ancillary_product(candidate)
        # skip files that are not ancillary products, and products already collected
        if not product or product in products:
            continue
        products.append(product)

    return products
def _find_additional_ingestion_files(self) -> List[Path]:
# def _find_image_products(self) -> List[AncillaryProduct]:
# """
# Get ancillary products that belong to science products
#
# :return:
# """
# ingestion_files = self._find_ingestion_files()
# aips = []
# for file in ingestion_files:
# aip = self._build_ancillary_image_science_product(file)
# if aip:
# aips.append(aip)
#
# return aips
# def _find_ancillary_products_for_img_ingest(self) -> List[AncillaryProduct]:
# """
# Round up any ancillary files found in image ingestion staging dir
#
# :return: ancillary product(s) found
# """
#
# ancillary_products = []
#
# # TODO: START HERE THU 2021-07-29: the image files are science product ancillaries; the tars and the weblog are
# # ancillary products. in test_creates_expected_manifest(), we expect 1 sci prod w/2 sp ancillaries,
# # and 3 ancillary prods.
#
# ingestion_files = self._find_ingestion_files()
# for file in ingestion_files:
# maybe_ap = self._build_ancillary_product(file)
# if maybe_ap and maybe_ap not in ancillary_products:
# ancillary_products.append(maybe_ap)
#
# # There should be a weblog in here; grab it
# maybe_weblogs = [file for file in self.files_found if file.name == WEBLOG_FILENAME]
# if len(maybe_weblogs) > 0:
# file = self.staging_source_dir / WEBLOG_FILENAME
# weblog_ap = AncillaryProduct(
# type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=str(file)
# )
# if weblog_ap not in ancillary_products:
# ancillary_products.append(weblog_ap)
# else:
# raise FileNotFoundError(f"No weblog found in {self.staging_source_dir}")
#
# return ancillary_products
def _find_ingestion_files(self) -> List[Path]:
"""
Round up any other necessary ingestion file(s)
Loading