Skip to content
Snippets Groups Projects

ingestion manifest creation for EVLA CAL ingestion only

Merged Daniel Lyons requested to merge WS-507-evla-cal-ingestion-manifest into main
8 files
+ 559
482
Compare changes
  • Side-by-side
  • Inline
Files
8
""" The ingestion manifest """
import json
# pylint: disable=E0402, R0903, R0913
# pylint: disable=C0415, E0401, E0402, R0903, R0913, R1721
import re
from pathlib import Path
from typing import Dict, List
from typing import Dict, Tuple, List
from .utilities import Telescope
import pendulum
from pendulum import DateTime
from .utilities import (
Telescope,
SCIENCE_PRODUCT_PATTERN,
WEBLOG,
ScienceProductType,
MANIFEST_NAME_BASE,
MANIFEST_NAME_EXT,
)
class IngestionManifest:
"""Entrypoint for ingestion manifest creation"""
def __init__(self, ingest_path: Path, sp_type: ScienceProductType, locator: str):
self.ingest_path = ingest_path
self.type = sp_type
self.locator = locator
if self.type == ScienceProductType.EVLA_CAL:
self.manifest = EvlaCalIngestionManifest(self.ingest_path, self.locator)
else:
raise NotImplementedError(
f"ingestion manifest creation not yet implemented for {self.type}"
)
def create(self) -> Tuple[Path, List[Path]]:
"""
Writes manifest and associated ingestion files to the staging dir (ingestion_path)
:return: ingestion manifest file for specified ScienceProductType
"""
# can't import up top due to circular dependency
from .ingestion_manifest_writer import IngestionManifestWriter
return IngestionManifestWriter(self.manifest, self.ingest_path).write()
def content(self) -> Dict:
"""
Accessor for manifest content
:return: manifest as dict
"""
return dict(
input_group=repr(self.manifest.input_group),
output_group=repr(self.manifest.output_group),
# associate_group=repr(self.associate_group), # TODO when we need it
ingestion_path=repr(self.manifest.ingestion_path),
science_products=repr(self.manifest.input_group.science_products),
ancillary_products=repr(self.manifest.output_group.ancillary_products),
)
@staticmethod
def format_timestamp(start_time: DateTime) -> str:
"""
Format the current time as follows:
input format:
2021-07-01T13:49:17.237119+00:00
desired output format as yyyy_MM_dd_'T'HH_mm_ss.SSS:
2021_07_01'T'13_49_17.237
:param start_time: current pendulum timestamp
:return: timestamp suitable for ingestion manifest filename
"""
time_str = str(start_time)
# change hyphens and colons to underscores
timestamp = time_str.replace("-", "_", len(time_str))
timestamp = timestamp.replace(":", "_", len(timestamp))
# chop off the last bit
timestamp = timestamp.split("+")[0]
# now timestamp ends with ss.###....; round to 3 places
ts_parts = timestamp.split("_")
seconds = float(ts_parts[len(ts_parts) - 1])
rounded = round(seconds, 3)
timestamp = timestamp.replace(str(seconds), str(rounded))
# finally, the T in single quotes
timestamp = timestamp.replace("T", "'T'")
return timestamp
@staticmethod
def filename() -> str:
"""
Build unique manifest filename in standard format.
:return: the filename
"""
current_time = pendulum.now()
timestamp = IngestionManifest.format_timestamp(current_time)
return f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}"
# TODO: This class is not used for EVLA CAL manifests, but other ingestion manifests may be
# complex enough to need it, or something like it
class Parameters:
"""a manifest's various input parameters"""
def __init__(
self,
telescope: Telescope,
# for JSON parsing, path must be a string
ingestion_path: str,
additional_metadata: str,
collection_metadata: str,
# not needed for calibrations?
additional_metadata: str = None,
collection_metadata: str = None,
telescope: Telescope = Telescope.EVLA,
reingest: bool = False,
ngas_ingest: bool = False,
calibrate: bool = False,
@@ -34,7 +131,15 @@ class Parameters:
return repr(self.__dict__)
class InputGroup:
class EvlaCalInputScienceProduct:
"""Represents the "science_product" in EvlaCalInputGroup"""
def __init__(self, locator: str):
self.locator = locator
self.type = ScienceProductType.EVLA_CAL
class EvlaCalInputGroup:
"""
This represents the starting point for processing which generated a science product.
@@ -43,97 +148,122 @@ class InputGroup:
Initial assumption: Input groups consist only of science products.
"""
def __init__(self):
self.science_products = []
def __init__(self, science_product: EvlaCalInputScienceProduct):
# science product locator
self.science_products = [science_product]
def __repr__(self) -> str:
"""
Create the "input-group" section of the manifest as a JSON string
:return: JSONified InputGroup
"""
return repr(self.__dict__)
class OutputGroup:
"""Represents result of data processing"""
class EvlaCalOutputScienceProduct:
"""The science product in the output group"""
def __init__(self):
self.science_products = []
self.ancillary_products = []
def __init__(self, filename: str):
self.filename = filename
self.type = ScienceProductType.EVLA_CAL
def __repr__(self):
return repr(self.__dict__)
class IngestionManifest:
"""Represents JSON layout of ingestion information, encompassing several potential scenarios.
see ingest_envoy/test/examples, nicked from https://open-confluence.nrao.edu/x/roPCAQ
"""
class Weblog:
"""Represents a weblog.tgz as an ancillary product"""
def __init__(self, parameters: Parameters):
self.parameters = parameters
def __init__(self, weblog_path: Path):
self.ancillary_product = {"type": "weblog", "filename": str(weblog_path)}
self.input_group = InputGroup()
if self.parameters.additional_metadata:
self.input_group.science_products.append(
json.loads(self.parameters.additional_metadata)
)
def __repr__(self):
return repr(self.__dict__)
self.output_group = self.init_output_group()
if self.parameters.collection_metadata:
self.output_group.ancillary_products.append(
json.loads(self.parameters.collection_metadata)
)
self.ingestion_path = self.parameters.ingestion_path
def init_output_group(self) -> OutputGroup:
"""
Scan `ingestion_path` for files to be added to the output group
class EvlaCalOutputGroup:
"""Represents result of data processing. Will have a single calibration tar
plus a weblog.
"""
:return: List of paths to output group files
"""
output_group = OutputGroup()
ingest_dir_contents = [f for f in self.ingestion_path.iterdir()]
output_group.science_products.append(
self._filter_files_for_science_products(ingest_dir_contents)
)
def __init__(self, science_product: EvlaCalOutputScienceProduct, weblog: Path):
self.science_products = [science_product]
self.ancillary_products = [Weblog(weblog)]
def _filter_files_for_science_products(dir_contents: List[Path]) -> List[Path]:
def __repr__(self):
"""
Search for science products in a list of files and return them
Create the "output-group" section of the manifest as a JSON string
:return: List of paths to science products found
:return: JSONified OutputGroup
"""
# for f in dir_contents:
raise NotImplementedError
return repr(self.__dict__)
def content(self) -> Dict:
class EvlaCalIngestionManifest:
"""this is JUST the ingestion manifest JSON"""
def __init__(self, ingestion_path: Path, spl: str):
"""
Accessor for manifest content
:return: manifest as dict
:param ingestion_path: staging directory
:param spl: science product locator
"""
return dict(
input_group=repr(self.input_group),
output_group=repr(self.output_group),
ingestion_path=repr(self.ingestion_path),
science_products=repr(self.input_group.science_products),
ancillary_products=repr(self.output_group.ancillary_products),
)
self.ingestion_path = ingestion_path
self.locator = spl
self.sp_type = ScienceProductType.EVLA_CAL
self.infiles = [file for file in self.ingestion_path.iterdir()]
self.input_group = EvlaCalInputGroup(EvlaCalInputScienceProduct(self.locator))
self.output_group = self._make_output_group()
class AssociateGroup:
"""
NOTE: Currently only used in the case of RealFast execution blocks
def __str__(self):
params = self._make_params_section()
input_group = repr(self.input_group)
output_group = repr(self.output_group)
return f"{params}\n{input_group}\n{output_group}"
A representation of Science Products which are not part of the same Input or Output groups
but are still fundamentally linked. Created for RealFast project, to link the RealFast
specific execution block & image to the execution block during which a transient was
discovered.
def _make_params_section(self) -> str:
"""
Create the JSON for the "parameters" section of the manifest.
It's always the same for any EVLA CAL ingestion manifest, except for the ingestion path.
Associate groups also, by definition, include any science product(s) within the output
group to be ingested. The new locators generated at ingestion time will be added to any
which compose an associate group in the manifest.
"""
:return: stringified JSON
"""
params = {
"parameters": {
"reingest": "false",
"ngas-ingest": "false",
"calibrate": "false",
"ingestion_path": str(self.ingestion_path),
}
}
return json.dumps(params)
def _make_output_group(self) -> EvlaCalOutputGroup:
"""
Create the JSON for the "output-group" section of the manifest.
An EVLA CAL OutputGroup contains a science product(s) and an ancillary product
(the weblog)
def __init__(self):
self.science_products = []
:return: manifest output group
"""
sp_tar = self._find_science_product_tar()
science_product = EvlaCalOutputScienceProduct(str(sp_tar))
weblog = Path(self.ingestion_path / WEBLOG)
if weblog.exists():
return EvlaCalOutputGroup(science_product, weblog)
def __repr__(self):
return repr(self.__dict__)
return EvlaCalOutputGroup(science_product, None)
def _find_science_product_tar(self) -> Path:
"""
A calibration ingestion staging dir should have ONE science product tar; ignore any others
:return:
"""
for file in self.infiles:
if re.match(SCIENCE_PRODUCT_PATTERN, file.name):
return file
return None
Loading