Skip to content
Snippets Groups Projects

ingestion manifest creation for EVLA CAL ingestion only

Merged Daniel Lyons requested to merge WS-507-evla-cal-ingestion-manifest into main
8 files
+ 559
482
Compare changes
  • Side-by-side
  • Inline
Files
8
""" The ingestion manifest """
import json
""" This is the entrypoint for ingestion launching """
# pylint: disable=E0402, R0903, R0913
from typing import Dict
import logging
import sys
from pathlib import Path
from .utilities import Telescope
# pylint: disable=R0903
from ingest_envoy.ingestion_manifest_writer import EvlaCalIngestionManifestWriter
from ingest_envoy.utilities import ScienceProductType
class Parameters:
"""a manifest's various input parameters"""
def __init__(
self,
telescope: Telescope,
# for JSON parsing, path must be a string
ingestion_path: str,
additional_metadata: str,
collection_metadata: str,
reingest: bool = False,
ngas_ingest: bool = False,
):
self.telescope = telescope
self.ingestion_path = ingestion_path
self.additional_metadata = additional_metadata
self.collection_metadata = collection_metadata
self.reingest = reingest
self.ngas_ingest = ngas_ingest
def __repr__(self):
return repr(self.__dict__)
class InputGroup:
"""
This represents the starting point for processing which generated a science product.
There is not always an input group for every output group (rawdata, for instance).
Initial assumption: Input groups consist only of science products.
"""
def __init__(self):
self.science_products = []
def __repr__(self) -> str:
return repr(self.__dict__)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
class IngestionManifest:
"""Represents JSON layout of ingestion information, encompassing several potential scenarios.
see ingest_envoy/test/examples, nicked from https://open-confluence.nrao.edu/x/roPCAQ
"""
def __init__(self, parameters: Parameters):
self.parameters = parameters
self.input_group = InputGroup()
if self.parameters.additional_metadata:
self.input_group.science_products.append(
json.loads(self.parameters.additional_metadata)
)
"""needed for ingestion-launching interface"""
self.output_group = OutputGroup()
if self.parameters.collection_metadata:
self.output_group.ancillary_products.append(
json.loads(self.parameters.collection_metadata)
)
self.ingestion_path = self.parameters.ingestion_path
# TODO: what is this, and how do we use it?
self.associate_group = AssociateGroup()
def __init__(self, staging_source_dir: str, ingestion_type: str, locator: str):
self.ingest_path = Path(staging_source_dir)
self.sp_type = ScienceProductType.from_str(ingestion_type)
self.locator = locator
def content(self) -> Dict:
def create(self):
"""
Accessor for manifest content
Create the ingestion manifest in this directory for a product of this type,
identified by this locator.
:return: manifest as dict
:return:
"""
return dict(
input_group=repr(self.input_group),
output_group=repr(self.output_group),
associate_group=repr(self.associate_group),
ingestion_path=repr(self.ingestion_path),
science_products=repr(self.input_group.science_products),
ancillary_products=repr(self.output_group.ancillary_products),
)
class OutputGroup:
"""Represents result of data processing"""
def __init__(self):
self.science_products = []
self.ancillary_products = []
def __repr__(self):
return repr(self.__dict__)
class AssociateGroup:
"""
A representation of Science Products which are not part of the same Input or Output groups
but are still fundamentally linked. Created for RealFast project, to link the RealFast
specific execution block & image to the execution block during which a transient was
discovered.
Associate groups also, by definition, include any science product(s) within the output
group to be ingested. The new locators generated at ingestion time will be added to any
which compose an associate group in the manifest.
"""
def __init__(self):
self.science_products = []
if self.sp_type != ScienceProductType.EVLA_CAL:
return NotImplementedError(
f"Don't yet know how to handle {self.sp_type.value} science product"
)
def __repr__(self):
return repr(self.__dict__)
writer = EvlaCalIngestionManifestWriter(self.ingest_path)
writer.write_evla_cal_manifest(self.locator)
Loading