Skip to content
Snippets Groups Projects

WS-797: Polishing QA process

Merged Nathan Hertz requested to merge WS-797-set-request-state-using-version-state into main
1 unresolved thread
Files
6
@@ -16,19 +16,19 @@
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
""" This is the entrypoint for ingestion launching """
import abc
import json
import logging
import sys
import tarfile
from pathlib import Path
# pylint: disable=E0401, R0903, R1721
# pylint: disable=C0301, E0401, R0903, R1721
from typing import Tuple
import pendulum
from ingest_envoy.manifest_components import (
INGESTION_ARTIFACTS_NAME,
INIT_WEBLOG_FILENAME,
JSON,
MANIFEST_FILENAME,
TARFILE_EXT,
@@ -47,9 +47,11 @@ from ingest_envoy.std_img_manifest_utils import ImageIngestionProductsFinder
from ingest_envoy.utilities import (
AncillaryProductType,
IngestionManifestException,
NoScienceProductException,
ScienceProductType,
Telescope,
find_output_tars,
find_weblogs,
)
from pendulum import DateTime
@@ -60,8 +62,8 @@ logger.addHandler(logging.StreamHandler(sys.stdout))
# pylint: disable=R0902, R0913
class ManifestIF(ManifestComponentIF):
"""Interface for all ingestion manifests"""
class IngestionManifest(ManifestComponentIF):
"""Encapsulates an ingestion manifest"""
def __init__(
self,
@@ -124,17 +126,6 @@ class ManifestIF(ManifestComponentIF):
return params
@abc.abstractmethod
def write(self):
"""
Write this manifest to a file, along with the artifacts tar and any other files required
for this type of ingestion, at the ingest_path (the location of the files to be ingested,
which is where the manifest is put).
This is the abstract declaration: the body below only raises.
:return:
:raises NotImplementedError: always
"""
raise NotImplementedError
def __eq__(self, other):
if isinstance(other, IngestionManifest):
return (
@@ -145,21 +136,51 @@ class ManifestIF(ManifestComponentIF):
return False
@abc.abstractmethod
def ingestion_path(self) -> Path:
"""
Get the ingestion path associated with this manifest.
:return: the staging_source_dir held by this manifest's parameters
"""
# NOTE(review): declared abstract yet returns a concrete value -- confirm the decorator is still intended
return self.parameters.staging_source_dir
def write(self) -> Path:
    """
    Serialize this manifest and save it as the manifest .json file
    in the staging source directory.

    :return: path of the manifest file that was written
    """
    manifest_content = self.to_json()
    destination = self.staging_source_dir / MANIFEST_FILENAME
    # Serialize before opening the file, so that a serialization failure
    # does not leave an empty manifest behind.
    serialized = json.dumps(manifest_content, indent=4)
    with open(destination, "w") as manifest_file:
        manifest_file.write(serialized)
    return destination
def to_json(self) -> JSON:
    """
    Turn this object into a dict suitable for dumping to a JSON file

    :return: dict holding the JSON forms of the parameters, the input group
        and the output group, keyed by the IngestionManifestKey values
    :raises KeyError: if the input group or output group attribute is missing
    """
    # The input and output groups are looked up among the instance attributes
    # by their manifest key values.
    attributes = vars(self)
    return {
        IngestionManifestKey.PARAMETERS.value: self.parameters.to_json(),
        IngestionManifestKey.INPUT_GROUP.value: attributes[IngestionManifestKey.INPUT_GROUP.value].to_json(),
        IngestionManifestKey.OUTPUT_GROUP.value: attributes[IngestionManifestKey.OUTPUT_GROUP.value].to_json(),
    }
class IngestionManifestBuilder:
"""Builds ingestion manifest and associated files from files in ingestion_path"""
# TODO?: We are making assumptions about the values of "reingest", "ngas_ingest",
# and "calibrate"; is this valid?
def __init__(
self,
staging_source_dir: Path,
@@ -192,7 +213,7 @@ class IngestionManifestBuilder:
if len(self.files_found) == 0:
raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}")
def build(self) -> Tuple[ManifestIF, Path]:
def build(self) -> Tuple[IngestionManifest, Path]:
"""
Using only -relevant- files in ingestion_path, write the manifest
and produce other files required for ingestion.
@@ -221,12 +242,20 @@ class IngestionManifestBuilder:
# (required for ingestion, evidently)
artifacts_filename = self._build_artifacts_filename()
artifacts_ap = AncillaryProduct(AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_filename)
manifest.output_group.ancillary_products.append(artifacts_ap)
if not manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products = []
weblog_ap = AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=WEBLOG_FILENAME)
manifest.output_group.ancillary_products.append(weblog_ap)
manifest.output_group.ancillary_products.append(artifacts_ap)
if weblog_ap not in manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products.append(weblog_ap)
# # If this isn't version 1, there should be an initial weblog from v1
# init_weblog = self._find_init_weblog_if_any()
# if init_weblog:
# init_weblog_ap = AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG_TYPE,
# filename=INIT_WEBLOG_FILENAME)
# manifest.output_group.ancillary_products.append(init_weblog_ap)
manifest_file = manifest.write()
artifacts_file = self.staging_source_dir / artifacts_filename
@@ -234,6 +263,19 @@ class IngestionManifestBuilder:
return manifest, manifest_file
def _find_init_weblog_if_any(self):
    """
    Look for an initial weblog in the staging source dir.
    (If there is one, this calibration is v2 or higher)

    :return: the first directory entry named INIT_WEBLOG_FILENAME, or None if there is none
    """
    # Iterate the directory entries directly; copying them into an
    # intermediate list first adds nothing.
    return next(
        (entry for entry in self.staging_source_dir.iterdir() if entry.name == INIT_WEBLOG_FILENAME),
        None,
    )
def _build_image_manifest(self):
"""
Image manifest has additional_metadata, and output group is way more complicated
@@ -275,13 +317,13 @@ class IngestionManifestBuilder:
def _build_evla_cal_output_group(self):
"""
Create imaging manifest output group using the parameters
Create EVLA standard calibration manifest output group using the parameters
and the contents of the staging dir.
:return:
:return: an output group, if a science product is found
"""
# find science product (we expect just one for this SP type)
# find science product (we expect just one for this SP type)
tars_found = find_output_tars(self.files_found, self.staging_source_dir)
sci_prod = None
@@ -290,7 +332,16 @@ class IngestionManifestBuilder:
break
if sci_prod:
return OutputGroup(science_products=[sci_prod])
weblog_files = find_weblogs(self.files_found, self.staging_source_dir)
weblogs = []
for file in weblog_files:
ap = AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=file.name)
if ap not in weblogs:
weblogs.append(ap)
else:
raise NoScienceProductException(f">>> NO SCIENCE PRODUCT FOUND in {self.staging_source_dir}")
return OutputGroup(science_products=[sci_prod], ancillary_products=weblogs)
def _build_imaging_output_group(self) -> OutputGroup:
"""
@@ -344,46 +395,6 @@ class IngestionManifestBuilder:
return ingestion_artifacts_tar
class IngestionManifest(ManifestIF):
    """Concrete ingestion manifest that can write itself to a file"""

    def ingestion_path(self) -> Path:
        return self.parameters.staging_source_dir

    def write(self) -> Path:
        """
        Save this manifest as the manifest .json file in the staging source directory.

        :return: path of the manifest file that was written
        """
        manifest_content = self.to_json()
        destination = self.staging_source_dir / MANIFEST_FILENAME
        # Serialize first, then open the file for writing.
        serialized = json.dumps(manifest_content, indent=4)
        with open(destination, "w") as manifest_file:
            manifest_file.write(serialized)
        return destination

    def to_json(self) -> JSON:
        """
        Build the dict form of this manifest, suitable for dumping to a JSON file

        :return: dict with the JSON forms of the parameters, input group and output group
        """
        attributes = vars(self)
        as_json = {}
        as_json[IngestionManifestKey.PARAMETERS.value] = self.parameters.to_json()
        # The two groups are fetched from the instance attributes by their manifest key values.
        as_json[IngestionManifestKey.INPUT_GROUP.value] = attributes[
            IngestionManifestKey.INPUT_GROUP.value
        ].to_json()
        as_json[IngestionManifestKey.OUTPUT_GROUP.value] = attributes[
            IngestionManifestKey.OUTPUT_GROUP.value
        ].to_json()
        return as_json
def format_timestamp(datetime: DateTime) -> str:
"""
Format the current time as
Loading