Skip to content
Snippets Groups Projects

ingestion manifest creation for EVLA CAL ingestion only

Merged Daniel Lyons requested to merge WS-507-evla-cal-ingestion-manifest into main
2 files
+ 177
0
Compare changes
  • Side-by-side
  • Inline
Files
2
""" Objects pertaining to the various ingestion manifests """
from __future__ import annotations
import json
import re
import tarfile
from enum import Enum
from pathlib import Path
from typing import List, Dict
# pylint: disable=E0401, R0903, R1721
# File-name building blocks: prefix and extension for manifest files
MANIFEST_NAME_BASE = "ingestion_manifest_"
MANIFEST_NAME_EXT = ".json"

# File-name building blocks: prefix and extension for artifact tars
ARTIFACT_NAME = "ingestion_artifacts_"
ARTIFACT_EXT = ".tar"

# Name of the weblog file looked for in the ingestion staging directory
WEBLOG = "weblog.tgz"

# pylint: disable=R0903

# File names that start with a run of letters, digits, '.', '_', '-' or '+'
# followed by ".tar"; used with match(), so only the start of the name is anchored.
SCIENCE_PRODUCT_PATTERN = re.compile(r"[a-zA-Z0-9._\-+]*\.tar")
class Telescope(Enum):
    """Codifying the names of our telescopes, because Janet can't abide magic strings"""

    # Each member is defined exactly once. Defining the same member name twice
    # in an Enum body raises TypeError when the class is created, so the older
    # integer values (1..6) are dropped in favour of the string values, which
    # match the string-valued enums used elsewhere in this module.
    VLA = "VLA"
    EVLA = "EVLA"
    ALMA = "ALMA"
    VLBA = "VLBA"
    GBT = "GBT"
    NONE = "NONE"
class IngestionType(Enum):
class ScienceProductType(Enum):
    """Canonical collection of ingestible types of science products"""

    EXEC_BLOCK = "execution_block"
    CAL = "calibration"
    # Shares its value with CAL, so Enum treats EVLA_CAL as an alias of CAL:
    # ScienceProductType.EVLA_CAL is ScienceProductType.CAL.
    EVLA_CAL = "calibration"
    CATALOG = "catalog"
    IMAGE = "image"

    def __str__(self):
        """Return the member's value wrapped in double quotes."""
        return f'"{str(self.value)}"'

    @staticmethod
    def from_str(sp_type_in) -> "ScienceProductType":
        """
        In comes a string; out comes the corresponding ScienceProductType, if any.
        Or maybe it's already a ScienceProductType, in which case we can just return it.

        :param sp_type_in: a string that "should" represent a ScienceProductType
        :return: the member whose value equals sp_type_in, or sp_type_in itself
            if it already is a ScienceProductType
        :raises ValueError: if no member has that value
        """
        if isinstance(sp_type_in, ScienceProductType):
            return sp_type_in

        for spt in ScienceProductType:
            if spt.value == sp_type_in:
                return spt

        raise ValueError(
            f"unrecognized ScienceProductType: {sp_type_in}; it's a {type(sp_type_in)}"
        )


class ScienceProduct:
    """Represents a science product in an ingestion manifest"""

    def __init__(self, sp_type: ScienceProductType, filename: str, locator: str, group_with: str):
        """
        :param sp_type: the kind of science product
        :param filename: name of the product file
        :param locator: product locator
        :param group_with: locator of the group a "late" product is added to
        """
        self.sp_type = sp_type
        self.filename = filename
        # product locator, used for input groups; locator for a known science product
        self.locator = locator
        # for "late" science products; they get added to an existing output group
        self.group_with = group_with
class AncillaryProductType(Enum):
    """The various types of ancillary products we'll encounter"""

    INGESTION_ARTIFACTS = "ingestion_artifacts"
    # NOTE(review): the diff view this listing was captured from elides further
    # members at this point (hunk marker "@@ -92,6 +119,9 @@"); restore them
    # from the repository rather than treating this list as exhaustive.
    CANDIDATE_IMG = "candidate_image"
    THUMBNAIL_IMG = "thumbnail_image"

    def __str__(self):
        """Return the member's value wrapped in double quotes."""
        return f'"{self.value}"'
class AncillaryProduct:
"""Represents an ancillary product in an ingestion manifest"""
@@ -105,3 +135,189 @@ class AncillaryProduct:
self.science_associate = science_associate # TODO, possibly: enum?
# make this an ancillary to the group of a science product (assumes locator string)
self.group_with = group_with
def write_ingestion_artifact_tar(ingestion_location: Path, ingestion_files: List[Path]) -> Path:
    """
    Take the list of files and build a tar for inclusion into the archive.
    This happens in the staging area for ingestion.

    :param ingestion_location: path to ingestion location (probably the spool directory)
    :param ingestion_files: all the files needed for ingestion
    :return: path of the "ingestion_artifacts.tar" archive that was written
    """
    tar_path = ingestion_location / "ingestion_artifacts.tar"

    # Keep the path and the open archive under separate names; rebinding the
    # path variable to the TarFile handle made the function hard to follow.
    with tarfile.open(tar_path, "w") as archive:
        for file in ingestion_files:
            archive.add(file)

    # Report the location as recorded by the TarFile object, as before.
    return Path(archive.name)
class EvlaCalInputScienceProduct:
    """Represents the "science_product" in EvlaCalInputGroup"""

    def __init__(self, locator: str):
        """
        :param locator: science product locator
        """
        self.locator = locator
        self.type = ScienceProductType.EVLA_CAL

    def __json__(self) -> Dict[str, str]:
        """
        Build a plain-dict rendering of this product.

        :return: the instance attributes, with "type" replaced by the
            EVLA_CAL enum member's string value
        """
        # Work on a copy: writing into self.__dict__ directly would replace
        # self.type (an enum member) with a plain string on the instance.
        json_out = dict(self.__dict__)
        json_out["type"] = ScienceProductType.EVLA_CAL.value
        return json_out
class EvlaCalInputGroup:
    """
    This represents the starting point for processing which generated a science product.
    There is not always an input group for every output group (rawdata, for instance).
    Initial assumption: Input groups consist only of science products.
    """

    def __init__(self, science_product: "EvlaCalInputScienceProduct"):
        """
        :param science_product: the single science product of this input group
        """
        # science product locator
        self.science_products = [science_product]

    def __json__(self) -> Dict[str, str]:
        """
        Create the "input-group" section of the manifest.

        :return: dict whose "science_products" entry is a string: the first
            science product (or the result of its __json__(), when it is not
            already a string) formatted inside square brackets
        """
        # Copy the attributes so self.science_products stays a list; writing the
        # bracketed string back into self.__dict__ broke every later call.
        json_out = dict(self.__dict__)
        sci_prod = self.science_products[0]
        sp_str = sci_prod if isinstance(sci_prod, str) else sci_prod.__json__()
        json_out["science_products"] = f"[{sp_str}]"
        return json_out
class EvlaCalOutputScienceProduct:
    """The science product in the output group"""

    def __init__(self, filename: str):
        """
        :param filename: name of the science product file
        """
        self.filename = filename
        self.type = ScienceProductType.EVLA_CAL

    def __json__(self) -> Dict[str, str]:
        """
        Build a plain-dict rendering of this product.

        :return: the instance attributes, with "type" replaced by the
            EVLA_CAL enum member's string value
        """
        # Work on a copy: writing into self.__dict__ directly would replace
        # self.type (an enum member) with a plain string on the instance.
        json_out = dict(self.__dict__)
        json_out["type"] = ScienceProductType.EVLA_CAL.value
        return json_out
class Weblog:
    """Ancillary-product entry describing a weblog.tgz"""

    def __init__(self, weblog_path: Path):
        """
        :param weblog_path: location of the weblog; stored in its str() form
        """
        entry = {"type": "weblog", "filename": str(weblog_path)}
        self.ancillary_product = entry

    def __json__(self) -> Dict[str, str]:
        """
        :return: a shallow copy of the instance attributes
        """
        return {**vars(self)}
class EvlaCalOutputGroup:
    """Represents result of data processing. Will have a single calibration tar
    plus a weblog.
    """

    def __init__(self, science_product: "EvlaCalOutputScienceProduct", weblog: Path):
        """
        :param science_product: the calibration science product
        :param weblog: path to the weblog
        """
        self.science_products = [science_product]
        # NOTE(review): a Weblog entry is created even when weblog is None, in
        # which case its filename is the string "None" -- confirm this is intended.
        self.ancillary_products = [Weblog(weblog)]

    def __json__(self) -> Dict[str, str]:
        """
        Create the "output-group" section of the manifest.

        :return: dict in which the science-products and ancillary-products
            entries are strings: the first item of each list (or the result of
            its __json__(), when it is not already a string) formatted inside
            square brackets
        """
        # Copy the attributes so the instance keeps its lists; writing the
        # bracketed strings back into self.__dict__ broke every later call.
        json_out = dict(self.__dict__)

        anc_prod = self.ancillary_products[0]
        ap_str = anc_prod if isinstance(anc_prod, str) else anc_prod.__json__()
        json_out[IngestionManifestKey.ANCILLARY_PRODUCTS.value] = f"[{ap_str}]"

        sci_prod = self.science_products[0]
        sp_str = sci_prod if isinstance(sci_prod, str) else sci_prod.__json__()
        json_out[IngestionManifestKey.SCIENCE_PRODUCTS.value] = f"[{sp_str}]"

        return json_out
class EvlaCalIngestionManifest:
    """TODO: this is JUST the ingestion manifest JSON, not a bespoke object"""

    def __init__(self, ingestion_path: Path, spl: str):
        """
        :param ingestion_path: staging directory
        :param spl: science product locator
        """
        self.ingestion_path = ingestion_path
        self.locator = spl
        self.sp_type = ScienceProductType.EVLA_CAL
        # iterdir() yields entries in arbitrary order; sorting makes the choice
        # of science product tar reproducible when several files match.
        self.infiles = sorted(self.ingestion_path.iterdir())
        self.input_group = EvlaCalInputGroup(EvlaCalInputScienceProduct(self.locator))
        self.output_group = self._make_output_group()

    def __str__(self):
        """
        :return: three newline-separated parts: the parameters section, then
            the input group's and the output group's __json__() results
        """
        params = self._make_params_section()
        input_group = self.input_group.__json__()
        output_group = self.output_group.__json__()
        return f"{params}\n{input_group}\n{output_group}"

    def _make_params_section(self) -> str:
        """
        Create the JSON for the "parameters" section of the manifest.
        It's always the same for any EVLA CAL ingestion manifest, except for the ingestion path.

        :return: stringified JSON
        """
        params = {
            "parameters": {
                "reingest": "false",
                "ngas-ingest": "false",
                "calibrate": "false",
                "ingestion_path": str(self.ingestion_path),
            }
        }
        return json.dumps(params)

    def _make_output_group(self) -> "EvlaCalOutputGroup":
        """
        Create the "output-group" section of the manifest.
        An EVLA CAL OutputGroup contains a science product(s) and an ancillary product
        (the weblog)

        :return: manifest output group
        """
        sp_tar = self._find_science_product_tar()
        # NOTE(review): when no tar was found, sp_tar is None and the product's
        # filename becomes the string "None" -- confirm whether this should raise.
        science_product = EvlaCalOutputScienceProduct(str(sp_tar))

        weblog = self.ingestion_path / WEBLOG
        return EvlaCalOutputGroup(science_product, weblog if weblog.exists() else None)

    def _find_science_product_tar(self) -> "Path | None":
        """
        A calibration ingestion staging dir should have ONE science product tar; ignore any others

        :return: the first file in self.infiles whose name matches
            SCIENCE_PRODUCT_PATTERN, or None when none matches
        """
        for file in self.infiles:
            if SCIENCE_PRODUCT_PATTERN.match(file.name):
                return file
        return None
class IngestionManifestKey(Enum):
    """Sections we expect to see in a manifest"""

    # top-level groups
    INPUT_GROUP = "input_group"
    OUTPUT_GROUP = "output_group"

    # entry of the "parameters" section
    INGESTION_PATH = "ingestion_path"

    # product lists found inside a group
    SCIENCE_PRODUCTS = "science_products"
    ANCILLARY_PRODUCTS = "ancillary_products"
Loading