Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Commits on Source (3)
......@@ -16,19 +16,19 @@
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
""" This is the entrypoint for ingestion launching """
import abc
import json
import logging
import sys
import tarfile
from pathlib import Path
# pylint: disable=E0401, R0903, R1721
# pylint: disable=C0301, E0401, R0903, R1721
from typing import Tuple
import pendulum
from ingest_envoy.manifest_components import (
INGESTION_ARTIFACTS_NAME,
INIT_WEBLOG_FILENAME,
JSON,
MANIFEST_FILENAME,
TARFILE_EXT,
......@@ -47,9 +47,11 @@ from ingest_envoy.std_img_manifest_utils import ImageIngestionProductsFinder
from ingest_envoy.utilities import (
AncillaryProductType,
IngestionManifestException,
NoScienceProductException,
ScienceProductType,
Telescope,
find_output_tars,
find_weblogs,
)
from pendulum import DateTime
......@@ -60,8 +62,8 @@ logger.addHandler(logging.StreamHandler(sys.stdout))
# pylint: disable=R0902, R0913
class ManifestIF(ManifestComponentIF):
"""Interface for all ingestion manifests"""
class IngestionManifest(ManifestComponentIF):
"""Encapsulates an ingestion manifest"""
def __init__(
self,
......@@ -124,17 +126,6 @@ class ManifestIF(ManifestComponentIF):
return params
@abc.abstractmethod
def write(self):
"""
Write this manifest to a file, along with the artifacts tar and any other files required
for this type of ingestion, at the ingest_path
:param: location of files to be ingested, which is where we'll put the manifest
:return:
"""
raise NotImplementedError
def __eq__(self, other):
if isinstance(other, IngestionManifest):
return (
......@@ -145,21 +136,51 @@ class ManifestIF(ManifestComponentIF):
return False
@abc.abstractmethod
def ingestion_path(self) -> Path:
"""
Get the ingestion path associated with this manifest.
:return:
"""
return self.parameters.staging_source_dir
def write(self) -> Path:
    """
    Serialize this manifest with to_json() and write it, indented, to
    MANIFEST_FILENAME under this manifest's staging_source_dir.

    :return: path of the manifest file that was written
    """
    me_dict = self.to_json()
    output_path = self.staging_source_dir / MANIFEST_FILENAME

    to_write = json.dumps(me_dict, indent=4)
    # Explicit encoding so the file contents don't depend on the platform locale
    with open(output_path, "w", encoding="utf-8") as out:
        out.write(to_write)

    return output_path
def to_json(self) -> JSON:
    """
    Build the JSON-serializable representation of this manifest.

    :return: dict keyed by the PARAMETERS, INPUT_GROUP and OUTPUT_GROUP
        manifest keys, each value produced by that component's own to_json()
    """
    # The input and output groups are looked up among the instance attributes
    # by their manifest key names
    me_dict = dict(self.__dict__)

    return {
        IngestionManifestKey.PARAMETERS.value: self.parameters.to_json(),
        IngestionManifestKey.INPUT_GROUP.value: me_dict[IngestionManifestKey.INPUT_GROUP.value].to_json(),
        IngestionManifestKey.OUTPUT_GROUP.value: me_dict[IngestionManifestKey.OUTPUT_GROUP.value].to_json(),
    }
class IngestionManifestBuilder:
"""Builds ingestion manifest and associated files from files in ingestion_path"""
# TODO?: We are making assumptions about the values of "reingest", "ngas_ingest",
# and "calibrate"; is this valid?
def __init__(
self,
staging_source_dir: Path,
......@@ -192,7 +213,7 @@ class IngestionManifestBuilder:
if len(self.files_found) == 0:
raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}")
def build(self) -> Tuple[ManifestIF, Path]:
def build(self) -> Tuple[IngestionManifest, Path]:
"""
Using only -relevant- files in ingestion_path, write the manifest
and produce other files required for ingestion.
......@@ -221,12 +242,20 @@ class IngestionManifestBuilder:
# (required for ingestion, evidently)
artifacts_filename = self._build_artifacts_filename()
artifacts_ap = AncillaryProduct(AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_filename)
manifest.output_group.ancillary_products.append(artifacts_ap)
if not manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products = []
weblog_ap = AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=WEBLOG_FILENAME)
manifest.output_group.ancillary_products.append(weblog_ap)
manifest.output_group.ancillary_products.append(artifacts_ap)
if weblog_ap not in manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products.append(weblog_ap)
# # If this isn't version 1, there should be an initial weblog from v1
# init_weblog = self._find_init_weblog_if_any()
# if init_weblog:
# init_weblog_ap = AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG_TYPE,
# filename=INIT_WEBLOG_FILENAME)
# manifest.output_group.ancillary_products.append(init_weblog_ap)
manifest_file = manifest.write()
artifacts_file = self.staging_source_dir / artifacts_filename
......@@ -234,6 +263,19 @@ class IngestionManifestBuilder:
return manifest, manifest_file
def _find_init_weblog_if_any(self):
    """
    Look for an initial weblog in the staging source dir.
    (If there is one, this calibration is v2 or higher.)

    :return: path of the file named INIT_WEBLOG_FILENAME, or None if there isn't one
    """
    # iterdir() is already iterable; no need to copy it into a list first
    for file in self.staging_source_dir.iterdir():
        if file.name == INIT_WEBLOG_FILENAME:
            return file

    return None
def _build_image_manifest(self):
"""
Image manifest has additional_metadata, and output group is way more complicated
......@@ -275,13 +317,13 @@ class IngestionManifestBuilder:
def _build_evla_cal_output_group(self):
"""
Create imaging manifest output group using the parameters
Create EVLA standard calibration manifest output group using the parameters
and the contents of the staging dir.
:return:
:return: an output group, if a science product is found
"""
# find science product (we expect just one for this SP type)
# find science product (we expect just one for this SP type)
tars_found = find_output_tars(self.files_found, self.staging_source_dir)
sci_prod = None
......@@ -290,7 +332,16 @@ class IngestionManifestBuilder:
break
if sci_prod:
return OutputGroup(science_products=[sci_prod])
weblog_files = find_weblogs(self.files_found, self.staging_source_dir)
weblogs = []
for file in weblog_files:
ap = AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=file.name)
if ap not in weblogs:
weblogs.append(ap)
else:
raise NoScienceProductException(f">>> NO SCIENCE PRODUCT FOUND in {self.staging_source_dir}")
return OutputGroup(science_products=[sci_prod], ancillary_products=weblogs)
def _build_imaging_output_group(self) -> OutputGroup:
"""
......@@ -344,46 +395,6 @@ class IngestionManifestBuilder:
return ingestion_artifacts_tar
class IngestionManifest(ManifestIF):
"""write ingestion manifest to file"""
def ingestion_path(self) -> Path:
return self.parameters.staging_source_dir
def write(self) -> Path:
"""
Write the manifest .json file.
:return:
"""
me_dict = self.to_json()
output_path = self.staging_source_dir / MANIFEST_FILENAME
to_write = json.dumps(me_dict, indent=4)
with open(output_path, "w") as out:
out.write(to_write)
return output_path
def to_json(self) -> JSON:
"""
Turn this object into a JSON string suitable for writing to a file
:return:
"""
me_dict = dict(self.__dict__)
to_return = {
IngestionManifestKey.PARAMETERS.value: self.parameters.to_json(),
IngestionManifestKey.INPUT_GROUP.value: me_dict[IngestionManifestKey.INPUT_GROUP.value].to_json(),
IngestionManifestKey.OUTPUT_GROUP.value: me_dict[IngestionManifestKey.OUTPUT_GROUP.value].to_json(),
}
return to_return
def format_timestamp(datetime: DateTime) -> str:
"""
Format the current time as
......
......@@ -30,6 +30,7 @@ MANIFEST_FILENAME = "ingestion-manifest.json"
INGESTION_ARTIFACTS_NAME = "ingestion_artifacts_"
TARFILE_EXT = ".tar"
WEBLOG_FILENAME = "weblog.tgz"
INIT_WEBLOG_FILENAME = "initial_weblog.tgz"
SCIENCE_PRODUCT_PATTERN = re.compile("[a-zA-Z0-9._\\-+]*\\.tar")
JSON = Union[int, float, str, List["JSON"], Dict[str, "JSON"]]
......@@ -197,19 +198,14 @@ class AncillaryProduct(ManifestComponentIF):
# make this an ancillary to a particular science product (assumes locator string)
if science_associate:
self.science_associate = science_associate # TODO, possibly: enum?
self.science_associate = science_associate
# make this an ancillary to the group of a science product (assumes locator string)
if group_with:
self.group_with = group_with
def __eq__(self, other):
    """
    Two ancillary products are equal when the other object is also an
    AncillaryProduct and their type, filename, group_with and
    science_associate all match.

    :param other: object to compare against
    :return: result of the comparison; False for any non-AncillaryProduct
    """
    if not isinstance(other, AncillaryProduct):
        return False

    return (
        other.type == self.type
        and other.filename == self.filename
        and other.group_with == self.group_with
        and other.science_associate == self.science_associate
    )
......@@ -256,7 +252,7 @@ class OutputScienceProduct(ManifestComponentIF):
return False
def __str__(self):
return f"{Path(self.filename).name}: {self.type.value}, " f"{len(self.ancillary_products)} ancillary products"
return f"{Path(self.filename).name}: {self.type.value}, {len(self.ancillary_products)} ancillary products"
def to_json(self) -> JSON:
json_dict = {"type": self.type.value, "filename": self.filename}
......@@ -303,18 +299,3 @@ class OutputGroup(ManifestComponentIF):
me_dict[IngestionManifestKey.ANCILLARY_PRODUCTS.value] = ap_jsons
return me_dict
class Weblog:
"""Represents a weblog.tgz as an ancillary product"""
def __init__(self, weblog_path: Path):
self.ancillary_product = {"type": "weblog", "filename": str(weblog_path)}
def to_json(self) -> JSON:
"""
JSON-ify this object
:return: json.load()-able string
"""
return dict(self.__dict__)
......@@ -77,6 +77,10 @@ class IngestionManifestException(Exception):
"""Throw this if we're unable to construct an ingestion manifest using supplied inputs"""
class NoScienceProductException(Exception):
"""Throw this if no science product is found in the staging source directory"""
def find_output_tars(files_found, staging_source_dir) -> List[Path]:
"""
Round up the output science products associated with this SP type.
......@@ -89,3 +93,19 @@ def find_output_tars(files_found, staging_source_dir) -> List[Path]:
raise IngestionManifestException(f"No output science products found at {staging_source_dir}")
return tar_files
def find_weblogs(files_found, staging_source_dir) -> List[Path]:
    """
    Get the weblog(s) among the files found in the ingest source directory.
    If there's an initial weblog (i.e., this calibration is v2 or higher),
    it is picked up too, since it matches the same filter.

    :param files_found: files to search; a file qualifies if its name
        ends with ".tgz" and contains "weblog"
    :param staging_source_dir: directory the files came from (used only
        in the error message)
    :return: the qualifying files, in the order they were given
    :raises IngestionManifestException: if no file qualifies
    """
    weblogs = [file for file in files_found if file.name.endswith(".tgz") and "weblog" in file.name]
    if not weblogs:
        raise IngestionManifestException(f"No weblogs found at {staging_source_dir}")

    return weblogs
......@@ -24,6 +24,7 @@ from typing import List
import pytest
from ingest_envoy.manifest_components import (
INIT_WEBLOG_FILENAME,
WEBLOG_FILENAME,
AncillaryProduct,
OutputScienceProduct,
......@@ -88,6 +89,21 @@ def populate_fake_evla_cal_ingest_path(staging_dir: Path) -> List[Path]:
return files
def populate_fake_final_evla_cal_ingest_path(staging_dir: Path) -> List[Path]:
    """
    Create a directory containing fake calibration products, plus other stuff
    that we -don't- want to ingest, PLUS an initial weblog

    :param staging_dir: our temporary dir
    :return: the files created by populate_fake_evla_cal_ingest_path,
        with the initial weblog appended
    """
    files = populate_fake_evla_cal_ingest_path(staging_dir)

    # Use the shared constant so this fixture stays in step with the code under test
    init_weblog = staging_dir / INIT_WEBLOG_FILENAME
    init_weblog.touch()
    files.append(init_weblog)

    return files
# -----------------------------
# Image manifest test data
# -----------------------------
......@@ -112,6 +128,7 @@ OUTPUT_SCIENCE_PRODUCT = OutputScienceProduct(
# input files
WEBLOG_ANCILLARY = AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=WEBLOG_FILENAME)
INIT_WEBLOG_ANCILLARY = AncillaryProduct(type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=INIT_WEBLOG_FILENAME)
PIPELINE_AF_ANCILLARY = AncillaryProduct(
type=AncillaryProductType.PIPELINE_ARTIFACTS,
filename="pipeline_artifacts_2021_08_04T15_46_02.tar",
......@@ -129,9 +146,10 @@ STAGING_DIR_FILES = [
]
def populate_fake_tmpx_ratuqh_ingest_path(staging_source_dir: Path) -> List[Path]:
def populate_fake_tmpx_ratuqh_ingest_path(staging_source_dir: Path, is_final: bool = False) -> List[Path]:
"""
make a bunch of fake files that should result in the example manifest
Make a bunch of fake files that should result in the example manifest.
If this is version 2 or later of a standard calibration, include the initial weblog.
:return:
"""
......@@ -142,6 +160,8 @@ def populate_fake_tmpx_ratuqh_ingest_path(staging_source_dir: Path) -> List[Path
fake_files_to_create.append(PIPELINE_AF_ANCILLARY.filename)
fake_files_to_create.append(WEBLOG_ANCILLARY.filename)
if is_final:
fake_files_to_create.append(INIT_WEBLOG_ANCILLARY.filename)
fake_files_to_create.append(OUTPUT_SCIENCE_PRODUCT.filename)
......
{
"parameters": {
"reingest": false,
"ngas_ingest": true,
"telescope": "EVLA",
"ingestion_path": "/lustre/aoc/cluster/pipeline/dsoc-prod/stage_products/20A-346_2021_07_23_T13_37_08.376"
},
"input_group": {
"science_products": [
{
"locator": "uid://evla/execblock/50bb85af-ce52-49d8-b9d8-9221bfce939d"
}
]
},
"output_group": {
"science_products": [
{
"type": "calibration",
"filename": "20A-346_2021_07_23_T13_37_08.376.tar"
}
],
"ancillary_products": [
{
"type": "pipeline_weblog",
"filename": "weblog.tgz"
},
{
"type": "pipeline_weblog",
"filename": "initial_weblog.tgz"
},
{
"type": "ingestion_artifacts",
"filename": "ingestion_artifacts_2021_07_27_T17_35_11.463.tar"
}
]
}
}
......@@ -24,14 +24,18 @@ import sys
import tarfile
from pathlib import Path
# pylint: disable=E0401, E0402, R1721, W0621
# pylint: disable=C0103, C0301, E0401, E0402, R0914, R1721, W0621, W1514
from unittest.mock import patch
import pytest
from ingest_envoy.ingestion_manifest import IngestionManifestBuilder, find_manifest
from ingest_envoy.manifest_components import (
INGESTION_ARTIFACTS_NAME,
INIT_WEBLOG_FILENAME,
MANIFEST_FILENAME,
TARFILE_EXT,
WEBLOG_FILENAME,
AncillaryProduct,
IngestionManifestKey,
InputGroup,
......@@ -47,8 +51,8 @@ from .conftest import (
EVLA_CAL_INPUT_FILENAMES,
UNWANTED,
find_example_manifest,
ingest_path,
populate_fake_evla_cal_ingest_path,
populate_fake_final_evla_cal_ingest_path,
)
# pylint: disable=E0401, E1120
......@@ -71,7 +75,7 @@ def test_filters_cal_input_files(ingest_path: Path):
:param ingest_path: our temporary dir
"""
populate_fake_evla_cal_ingest_path(ingest_path)
populate_fake_evla_cal_ingest_path(staging_dir=ingest_path)
locator = "uid://evla/calibration/twinkle-twinkle-little-quasar"
manifest, _ = IngestionManifestBuilder(
telescope=Telescope.EVLA.value,
......@@ -91,7 +95,6 @@ def test_filters_cal_input_files(ingest_path: Path):
assert len(input_group.science_products) == 1
output_group = manifest.output_group
assert len(output_group.science_products) == 1
assert len(output_group.ancillary_products) == 2
for product in output_group.ancillary_products:
......@@ -137,6 +140,19 @@ def test_writes_expected_evla_cal_output_files(ingest_path: Path):
shutil.rmtree(ingest_path)
@pytest.mark.skip("TODO")
def test_writes_final_evla_cal_output(ingest_path: Path):
    """
    For a second or subsequent version of a standard calibration,
    did the manifest builder produce the manifest file, the science product tar,
    the weblog, -and- the initial weblog?

    :param ingest_path: ingestion location
    :return:
    """
    raise NotImplementedError
def test_params_json_well_formed():
"""
Make sure our ManifestParameters makes nice JSON
......@@ -333,7 +349,80 @@ def test_evla_cal_manifest_matches_example(ingest_path: Path):
assert len(actual_osp) == len(expected_osp)
expected_aps = expected_outgroup["ancillary_products"]
assert len(expected_aps) == 2
actual_aps = manifest.output_group.ancillary_products
logger.info(">>> ACTUAL ANCILLARY PRODUCTS")
for ap in actual_aps:
logger.info(f"{ap.filename}: {ap.type}, group with {ap.group_with}")
assert len(actual_aps) == len(expected_aps)
shutil.rmtree(ingest_path)
def test_evla_cal_final_manifest_matches_example(ingest_path: Path):
    """
    Given the correct parameters, a manifest that matches evla_cal_final_manifest
    should be generated; i.e., ancillary_products should contain initial_weblog.tgz
    in addition to weblog.tgz

    :param ingest_path: our temporary dir; removed at the end of the test
    :return:
    """
    expected_dir_name = "/lustre/aoc/cluster/pipeline/dsoc-prod/stage_products/20A-346_2021_07_23_T13_37_08.376"
    example = find_example_manifest("evla_cal_final_manifest")
    with open(example, "r", encoding="utf-8") as infile:
        expected_json = dict(json.load(infile).items())

    # populate ingestion path with fake files for manifest builder to find
    populate_fake_final_evla_cal_ingest_path(ingest_path)

    builder = IngestionManifestBuilder(
        staging_source_dir=ingest_path,
        telescope=Telescope.EVLA.value,
        sp_type=ScienceProductType.EVLA_CAL.value,
        locator="uid://evla/execblock/50bb85af-ce52-49d8-b9d8-9221bfce939d",
    )
    manifest, manifest_file = builder.build()

    # Loading the written manifest also confirms it is parseable JSON
    with open(manifest_file, "r", encoding="utf-8") as infile:
        actual_json = dict(json.load(infile).items())
    actual_json[IngestionManifestKey.PARAMETERS.value][IngestionManifestKey.INGESTION_PATH.value] = expected_dir_name

    expected_params = expected_json["parameters"]
    actual_params = manifest.parameters.to_json()

    # The example stores booleans; the parameters' JSON is compared as "true"/"false" strings
    expected_reingest = "true" if expected_params["reingest"] else "false"
    assert actual_params["reingest"] == expected_reingest
    expected_ngas_ingest = "true" if expected_params["ngas_ingest"] else "false"
    assert actual_params["ngas_ingest"] == expected_ngas_ingest
    assert "calibrate" not in actual_params.keys()

    assert manifest.input_group.to_json() == expected_json["input_group"]

    expected_outgroup = expected_json["output_group"]
    expected_osp = expected_outgroup["science_products"]
    actual_osp = manifest.output_group.science_products
    assert len(actual_osp) == len(expected_osp)

    logger.info(">>> SCIENCE PRODUCTS:")
    for sp in actual_osp:
        logger.info(f"{sp.filename}")
        if sp.ancillary_products is not None:
            for ap in sp.ancillary_products:
                logger.info(f"{ap.filename}")

    # weblog, initial weblog, and ingestion artifacts
    expected_aps = expected_outgroup["ancillary_products"]
    assert len(expected_aps) == 3

    actual_aps = manifest.output_group.ancillary_products

    # Log before asserting so a count mismatch shows what was actually built
    logger.info(">>> ACTUAL ANCILLARY PRODUCTS")
    for ap in actual_aps:
        logger.info(f"{ap.filename}: {ap.type}, group with {ap.group_with}")
    assert len(actual_aps) == len(expected_aps)

    # Both the current and the initial weblog must be present
    found_count = sum(1 for ap in actual_aps if ap.filename in (WEBLOG_FILENAME, INIT_WEBLOG_FILENAME))
    assert found_count == 2

    shutil.rmtree(ingest_path)