Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Commits on Source (6)
Showing
with 320 additions and 88 deletions
...@@ -26,6 +26,7 @@ from ingest_envoy.manifest_components import ( ...@@ -26,6 +26,7 @@ from ingest_envoy.manifest_components import (
OutputGroup, OutputGroup,
MANIFEST_FILENAME, MANIFEST_FILENAME,
ParamsKey, ParamsKey,
WEBLOG_FILENAME,
) )
from ingest_envoy.schema import AbstractTextFile from ingest_envoy.schema import AbstractTextFile
from ingest_envoy.std_img_manifest_utils import ImageIngestionProductsFinder from ingest_envoy.std_img_manifest_utils import ImageIngestionProductsFinder
...@@ -84,21 +85,30 @@ class ManifestIF(ManifestComponentIF): ...@@ -84,21 +85,30 @@ class ManifestIF(ManifestComponentIF):
params = ManifestParameters( params = ManifestParameters(
telescope=self.telescope, telescope=self.telescope,
reingest=False, reingest=False,
ngas_ingest=False, ngas_ingest=True,
calibrate=False, calibrate=False,
staging_source_dir=self.staging_source_dir, staging_source_dir=self.staging_source_dir,
additional_metadata=additional_metadata, additional_metadata=additional_metadata,
) )
else: self.additional_metadata = additional_metadata
elif self.sp_type == ScienceProductType.IMAGE:
params = ManifestParameters( params = ManifestParameters(
telescope=self.telescope, telescope=self.telescope,
reingest=False, reingest=False,
ngas_ingest=False, ngas_ingest=True,
calibrate=False, calibrate=False,
staging_source_dir=self.staging_source_dir, staging_source_dir=self.staging_source_dir,
) )
self.additional_metadata = additional_metadata else:
params = ManifestParameters(
telescope=self.telescope,
reingest=False,
ngas_ingest=True,
staging_source_dir=self.staging_source_dir,
)
return params return params
@abc.abstractmethod @abc.abstractmethod
...@@ -135,6 +145,8 @@ class ManifestIF(ManifestComponentIF): ...@@ -135,6 +145,8 @@ class ManifestIF(ManifestComponentIF):
class IngestionManifestBuilder: class IngestionManifestBuilder:
"""Builds ingestion manifest and associated files from files in ingestion_path""" """Builds ingestion manifest and associated files from files in ingestion_path"""
# TODO?: We are making assumptions about the values of "reingest", "ngas_ingest",
# and "calibrate"; is this valid?
def __init__( def __init__(
self, self,
staging_source_dir: Path, staging_source_dir: Path,
...@@ -169,9 +181,6 @@ class IngestionManifestBuilder: ...@@ -169,9 +181,6 @@ class IngestionManifestBuilder:
:return: the ingestion manifest and the file containing its JSON :return: the ingestion manifest and the file containing its JSON
""" """
# # create any other ingestion files needed for this type of ingestion
# self._find_additional_ingestion_files()
if self.sp_type == ScienceProductType.EVLA_CAL: if self.sp_type == ScienceProductType.EVLA_CAL:
return self._build_evla_cal_manifest() return self._build_evla_cal_manifest()
...@@ -187,13 +196,26 @@ class IngestionManifestBuilder: ...@@ -187,13 +196,26 @@ class IngestionManifestBuilder:
input_group=self._build_input_group(), input_group=self._build_input_group(),
output_group=self._build_evla_cal_output_group(), output_group=self._build_evla_cal_output_group(),
) )
manifest_file = manifest.write()
artifacts_tar = self.write_ingestion_artifacts_tar() # We can't create the ingestion artifacts tar quite yet,
# because it will contain the yet-to-be-written manifest itself
# (required for ingestion, evidently)
artifacts_filename = self._build_artifacts_filename()
artifacts_ap = AncillaryProduct( artifacts_ap = AncillaryProduct(
AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_tar.name AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_filename
) )
manifest.output_group.ancillary_products = [artifacts_ap] if not manifest.output_group.ancillary_products:
manifest.output_group.ancillary_products = []
weblog_ap = AncillaryProduct(
type=AncillaryProductType.PIPELINE_WEBLOG_TYPE, filename=WEBLOG_FILENAME
)
manifest.output_group.ancillary_products.append(weblog_ap)
manifest.output_group.ancillary_products.append(artifacts_ap)
manifest_file = manifest.write()
artifacts_file = self.staging_source_dir / artifacts_filename
self.write_ingestion_artifacts_tar(artifacts_file)
return manifest, manifest_file return manifest, manifest_file
...@@ -215,11 +237,12 @@ class IngestionManifestBuilder: ...@@ -215,11 +237,12 @@ class IngestionManifestBuilder:
manifest_file = manifest.write() manifest_file = manifest.write()
artifacts_tar = self.write_ingestion_artifacts_tar() artifacts_file = self.staging_source_dir / self._build_artifacts_filename()
artifacts_ap = AncillaryProduct( artifacts_ap = AncillaryProduct(
type=AncillaryProductType.INGESTION_ARTIFACTS, filename=artifacts_tar.name type=AncillaryProductType.INGESTION_ARTIFACTS, filename=str(artifacts_file)
) )
manifest.output_group.ancillary_products.append(artifacts_ap) manifest.output_group.ancillary_products.append(artifacts_ap)
self.write_ingestion_artifacts_tar(artifacts_file)
return manifest, manifest_file return manifest, manifest_file
...@@ -248,7 +271,7 @@ class IngestionManifestBuilder: ...@@ -248,7 +271,7 @@ class IngestionManifestBuilder:
tars_found = find_output_tars(self.files_found, self.staging_source_dir) tars_found = find_output_tars(self.files_found, self.staging_source_dir)
for file in tars_found: for file in tars_found:
sci_prod = OutputScienceProduct(type=self.sp_type, filename=str(file)) sci_prod = OutputScienceProduct(type=self.sp_type, filename=file.name)
return OutputGroup(science_products=[sci_prod]) return OutputGroup(science_products=[sci_prod])
...@@ -267,7 +290,7 @@ class IngestionManifestBuilder: ...@@ -267,7 +290,7 @@ class IngestionManifestBuilder:
return OutputGroup(science_products=science_products, ancillary_products=ancillary_products) return OutputGroup(science_products=science_products, ancillary_products=ancillary_products)
@staticmethod @staticmethod
def build_artifacts_filename() -> str: def _build_artifacts_filename() -> str:
""" """
Build unique manifest filename in standard format. Build unique manifest filename in standard format.
...@@ -277,7 +300,7 @@ class IngestionManifestBuilder: ...@@ -277,7 +300,7 @@ class IngestionManifestBuilder:
timestamp = format_timestamp(current_time) timestamp = format_timestamp(current_time)
return f"{INGESTION_ARTIFACTS_NAME}{timestamp}{TARFILE_EXT}" return f"{INGESTION_ARTIFACTS_NAME}{timestamp}{TARFILE_EXT}"
def write_ingestion_artifacts_tar(self) -> tarfile.TarFile: def write_ingestion_artifacts_tar(self, artifacts_path: Path) -> tarfile.TarFile:
""" """
Take the list of files and build a tar for inclusion into the archive. Take the list of files and build a tar for inclusion into the archive.
This happens in the staging area for ingestion. This happens in the staging area for ingestion.
...@@ -286,13 +309,14 @@ class IngestionManifestBuilder: ...@@ -286,13 +309,14 @@ class IngestionManifestBuilder:
:return: a .tar archive of the ingestion artifacts :return: a .tar archive of the ingestion artifacts
""" """
manifest_file = find_manifest(self.staging_source_dir) with tarfile.open(artifacts_path, "w") as ingestion_artifacts_tar:
ing_tar = self.staging_source_dir / self.build_artifacts_filename()
with tarfile.open(ing_tar, "w") as ingestion_artifacts_tar:
for file in self.files_found: for file in self.files_found:
ingestion_artifacts_tar.add(file) if self.sp_type == ScienceProductType.IMAGE:
ingestion_artifacts_tar.add(file)
# include the manifest # The manifest file itself is considered an ingestion artifact.
# (It's turtles all the way down.)
manifest_file = self.staging_source_dir / MANIFEST_FILENAME
ingestion_artifacts_tar.add(manifest_file) ingestion_artifacts_tar.add(manifest_file)
return ingestion_artifacts_tar return ingestion_artifacts_tar
......
...@@ -9,7 +9,7 @@ from typing import Union, List, Dict ...@@ -9,7 +9,7 @@ from typing import Union, List, Dict
from ingest_envoy.schema import AbstractTextFile from ingest_envoy.schema import AbstractTextFile
from ingest_envoy.utilities import ScienceProductType, Telescope, AncillaryProductType from ingest_envoy.utilities import ScienceProductType, Telescope, AncillaryProductType
MANIFEST_FILENAME = "ingestion_manifest.json" MANIFEST_FILENAME = "ingestion-manifest.json"
INGESTION_ARTIFACTS_NAME = "ingestion_artifacts_" INGESTION_ARTIFACTS_NAME = "ingestion_artifacts_"
TARFILE_EXT = ".tar" TARFILE_EXT = ".tar"
WEBLOG_FILENAME = "weblog.tgz" WEBLOG_FILENAME = "weblog.tgz"
...@@ -106,21 +106,28 @@ class InputGroup(ManifestComponentIF): ...@@ -106,21 +106,28 @@ class InputGroup(ManifestComponentIF):
class ManifestParameters(ManifestComponentIF): class ManifestParameters(ManifestComponentIF):
"""Represents "parameters" section of ingestion manifest""" """Represents "parameters" section of ingestion manifest.
ASSUMPTIONS:
* EVLA CAL manifest has no "calibrate" parameter
* "ngas_ingest" is always True (per our testing examples)
"""
def __init__( def __init__(
self, self,
telescope: Telescope, telescope: Telescope,
reingest: bool, reingest: bool,
ngas_ingest: bool, ngas_ingest: bool,
calibrate: bool,
staging_source_dir: Path, staging_source_dir: Path,
additional_metadata: AbstractTextFile = None, additional_metadata: AbstractTextFile = None,
calibrate: bool = None,
): ):
self.telescope = telescope self.telescope = telescope
self.reingest = reingest self.reingest = reingest
self.ngas_ingest = ngas_ingest self.ngas_ingest = ngas_ingest
self.calibrate = calibrate if calibrate is not None:
self.calibrate = calibrate
self.staging_source_dir = staging_source_dir self.staging_source_dir = staging_source_dir
self.additional_metadata = additional_metadata self.additional_metadata = additional_metadata
...@@ -139,13 +146,17 @@ class ManifestParameters(ManifestComponentIF): ...@@ -139,13 +146,17 @@ class ManifestParameters(ManifestComponentIF):
return False return False
def to_json(self) -> JSON: def to_json(self) -> JSON:
json_dict = { json_dict = {
ParamsKey.TELESCOPE.value: self.telescope.value, ParamsKey.TELESCOPE.value: self.telescope.value,
ParamsKey.REINGEST.value: str(self.reingest).lower(), # The ingestion manifest must have "true" and "false"
ParamsKey.NGAS_INGEST.value: str(self.ngas_ingest).lower(), # rather than "True" and "False"
ParamsKey.CALIBRATE.value: str(self.calibrate).lower(), ParamsKey.REINGEST.value: "true" if self.reingest else "false",
ParamsKey.NGAS_INGEST.value: "true" if self.ngas_ingest else "false",
ParamsKey.INGESTION_PATH.value: str(self.staging_source_dir), ParamsKey.INGESTION_PATH.value: str(self.staging_source_dir),
} }
if hasattr(self, "calibrate"):
json_dict[ParamsKey.CALIBRATE.value] = "true" if self.calibrate else "false"
if self.additional_metadata: if self.additional_metadata:
json_dict[ParamsKey.ADDITIONAL_METADATA.value] = str(self.additional_metadata) json_dict[ParamsKey.ADDITIONAL_METADATA.value] = str(self.additional_metadata)
...@@ -228,7 +239,10 @@ class OutputScienceProduct(ManifestComponentIF): ...@@ -228,7 +239,10 @@ class OutputScienceProduct(ManifestComponentIF):
return False return False
def __str__(self): def __str__(self):
return f"{Path(self.filename).name}: {self.type.value}, {len(self.ancillary_products)} ancillary products" return (
f"{Path(self.filename).name}: {self.type.value}, "
f"{len(self.ancillary_products)} ancillary products"
)
def to_json(self) -> JSON: def to_json(self) -> JSON:
json_dict = {"type": self.type.value, "filename": self.filename} json_dict = {"type": self.type.value, "filename": self.filename}
......
...@@ -9,7 +9,7 @@ import pytest ...@@ -9,7 +9,7 @@ import pytest
from ingest_envoy.manifest_components import WEBLOG_FILENAME from ingest_envoy.manifest_components import WEBLOG_FILENAME
EVLA_CAL_WANTED_FILENAMES = ["my_science_products.tar", WEBLOG_FILENAME] EVLA_CAL_INPUT_FILENAMES = ["20A-346_2021_07_23_T13_37_08.376.tar", WEBLOG_FILENAME]
UNWANTED = ["ignore_me.fits", "just_a_lotta_nothing", "uninteresting_metadata.xml"] UNWANTED = ["ignore_me.fits", "just_a_lotta_nothing", "uninteresting_metadata.xml"]
IMG_MANIFEST_INPUT_FILENAMES = [ IMG_MANIFEST_INPUT_FILENAMES = [
...@@ -80,7 +80,7 @@ def populate_fake_evla_cal_ingest_path(staging_dir: Path) -> List[Path]: ...@@ -80,7 +80,7 @@ def populate_fake_evla_cal_ingest_path(staging_dir: Path) -> List[Path]:
""" """
files = [] files = []
filenames = [filename for filename in EVLA_CAL_WANTED_FILENAMES] filenames = [filename for filename in EVLA_CAL_INPUT_FILENAMES]
for filename in UNWANTED: for filename in UNWANTED:
filenames.append(filename) filenames.append(filename)
......
...@@ -27,4 +27,4 @@ ...@@ -27,4 +27,4 @@
} }
] ]
} }
} }
\ No newline at end of file
{ {
"parameters": { "parameters": {
"reingest": false, "reingest": false,
"ngas_ingest": false, "ngas_ingest": true,
"telescope": "EVLA", "telescope": "EVLA",
"ingestion_path": "/lustre/yea/and/here/we/go" "ingestion_path": "/lustre/aoc/cluster/pipeline/dsoc-prod/stage_products/20A-346_2021_07_23_T13_37_08.376"
}, },
"input_group": { "input_group": {
"science_products": [ "science_products": [
{ {
"locator": "uid://evla/execblock/fjdsakljfkdlsajfkldsa" "locator": "uid://evla/execblock/50bb85af-ce52-49d8-b9d8-9221bfce939d"
} }
] ]
}, },
...@@ -16,17 +16,17 @@ ...@@ -16,17 +16,17 @@
"science_products": [ "science_products": [
{ {
"type": "calibration", "type": "calibration",
"filename": "XYZ-abc+TMN.O00.tar" "filename": "20A-346_2021_07_23_T13_37_08.376.tar"
} }
], ],
"ancillary_products": [ "ancillary_products": [
{ {
"type": "pipeline_weblog", "type": "pipeline_weblog",
"filename": "qrs.weblog.tgz" "filename": "weblog.tgz"
}, },
{ {
"type": "ingestion_artifacts", "type": "ingestion_artifacts",
"filename": "ingestion_artifacts_2019_07_25_T15_43_33.144.tar" "filename": "ingestion_artifacts_2021_07_27_T17_35_11.463.tar"
} }
] ]
} }
......
...@@ -7,7 +7,10 @@ import sys ...@@ -7,7 +7,10 @@ import sys
from pathlib import Path from pathlib import Path
# pylint: disable=E0401, E0402, R1721, W0621 # pylint: disable=E0401, E0402, R1721, W0621
from unittest.mock import patch
import pytest import pytest
import tarfile
from ingest_envoy.ingestion_manifest import ( from ingest_envoy.ingestion_manifest import (
IngestionManifestBuilder, IngestionManifestBuilder,
...@@ -23,7 +26,7 @@ from ingest_envoy.manifest_components import ( ...@@ -23,7 +26,7 @@ from ingest_envoy.manifest_components import (
OutputGroup, OutputGroup,
TARFILE_EXT, TARFILE_EXT,
INGESTION_ARTIFACTS_NAME, INGESTION_ARTIFACTS_NAME,
WEBLOG_FILENAME, MANIFEST_FILENAME,
) )
from ingest_envoy.utilities import ( from ingest_envoy.utilities import (
ScienceProductType, ScienceProductType,
...@@ -37,7 +40,7 @@ from ingest_envoy.utilities import ( ...@@ -37,7 +40,7 @@ from ingest_envoy.utilities import (
from .conftest import ( from .conftest import (
ingest_path, ingest_path,
populate_fake_evla_cal_ingest_path, populate_fake_evla_cal_ingest_path,
EVLA_CAL_WANTED_FILENAMES, EVLA_CAL_INPUT_FILENAMES,
UNWANTED, UNWANTED,
find_example_manifest, find_example_manifest,
) )
...@@ -49,10 +52,9 @@ logger.addHandler(logging.StreamHandler(sys.stdout)) ...@@ -49,10 +52,9 @@ logger.addHandler(logging.StreamHandler(sys.stdout))
FAKE_LOCATOR = "uid://evla/calibration/doo-wah-ditty-ditty-af123" FAKE_LOCATOR = "uid://evla/calibration/doo-wah-ditty-ditty-af123"
@pytest.mark.skip("TODO: broken temporarily, pending fix to output group creation")
def test_filters_cal_input_files(ingest_path: Path): def test_filters_cal_input_files(ingest_path: Path):
""" """
We'll be getting calibration/image/eb, etc. science products from a directory under We'll be getting science products from a directory under
/lustre/aoc/cluster/pipeline/{CAPO_PROFILE}/workspaces/staging /lustre/aoc/cluster/pipeline/{CAPO_PROFILE}/workspaces/staging
Make sure we take -only- the files to be ingested. Make sure we take -only- the files to be ingested.
...@@ -73,16 +75,18 @@ def test_filters_cal_input_files(ingest_path: Path): ...@@ -73,16 +75,18 @@ def test_filters_cal_input_files(ingest_path: Path):
assert manifest.locator == locator assert manifest.locator == locator
params = manifest.parameters params = manifest.parameters
assert not params.reingest and not params.ngas_ingest and not params.calibrate assert params.reingest == False and params.ngas_ingest == True
assert not hasattr(params, "calibrate")
input_group = manifest.input_group input_group = manifest.input_group
assert len(input_group.science_products) == 1 assert len(input_group.science_products) == 1
output_group = manifest.output_group output_group = manifest.output_group
assert len(output_group.science_products) == 1 assert len(output_group.science_products) == 1
assert len(output_group.ancillary_products) == 1 assert len(output_group.ancillary_products) == 2
for product in output_group.ancillary_products: for product in output_group.ancillary_products:
if product.filename not in EVLA_CAL_WANTED_FILENAMES: if product.filename not in EVLA_CAL_INPUT_FILENAMES:
assert product.filename.startswith( assert product.filename.startswith(
INGESTION_ARTIFACTS_NAME INGESTION_ARTIFACTS_NAME
) and product.filename.endswith(TARFILE_EXT) ) and product.filename.endswith(TARFILE_EXT)
...@@ -91,13 +95,12 @@ def test_filters_cal_input_files(ingest_path: Path): ...@@ -91,13 +95,12 @@ def test_filters_cal_input_files(ingest_path: Path):
sp_out = output_group.science_products[0] sp_out = output_group.science_products[0]
assert sp_out.type == ScienceProductType.EVLA_CAL assert sp_out.type == ScienceProductType.EVLA_CAL
assert sp_out.filename in EVLA_CAL_WANTED_FILENAMES assert sp_out.filename in EVLA_CAL_INPUT_FILENAMES
assert sp_out.filename not in UNWANTED assert sp_out.filename not in UNWANTED
shutil.rmtree(ingest_path) shutil.rmtree(ingest_path)
@pytest.mark.skip("TODO: broken temporarily, pending fix to output group creation")
def test_writes_expected_evla_cal_output_files(ingest_path: Path): def test_writes_expected_evla_cal_output_files(ingest_path: Path):
""" """
Did the manifest builder produce the manifest file, the weblog, and the science product tar? Did the manifest builder produce the manifest file, the weblog, and the science product tar?
...@@ -209,54 +212,105 @@ def test_output_group_well_formed(): ...@@ -209,54 +212,105 @@ def test_output_group_well_formed():
type=AncillaryProductType.PIPELINE_ARTIFACTS, filename="with_feathers.tar" type=AncillaryProductType.PIPELINE_ARTIFACTS, filename="with_feathers.tar"
) )
opg = OutputGroup(science_products=[osp], ancillary_products=[ap1, ap2])
# all filenames in the manifest should be bare filenames, not full paths
for science_product in opg.science_products:
assert "/" not in science_product.filename
for ancillary_product in opg.ancillary_products:
assert "/" not in ancillary_product.filename
expected_json = { expected_json = {
IngestionManifestKey.SCIENCE_PRODUCTS.value: [osp.to_json()], IngestionManifestKey.SCIENCE_PRODUCTS.value: [osp.to_json()],
IngestionManifestKey.ANCILLARY_PRODUCTS.value: [ap1.to_json(), ap2.to_json()], IngestionManifestKey.ANCILLARY_PRODUCTS.value: [ap1.to_json(), ap2.to_json()],
} }
opg = OutputGroup(science_products=[osp], ancillary_products=[ap1, ap2])
actual_json = opg.to_json() actual_json = opg.to_json()
assert actual_json == expected_json assert actual_json == expected_json
@pytest.mark.skip("TODO") def test_ingestion_artifacts_tar_filename_built_just_once(ingest_path: Path):
def test_ingestion_artifacts_tar_correct():
""" """
TODO We run into trouble if manifest builder's _build_artifacts_filename()
is called more than once.
:param ingest_path:
:return: :return:
""" """
raise NotImplementedError populate_fake_evla_cal_ingest_path(ingest_path)
with patch(
"ingest_envoy.ingestion_manifest.IngestionManifestBuilder._build_artifacts_filename",
return_value="ingestion_artifacts_mine_mine_mine.tar",
) as mock:
# build the manifest
IngestionManifestBuilder(
staging_source_dir=ingest_path,
sp_type=ScienceProductType.EVLA_CAL.value,
locator="uid://evla/calibration/are-we-there-yet",
telescope=Telescope.EVLA.value,
).build()
mock.assert_called_once()
shutil.rmtree(ingest_path)
def test_ingestion_artifacts_tar_correct(ingest_path: Path):
    """
    Do we build the correct ingestion_artifacts tar for EVLA calibration ingestion?

    The tar found in the staging dir after the build is expected to hold
    exactly one member: the ingestion manifest.

    :param ingest_path: the ingestion staging dir
    :return:
    """
    populate_fake_evla_cal_ingest_path(ingest_path)

    # build the manifest; the artifacts tar is written as part of the build
    builder = IngestionManifestBuilder(
        staging_source_dir=ingest_path,
        sp_type=ScienceProductType.EVLA_CAL.value,
        locator="uid://evla/calibration/3dfa528b-9870-46c9-a200-131dbac701cc",
        telescope=Telescope.EVLA.value,
    )
    builder.build()

    # take the first ingestion artifacts tar that turned up in the staging dir
    artifacts_tar = list(ingest_path.glob("ingestion_artifacts*.tar"))[0]
    with tarfile.open(artifacts_tar, "r") as tar:
        tar_members = tar.getmembers()

    # the manifest should be the tar's one and only member
    assert len(tar_members) == 1
    assert Path(tar_members[0].name).name == MANIFEST_FILENAME

    shutil.rmtree(ingest_path)
@pytest.mark.skip("TODO: broken temporarily, pending fix to output group creation")
def test_evla_cal_manifest_matches_example(ingest_path: Path): def test_evla_cal_manifest_matches_example(ingest_path: Path):
""" """
Given the correct parameters, manifest that matches _16B_069_cal_manifest.json Given the correct parameters, manifest that matches evla_cal_manifest_2021-08-02
should be generated should be generated
:return: :return:
""" """
expected_dir_name = "/lustre/aoc/cluster/pipeline/dsoc-dev/workspaces/staging/cal_test6" expected_dir_name = (
example = find_example_manifest("_16B_069_cal_manifest") "/lustre/aoc/cluster/pipeline/dsoc-prod/stage_products/20A-346_2021_07_23_T13_37_08.376"
)
example = find_example_manifest("evla_cal_manifest_2021-08-02")
with open(example, "r") as infile: with open(example, "r") as infile:
expected_json = dict(json.load(infile).items()) expected_json = dict(json.load(infile).items())
# populate ingestion path with fake files for manifest builder to find # populate ingestion path with fake files for manifest builder to find
for filename in [ populate_fake_evla_cal_ingest_path(ingest_path)
"16B-069_sb32814386_1_001.57685.66193635417.testdate.caltables.tar",
WEBLOG_FILENAME,
]:
file = ingest_path / filename
file.touch()
builder = IngestionManifestBuilder( builder = IngestionManifestBuilder(
staging_source_dir=ingest_path, staging_source_dir=ingest_path,
telescope=Telescope.EVLA.value, telescope=Telescope.EVLA.value,
sp_type=ScienceProductType.EVLA_CAL.value, sp_type=ScienceProductType.EVLA_CAL.value,
locator="uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b", locator="uid://evla/execblock/50bb85af-ce52-49d8-b9d8-9221bfce939d",
) )
_, manifest_file = builder.build() manifest, manifest_file = builder.build()
with open(manifest_file, "r") as infile: with open(manifest_file, "r") as infile:
actual_json = dict(json.load(infile).items()) actual_json = dict(json.load(infile).items())
...@@ -265,19 +319,24 @@ def test_evla_cal_manifest_matches_example(ingest_path: Path): ...@@ -265,19 +319,24 @@ def test_evla_cal_manifest_matches_example(ingest_path: Path):
IngestionManifestKey.INGESTION_PATH.value IngestionManifestKey.INGESTION_PATH.value
] = expected_dir_name ] = expected_dir_name
assert actual_json == expected_json expected_params = expected_json["parameters"]
actual_params = manifest.parameters.to_json()
shutil.rmtree(ingest_path) expected_reingest = "true" if expected_params["reingest"] else "false"
assert actual_params["reingest"] == expected_reingest
expected_ngas_ingest = "true" if expected_params["ngas_ingest"] else "false"
assert actual_params["ngas_ingest"] == expected_ngas_ingest
assert "calibrate" not in actual_params.keys()
assert manifest.input_group.to_json() == expected_json["input_group"]
def populate_ingest_path_for_manifest_evla_cal_example(ingest_path: Path): expected_outgroup = expected_json["output_group"]
""" expected_osp = expected_outgroup["science_products"]
Create fake input files to match EVLA CAL manifest example actual_osp = manifest.output_group.science_products
assert len(actual_osp) == len(expected_osp)
:param ingest_path: expected_aps = expected_outgroup["ancillary_products"]
:return: actual_aps = manifest.output_group.ancillary_products
""" assert len(actual_aps) == len(expected_aps) == 2
weblog_file = ingest_path / "weblog.tgz"
weblog_file.touch() shutil.rmtree(ingest_path)
cal_file = ingest_path / "XYZ-abc+TMN.O00.tar"
cal_file.touch()
...@@ -8,7 +8,6 @@ from typing import List ...@@ -8,7 +8,6 @@ from typing import List
import json import json
import shutil import shutil
import tarfile import tarfile
import pytest
from ingest_envoy.ingestion_manifest import IngestionManifestBuilder from ingest_envoy.ingestion_manifest import IngestionManifestBuilder
from ingest_envoy.schema import AbstractTextFile from ingest_envoy.schema import AbstractTextFile
...@@ -32,6 +31,7 @@ from ingest_envoy.manifest_components import ( ...@@ -32,6 +31,7 @@ from ingest_envoy.manifest_components import (
InputGroup, InputGroup,
InputScienceProduct, InputScienceProduct,
MANIFEST_FILENAME, MANIFEST_FILENAME,
TARFILE_EXT,
) )
...@@ -277,7 +277,6 @@ def test_creates_expected_manifest(ingest_path: Path): ...@@ -277,7 +277,6 @@ def test_creates_expected_manifest(ingest_path: Path):
shutil.rmtree(ingest_path) shutil.rmtree(ingest_path)
@pytest.mark.skip("TODO")
def test_writes_expected_output_files(ingest_path: Path): def test_writes_expected_output_files(ingest_path: Path):
""" """
Did the image ingestion manifest builder produce the output file(s) we expect? Did the image ingestion manifest builder produce the output file(s) we expect?
...@@ -285,8 +284,53 @@ def test_writes_expected_output_files(ingest_path: Path): ...@@ -285,8 +284,53 @@ def test_writes_expected_output_files(ingest_path: Path):
:param ingest_path: :param ingest_path:
:return: :return:
""" """
# TODO: populate_fake_image_ingest_path(ingest_path)
raise NotImplementedError manifest, manifest_file = IngestionManifestBuilder(
telescope=Telescope.EVLA.value,
staging_source_dir=ingest_path,
locator="uid://evla/image/kiss-me-Im_Elvish",
sp_type=ScienceProductType.IMAGE.value,
).build()
assert manifest_file
assert manifest_file.name == MANIFEST_FILENAME
assert manifest
ingestion_files = [file for file in ingest_path.iterdir()]
# In addition to the manifest, the science products tar and the ingestion artifact,
# we expect to find the additional metadata .json, two .fits image files, a thumbnail.png,
# CASA byproducts, and the random tar we tossed in
assert len(ingestion_files) == 13
files_accounted_for = []
fits = [file for file in ingestion_files if file.name.endswith(".fits")]
assert len(fits) == 2
for file in fits:
files_accounted_for.append(file)
thumbnails = [file for file in ingestion_files if file.name.endswith(".png")]
assert len(thumbnails) == 1
files_accounted_for.append(thumbnails[0])
casa_byproducts = [file for file in ingestion_files if file.name in CASA_BYPRODUCTS]
assert len(casa_byproducts) == len(CASA_BYPRODUCTS)
for file in casa_byproducts:
files_accounted_for.append(file)
manifests = [file for file in ingestion_files if file.name == MANIFEST_FILENAME]
assert len(manifests) == 1
files_accounted_for.append(manifests[0])
inputs = [file for file in ingestion_files if file.name in IMG_MANIFEST_INPUT_FILENAMES]
assert len(inputs) == len(IMG_MANIFEST_INPUT_FILENAMES)
for file in inputs:
files_accounted_for.append(file)
artifacts = [file for file in ingestion_files if is_ingestion_artifact(file)]
assert len(artifacts) == 1
files_accounted_for.append(artifacts[0])
assert len(set(files_accounted_for)) == len(ingestion_files) - 1
shutil.rmtree(ingest_path) shutil.rmtree(ingest_path)
...@@ -308,7 +352,7 @@ def manifest_parameters(ingest_path: Path) -> ManifestParameters: ...@@ -308,7 +352,7 @@ def manifest_parameters(ingest_path: Path) -> ManifestParameters:
params = ManifestParameters( params = ManifestParameters(
telescope=Telescope.EVLA, telescope=Telescope.EVLA,
reingest=False, reingest=False,
ngas_ingest=False, ngas_ingest=True,
calibrate=False, calibrate=False,
staging_source_dir=ingest_path, staging_source_dir=ingest_path,
additional_metadata=AbstractTextFile(filename=addl_md.filename, content=""), additional_metadata=AbstractTextFile(filename=addl_md.filename, content=""),
...@@ -397,3 +441,13 @@ def build_output_group(staging_source_dir: Path) -> OutputGroup: ...@@ -397,3 +441,13 @@ def build_output_group(staging_source_dir: Path) -> OutputGroup:
ap_list = other_aps ap_list = other_aps
return OutputGroup(science_products=[osp], ancillary_products=ap_list) return OutputGroup(science_products=[osp], ancillary_products=ap_list)
def is_ingestion_artifact(file: Path) -> bool:
    """
    Does this file's name look like that of an ingestion artifacts tar,
    i.e. does it start with "ingestion_artifacts" and end with TARFILE_EXT?

    Only the final path component is examined; the file itself is never opened.

    :param file: some tar we found in staging source dir
    :return: True if the filename has both the expected prefix and extension
    """
    filename = file.name
    if not filename.startswith("ingestion_artifacts"):
        return False
    return filename.endswith(TARFILE_EXT)
...@@ -5,6 +5,7 @@ import sys ...@@ -5,6 +5,7 @@ import sys
from pathlib import Path from pathlib import Path
# pylint: disable=E0401, E0402, R1721, W0611, W0621 # pylint: disable=E0401, E0402, R1721, W0611, W0621
import shutil
from ingest_envoy.ingestion_manifest import ( from ingest_envoy.ingestion_manifest import (
IngestionManifest, IngestionManifest,
...@@ -105,3 +106,4 @@ def test_entry_point_for_image(ingest_path: Path): ...@@ -105,3 +106,4 @@ def test_entry_point_for_image(ingest_path: Path):
print(f">>> {file.name} present after manifest build") print(f">>> {file.name} present after manifest build")
assert len(ingestion_files_after) == expected_file_count_before + 2 assert len(ingestion_files_after) == expected_file_count_before + 2
shutil.rmtree(ingest_path)
...@@ -28,7 +28,7 @@ case $option in ...@@ -28,7 +28,7 @@ case $option in
;; ;;
esac esac
WORKFLOW_SERVICE=$(capo -q edu.nrao.workspaces.WorkflowSettings.serviceUrl) WORKFLOW_SERVICE=$(pycapo -q edu.nrao.workspaces.WorkflowSettings.serviceUrl)
if [ "$action" = "ingest_cal" ]; then if [ "$action" = "ingest_cal" ]; then
curl -X POST $WORKFLOW_SERVICE/workflows/std_calibration/requests/$2/ingest curl -X POST $WORKFLOW_SERVICE/workflows/std_calibration/requests/$2/ingest
......
...@@ -74,11 +74,9 @@ services: ...@@ -74,11 +74,9 @@ services:
context: ./schema/ context: ./schema/
dockerfile: Dockerfile.local dockerfile: Dockerfile.local
command: ["./bin/run-migrations.sh", "dsoc-dev"] command: ["./bin/run-migrations.sh", "dsoc-dev"]
depends_on:
- db
volumes: volumes:
- ./schema:/code/schema - ./schema:/code/schema
- ~/.capo:/home/casa/capo - ~/.capo:/home/ssa/capo
profiles: profiles:
- schema-dev - schema-dev
......
...@@ -83,10 +83,16 @@ Database ...@@ -83,10 +83,16 @@ Database
Pipeline Pipeline
-------- --------
- Update the end-to-end test container to see how detailed we can be - Update the end-to-end test container with new tests for all new pages/features
- Prevent `cleanup` stage from deleting tagged images when multiple pipelines are running; this issue causes `push` stage to fail - Prevent `cleanup` stage from deleting tagged images when multiple pipelines are running; this issue causes `push` stage to fail
- Raise concurrent pipeline job limit (currently 6)
- Rule for canceling pipelines on a MR if a newer pipeline starts running
- "Testing on dev" flag that we can set to prevent pipelines (or maybe just deploy step?) from running on main (they'll just queue up and run when the flag is unset)
Code Tweaks Code Tweaks
----------- -----------
......
"""update calibration ingestion oracle and mysql
Revision ID: d295036a7c72
Revises: f0f6d7be45e3
Create Date: 2021-07-30 14:56:51.201331
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "d295036a7c72"
down_revision = "f0f6d7be45e3"
branch_labels = None
depends_on = None
def upgrade():
    """
    Replace the ingest_cal.condor workflow template with a version that
    requests 1G of memory and adds CFLAGS, LDFLAGS, ORACLE_HOME,
    LD_LIBRARY_PATH, and PATH to the job environment alongside CAPO_PATH.
    """
    # NOTE(review): in the PATH entry below there is no ":" between $(PATH) and
    # the Oracle instant client directory -- confirm that this is intentional
    new_template = """executable = ingest_cal.sh
arguments = metadata.json
output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/pycapo, $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/ingest, $(SBIN_PATH)/calibration-table-collector.sh, ./metadata.json
request_memory = 1G
getenv = True
environment = "CAPO_PATH=/home/casa/capo CFLAGS=-I/usr/include/mysql LDFLAGS=-L/usr/lib64/mysql ORACLE_HOME=/home/ssa/share/oracle/instantclient_12_1 LD_LIBRARY_PATH=/home/ssa/share/oracle/instantclient_12_1 PATH=/bin:/usr/bin:$(PATH)/home/ssa/share/oracle/instantclient_12_1"
queue
"""

    remove_existing_row = """
DELETE FROM workflow_templates WHERE filename='ingest_cal.condor'
"""
    insert_new_row = f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('ingest_cal.condor', E'{new_template}', 'ingest_cal')
"""

    # the existing template row is removed before its replacement is inserted
    for statement in (remove_existing_row, insert_new_row):
        op.execute(statement)
def downgrade():
    """
    Put back the earlier ingest_cal.condor workflow template: no
    request_memory line, and a job environment that sets only CAPO_PATH.
    """
    previous_template = """executable = ingest_cal.sh
arguments = metadata.json
output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/pycapo, $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/ingest, $(SBIN_PATH)/calibration-table-collector.sh, ./metadata.json
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""

    remove_existing_row = """
DELETE FROM workflow_templates WHERE filename='ingest_cal.condor'
"""
    restore_previous_row = f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('ingest_cal.condor', E'{previous_template}', 'ingest_cal')
"""

    # the current template row is removed before the earlier one is re-inserted
    for statement in (remove_existing_row, restore_previous_row):
        op.execute(statement)
...@@ -9,7 +9,7 @@ ENV ENV=${env} ...@@ -9,7 +9,7 @@ ENV ENV=${env}
USER root USER root
# HTCondor install # HTCondor install
RUN apt update && apt install -y wget gnupg sudo RUN apt update && apt install -y wget gnupg sudo curl
RUN wget -qO - https://research.cs.wisc.edu/htcondor/debian/HTCondor-Release.gpg.key | sudo apt-key add - RUN wget -qO - https://research.cs.wisc.edu/htcondor/debian/HTCondor-Release.gpg.key | sudo apt-key add -
RUN echo "deb http://research.cs.wisc.edu/htcondor/debian/8.8/buster buster contrib" > /etc/apt/sources.list.d/htcondor.list RUN echo "deb http://research.cs.wisc.edu/htcondor/debian/8.8/buster buster contrib" > /etc/apt/sources.list.d/htcondor.list
RUN echo "deb-src http://research.cs.wisc.edu/htcondor/debian/8.8/buster buster contrib" >> /etc/apt/sources.list.d/htcondor.list RUN echo "deb-src http://research.cs.wisc.edu/htcondor/debian/8.8/buster buster contrib" >> /etc/apt/sources.list.d/htcondor.list
......
...@@ -14,7 +14,7 @@ RUN python setup.py develop --user ...@@ -14,7 +14,7 @@ RUN python setup.py develop --user
USER root USER root
# HTCondor install # HTCondor install
RUN apt update && apt install -y wget gnupg sudo RUN apt update && apt install -y wget gnupg sudo curl
RUN wget -qO - https://research.cs.wisc.edu/htcondor/debian/HTCondor-Release.gpg.key | sudo apt-key add - RUN wget -qO - https://research.cs.wisc.edu/htcondor/debian/HTCondor-Release.gpg.key | sudo apt-key add -
RUN echo "deb http://research.cs.wisc.edu/htcondor/debian/8.8/buster buster contrib" > /etc/apt/sources.list.d/htcondor.list RUN echo "deb http://research.cs.wisc.edu/htcondor/debian/8.8/buster buster contrib" > /etc/apt/sources.list.d/htcondor.list
RUN echo "deb-src http://research.cs.wisc.edu/htcondor/debian/8.8/buster buster contrib" >> /etc/apt/sources.list.d/htcondor.list RUN echo "deb-src http://research.cs.wisc.edu/htcondor/debian/8.8/buster buster contrib" >> /etc/apt/sources.list.d/htcondor.list
......