Skip to content
Snippets Groups Projects

Post Apocalypse Round 5 - restore & image

Merged Charlotte Hausman requested to merge testing_barrage_fixes into main
17 files
+ 1485
858
Compare changes
  • Side-by-side
  • Inline
Files
17
""" This is the entrypoint for ingestion launching """
import abc
import json
import logging
import re
import sys
import tarfile
from pathlib import Path
# pylint: disable=R0903
# pylint: disable=E0401, R0903, R1721
from typing import Tuple, List
import pendulum
from pendulum import DateTime
from ingest_envoy.ingestion_manifest_writer import EvlaCalIngestionManifestWriter
from ingest_envoy.utilities import ScienceProductType
from ingest_envoy.manifest_components import (
MANIFEST_NAME_BASE,
MANIFEST_NAME_EXT,
ARTIFACT_NAME,
ARTIFACT_EXT,
WEBLOG,
JSON,
IngestionManifestKey,
ManifestComponentIF,
InputScienceProduct,
InputGroup,
ManifestParameters,
OutputScienceProduct,
AncillaryProduct,
OutputGroup,
SCIENCE_PRODUCT_PATTERN,
)
from ingest_envoy.utilities import (
ScienceProductType,
Telescope,
IngestionManifestException,
AncillaryProductType,
find_output_science_products,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
# pylint: disable=R0902, R0913
class IngestionManifest:
"""needed for ingestion-launching interface"""
def __init__(self, staging_source_dir: str, ingestion_type: str, locator: str):
self.ingest_path = Path(staging_source_dir)
self.sp_type = ScienceProductType.from_str(ingestion_type)
class ManifestIF(ManifestComponentIF):
"""Interface for all ingestion manifests"""
def __init__(
self,
telescope: Telescope,
sp_type: ScienceProductType,
staging_source_dir: Path,
locator: str,
# all except EVLA_EB and VLASS catalog manifest have input group
input_group: InputGroup,
# every manifest has at least one output group
output_group: OutputGroup,
):
self.staging_source_dir = staging_source_dir
self.sp_type = sp_type
self.locator = locator
self.input_group = input_group
self.output_group = output_group
self.telescope = telescope
self.parameters = self.build_ingest_parameters()
self.files_found = [file for file in self.staging_source_dir.iterdir()]
@abc.abstractmethod
def create(self):
"""
Build and write the manifest, which includes gathering various items in
ingestion_path to get info for the manifest.
:return:
"""
@abc.abstractmethod
def write(self):
"""
Write this manifest to a file, along with the artifacts tar and any other files required
for this type of ingestion, at the ingest_path
:param: location of files to be ingested, which is where we'll put the manifest
:return:
"""
raise NotImplementedError
def __eq__(self, other):
if isinstance(other, IngestionManifest):
return other.input_group == self.input_group and other.output_group == self.output_group
return False
@abc.abstractmethod
def to_json(self) -> JSON:
"""
Turn this object into a JSON string suitable for writing to a file
:return:
"""
raise NotImplementedError
class IngestionManifestBuilder:
"""Builds ingestion manifest and associated files from files in ingestion_path"""
def __init__(
self,
staging_source_dir: Path,
sp_type: ScienceProductType,
locator: str,
telescope: Telescope,
):
self.telescope = telescope
self.staging_source_dir = staging_source_dir
self.sp_type = sp_type
self.locator = locator
self.files_found = [file for file in staging_source_dir.iterdir()]
if len(self.files_found) == 0:
raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}")
def build(self) -> Tuple[ManifestIF, Path]:
"""
Using only -relevant- files in ingestion_path, write the manifest
and produce other files required for ingestion.
:return: the ingestion manifest and the file containing its JSON
"""
# # create any other ingestion files needed for this type of ingestion
# self._find_additional_ingestion_files()
# create the manifest
manifest = IngestionManifest(
telescope=self.telescope,
locator=self.locator,
sp_type=self.sp_type,
staging_source_dir=self.staging_source_dir,
input_group=self._build_input_group(),
output_group=self._build_output_group(),
)
manifest_file = manifest.write()
self.write_ingestion_artifacts_tar()
return manifest, manifest_file
def _build_input_group(self):
"""
Create the input group using the parameters.
:return:
"""
# N.B. this is sufficient for most types of ingestion,
# but ALMA CALs will have multiple EB SPs, identified only by locator,
# and VLBAs have no input group at all.
sp_in = InputScienceProduct(sp_type=self.sp_type, locator=self.locator)
return InputGroup([sp_in])
def _define_output_science_products(self):
sp_files = find_output_science_products(self.files_found, self.staging_source_dir)
sps_out = [OutputScienceProduct(self.sp_type, file.name) for file in sp_files]
return sps_out
def _build_output_group(self) -> OutputGroup:
"""
Create the output group using the parameters.
:return:
"""
# find ancillary products, if any
ancillary_products = self._find_ancillary_products()
tar_filename = self.build_artifacts_filename()
artifacts_ap = AncillaryProduct(
type=AncillaryProductType.PIPELINE_ARTIFACTS, filename=tar_filename
)
ancillary_products.append(artifacts_ap)
return OutputGroup(self._define_output_science_products(), ancillary_products)
@staticmethod
def build_artifacts_filename() -> str:
"""
Build unique manifest filename in standard format.
:return: the filename
"""
current_time = pendulum.now()
timestamp = format_timestamp(current_time)
return f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}"
def write_ingestion_artifacts_tar(self) -> Path:
"""
Take the list of files and build a tar for inclusion into the archive.
This happens in the staging area for ingestion.
The EVLA CAL tar will contain just the manifest.
:return: a .tar archive of the ingestion artifacts
"""
ingestion_files = [file for file in self.staging_source_dir.iterdir() if file.is_file]
manifest_file = find_manifest(self.staging_source_dir)
ing_tar = self.staging_source_dir / self.build_artifacts_filename()
with tarfile.open(ing_tar, "w") as ingestion_artifacts_tar:
for file in ingestion_files:
ingestion_artifacts_tar.add(file)
# include the manifest
if manifest_file not in ingestion_files:
ingestion_artifacts_tar.add(manifest_file)
return ing_tar
def _find_ancillary_products(self) -> List[AncillaryProduct]:
"""
Round up any ancillary files found in ingestion path
:return: ancillary product(s) found
"""
ancillary_products = []
# if there's a weblog in here, grab it
maybe_weblogs = [file for file in self.files_found if file.name.endswith(WEBLOG)]
if len(maybe_weblogs) > 0:
weblog = maybe_weblogs[0]
weblog_ap = AncillaryProduct(
type=AncillaryProductType.PIPELINE_WEBLOG, filename=weblog.name
)
ancillary_products.append(weblog_ap)
more_aps = self._find_additional_ingestion_files()
if len(more_aps) > 0:
ancillary_products.append(more_aps)
return ancillary_products
def _find_additional_ingestion_files(self) -> List[Path]:
"""
Round up any other necessary ingestion file(s)
:return: additional relevant files found in ingestion path, if any
"""
if self.sp_type == ScienceProductType.EVLA_CAL:
# there won't be any others
return []
# TODO when the time comes: we'll have extra information for other ingestion types;
# see archive-metaproject
raise NotImplementedError
class IngestionManifest(ManifestIF):
"""needed for ingestion-launching interface"""
def build_ingest_parameters(self):
"""
Make the "parameters" section of the manifest
:return:
"""
if self.sp_type != ScienceProductType.EVLA_CAL:
raise NotImplementedError()
return ManifestParameters(
telescope=self.telescope,
reingest=False,
ngas_ingest=False,
calibrate=False,
staging_source_dir=self.staging_source_dir,
)
def _build_input_group(self):
"""
Create the input group using the parameters.
:return:
"""
# N.B. this is sufficient for most types of ingestion,
# but ALMA CALs will have multiple EB SPs, identified only by locator,
# and VLBAs have no input group at all.
sp_in = InputScienceProduct(sp_type=self.sp_type, locator=self.locator)
return InputGroup([sp_in])
def _build_output_group(self) -> OutputGroup:
"""
Create the output group using the parameters.
:return:
"""
sp_tar = self._find_science_product_tar()
find_output_science_products(self.files_found, self.staging_source_dir)
sps_out = [OutputScienceProduct(self.sp_type, sp_tar.name)]
# find ancillary products, if any
ancillary_products = self._find_ancillary_products()
weblog = Path(self.ingestion_path / WEBLOG)
if weblog.exists():
ancillary_products.append(AncillaryProduct(type=WEBLOG, filename=str(weblog)))
return OutputGroup(sps_out)
# @property
def ingestion_path(self) -> Path:
return self.parameters.ingestion_path
def write(self) -> Path:
"""
Write the manifest .json file.
:return:
"""
output_path = self.staging_source_dir / build_manifest_filename()
to_write = json.dumps(self.to_json(), indent=4)
with open(output_path, "w") as out:
out.write(to_write)
return output_path
def create(self):
"""
@@ -31,9 +335,85 @@ class IngestionManifest:
"""
if self.sp_type != ScienceProductType.EVLA_CAL:
return NotImplementedError(
f"Don't yet know how to handle {self.sp_type.value} science product"
raise NotImplementedError(
f"Don't yet know how to handle {self.sp_type.value} ingestion"
)
writer = EvlaCalIngestionManifestWriter(self.ingest_path)
writer.write_evla_cal_manifest(self.locator)
builder = IngestionManifestBuilder(
staging_source_dir=Path(self.staging_source_dir),
sp_type=self.sp_type,
locator=self.locator,
telescope=self.telescope,
)
builder.build()
def to_json(self) -> JSON:
"""
Turn this object into a JSON string suitable for writing to a file
:return:
"""
to_return = dict(self.__dict__)
return {
"locator": to_return["locator"],
IngestionManifestKey.PARAMETERS.value: self.build_ingest_parameters().to_json(),
IngestionManifestKey.INGESTION_PATH.value: str(self.ingestion_path),
IngestionManifestKey.INPUT_GROUP.value: to_return[
IngestionManifestKey.INPUT_GROUP.value
].to_json(),
IngestionManifestKey.OUTPUT_GROUP.value: to_return[
IngestionManifestKey.OUTPUT_GROUP.value
].to_json(),
}
def _find_science_product_tar(self) -> Path:
"""
A calibration ingestion staging dir should have ONE science product tar; ignore any others
:return:
"""
files = [file for file in self.staging_source_dir.iterdir() if file.is_file]
for file in files:
if re.match(SCIENCE_PRODUCT_PATTERN, file.name):
return file
raise FileNotFoundError(f"no science product found at {self.staging_source_dir}")
def format_timestamp(datetime: DateTime) -> str:
"""
Format the current time as
2021_07_01T13_49_17.237
:param datetime: current timestamp
:return: timestamp suitable for ingestion manifest filename
"""
return datetime.format("YYYY_MM_DDThh_mm_ss.SSS")
def build_manifest_filename() -> str:
"""
Build unique manifest filename in standard format.
:return: the filename
"""
current_time = pendulum.now()
timestamp = format_timestamp(current_time)
return f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}"
def find_manifest(ingestion_path: Path) -> Path:
"""
Find the ingestion manifest at this ingestion path.
:param ingestion_path: home of ingestion files
:return:
"""
for file in ingestion_path.iterdir():
if file.name.startswith(MANIFEST_NAME_BASE) and file.name.endswith(MANIFEST_NAME_EXT):
return file
raise FileNotFoundError(f"No ingestion manifest found at {ingestion_path}")
Loading