Skip to content
Snippets Groups Projects
Commit 3760030d authored by Janet Goldstein's avatar Janet Goldstein
Browse files

WS-507: further development and first tests

parent 8a4f1b84
No related branches found
No related tags found
1 merge request!317WS-507: further development and first tests
Pipeline #2081 failed
""" The ingestion manifest """
from pathlib import Path
from .utilities import (
Telescope,
ScienceProduct,
)
# pylint: disable=E0402, R0903, R0913
from .utilities import Telescope
class Parameters:
......@@ -50,13 +48,12 @@ class IngestionManifest:
self.parameters = parameters
# to be an InputGroup
self.input_group = None
self.input_group = InputGroup()
# to be an OutputGroup
self.output_group = None
# to be an AssociateGroup (not required?)
self.associate_group = None
self.output_group = OutputGroup()
self.associate_group = AssociateGroup()
self.ingestion_path = None
self.science_products = []
self.ancillary_products = []
class OutputGroup:
......
"""Build an ingestion manifest file"""
import datetime
import json
import logging
import shutil
import stat
import sys
from pathlib import Path
from typing import Tuple, List
from .ingestion_manifest import (
Parameters,
IngestionManifest,
)
# pylint: disable=E0401, E0402
from astropy.time import Time
from .ingestion_manifest import IngestionManifest
from .utilities import IngestionType
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
MANIFEST_NAME_BASE = "ingestion_manifest_"
MANIFEST_NAME_EXT = ".json"
ARTIFACT_NAME = "ingestion_artifacts_"
ARTIFACT_EXT = ".tar"
# TODO: say what?? we're accepting a manifest in the constructor and ALSO building one?
class IngestionManifestBuilder:
"""Uses supplied parameters to build ingestion manifest files
for the various types of ingestion"""
def __init__(self, parameters: Parameters):
self.parameters = parameters
def __init__(
self,
manifest: IngestionManifest,
ingest_type: IngestionType,
ingestion_location: Path,
working_dir: Path,
):
self.manifest = manifest
self.ingestion_type = ingest_type
self.ingestion_location = ingestion_location
self.working_dir = working_dir
self.parameters = manifest.parameters
self.manifest_filename, self.artifact_filename = self.build_filenames()
def build(self) -> IngestionManifest:
"""
Create the ingestion manifest indicated by the parameters.
TODO: Write the ingestion manifest indicated by the parameters.
:return: the ingestion manifest constructed from the parameters
"""
raise NotImplementedError
# Write the manifest to the staging area, where other files may be.
manifest_content = json.dumps(self.manifest.__json__())
staging_manifest = self.ingestion_location / self.manifest_filename
with open(staging_manifest, "w") as out:
out.write(manifest_content)
# Open up permissions so we can delete the manifest file later.
staging_manifest.chmod(staging_manifest.stat().st_mode | stat.S_IREAD | stat.S_IWRITE)
# Get all the files we'll need....
ingestion_files = self.find_ingestion_files()
# ....and put them in both places, with rw permissions
for filename in ingestion_files:
staging_dir_copy = self.ingestion_location / filename
shutil.copy(str(staging_manifest), str(staging_dir_copy))
staging_dir_copy.chmod(staging_manifest.stat().st_mode | stat.S_IREAD | stat.S_IWRITE)
working_dir_copy = self.working_dir / filename
shutil.copy(str(staging_manifest), str(working_dir_copy))
working_dir_copy.chmod(staging_manifest.stat().st_mode | stat.S_IREAD | stat.S_IWRITE)
# Rename the manifest to the shared name decided on for ingestion invocation
generalized = self.working_dir / "ingestion-manifest.json"
generalized.symlink_to(working_dir_copy, target_is_directory=False)
generalized.chmod(staging_manifest.stat().st_mode | stat.S_IREAD | stat.S_IWRITE)
# Now that all the loose files are copied, create the ingestion artifacts tar
self.write_ingestion_artifact_tar(ingestion_files)
# TODO: we need the manifest itself, not the file
return staging_manifest
@staticmethod
def format_timestamp(start_time_mjd: float) -> str:
"""
Take an mjd time and format it as "yyyy_MM_dd_'T'HH_mm_ss.SSS"
for the ingestion manifest filename
:param start_time_mjd: observation start time directly from database;
will be modified Julian date
:return: a human-readable timestamp string
"""
# TODO Some Fine Day: is there a more elegant way to do this?
astro_time = Time(start_time_mjd, scale="tt", format="mjd")
astro_time.format = "fits"
str_time = str(astro_time).replace("-", "_", len(str(astro_time)))
str_time = str_time.replace(":", "_", len(str_time)).replace("T", "'T'")
return str_time
def build_filenames(self) -> Tuple[str, str]:
"""
Build manifest and artifacts filenames.
:return: the filenames
"""
timestamp = self.format_timestamp(datetime.datetime.now())
manifest_filename = f"{MANIFEST_NAME_BASE} {timestamp}{MANIFEST_NAME_EXT}"
artifact_filename = f"{ARTIFACT_NAME} {timestamp}{ARTIFACT_EXT}"
return manifest_filename, artifact_filename
def find_ingestion_files(self) -> List[Path]:
"""
Gather the files required for ingestion
:return: ingestion inputs
"""
aux_files = []
if self.parameters.additional_metadata is not None:
aux_string = self.parameters.additional_metadata
aux_files = aux_string.split(",")
if self.parameters.collection_metadata is not None:
coll_str = self.parameters.collection_metadata
coll_files = coll_str.split(",")
aux_files += coll_files
# be sure to add the manifest itself
aux_files.append(self.manifest_filename)
return aux_files
def write_ingestion_artifact_tar(self, ingestion_files: List[Path]) -> Path:
"""
Take the list of files and build a tar for inclusion into the archive.
This happens in the staging area for ingestion.
:param ingestion_files: all the files needed for ingestion
:return: a .tar archive of the ingestion artifacts
"""
#!/usr/bin/python
# -*- coding: utf-8 -*-
""" setter-upper for ingestion manifest creation """
from pathlib import Path
from setuptools import find_packages, setup
......@@ -8,7 +10,7 @@ from setuptools import find_packages, setup
VERSION = open("ingest_envoy/_version.py").readlines()[-1].split()[-1].strip("\"'")
README = Path("README.md").read_text()
requires = ["pycapo", "pex==2.1.41"]
requires = ["pycapo", "pex==2.1.41", "astropy"]
setup(
name="ssa-" + Path().absolute().name,
......@@ -20,7 +22,7 @@ setup(
url="TBD",
license="GPL",
install_requires=requires,
tests_require=["pytest"],
tests_require=["pytest", "astropy"],
keywords=[],
packages=find_packages(),
classifiers=["Programming Language :: Python :: 3.8"],
......
""" Test for the various types of EVLA ingestion manifests """
from pathlib import Path
import logging
import sys
import pytest
from ingest_envoy.ingestion_manifest import IngestionManifest, Parameters
from ingest_envoy.ingestion_manifest_builder import IngestionManifestBuilder
from ingest_envoy.utilities import Telescope
from ..ingest_envoy.ingestion_manifest import Parameters
from ..ingest_envoy.ingestion_manifest_builder import IngestionManifestBuilder
from apps.cli.executables.pexable.ingest_envoy.ingest_envoy.utilities import Telescope
# TODO: fix test_builds_evla_sdm_manifest, then *REMOVE* E1120 from disable list
# pylint: disable=E0401, E1120, W1203
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
def test_builds_timestamp_in_desired_format():
"""
Does IngestionManifestBuilder format an MJD timestamp yanked out of the DB as expected?
:return:
"""
str_time = IngestionManifestBuilder.format_timestamp(59215.3)
assert str_time == "2021_01_01'T'07_12_00.000"
def test_minimal_manifest_is_as_expected():
"""
Make sure a minimal, generic manifest has the properties we expect it to have.
:return:
"""
temp_dir = Path(tempfile.mkdtemp())
ingest_path = temp_dir / "ingestion"
ingest_path.mkdir()
# no reingest, ngas ingest, additional metadata nor collection metadata
parameters = Parameters(Telescope.EVLA, ingest_path, None, None)
manifest = IngestionManifest(parameters)
fake_ancillary_product = {
"type": "obslog",
"filename": "my_fabulous_obs.log",
"group_with": "uid://evla/execblock/ABCDEFGHIJKLMNOP",
}
manifest.output_group.ancillary_products.append(fake_ancillary_product)
assert manifest.parameters == parameters
mf_json = manifest.__json__()
logger.info(f">>> THE JSON:\n{mf_json}")
output_group = mf_json["output_group"]
assert len(output_group.science_products) == 0
assert output_group.ancillary_products == [fake_ancillary_product]
@pytest.mark.skip("TODO: test_generates_correct_filename")
def test_generates_correct_filename():
"""
TODO
We expect the manifest to be named like "ingestion_manifest_2019_07_30_T13_03_00.936"
:return:
"""
return NotImplementedError
from ingest_envoy.ingestion_manifest import IngestionManifest, Parameters
from ingest_envoy.ingestion_manifest_builder import IngestionManifestBuilder
from ingest_envoy.utilities import Telescope
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
def test_builds_timestamp_in_desired_format():
"""
Does IngestionManifestBuilder format an MJD timestamp yanked out of the DB as expected?
:return:
"""
str_time = IngestionManifestBuilder.format_timestamp(59215.3)
assert str_time == "2021_01_01'T'07_12_00.000"
def test_minimal_manifest_is_as_expected(tmpdir):
"""
Make sure a minimal, generic manifest has the properties we expect it to have.
:return:
"""
ingest_path = tmpdir / "ingestion"
ingest_path.mkdir()
# no reingest, ngas ingest, additional metadata nor collection metadata
parameters = Parameters(Telescope.EVLA, ingest_path, None, None)
manifest = IngestionManifest(parameters)
fake_ancillary_product = {
"type": "obslog",
"filename": "my_fabulous_obs.log",
"group_with": "uid://evla/execblock/ABCDEFGHIJKLMNOP",
}
manifest.output_group.ancillary_products.append(fake_ancillary_product)
assert manifest.parameters == parameters
assert len(manifest.output_group.science_products) == 0
assert manifest.output_group.ancillary_products == [fake_ancillary_product]
@pytest.mark.skip("TODO: test_generates_correct_filename")
def test_generates_correct_filename():
"""
TODO
We expect the manifest to be named like "ingestion_manifest_2019_07_30_T13_03_00.936"
:return:
"""
return NotImplementedError
@pytest.mark.skip("TODO: test_builds_evla_sdm_manifest")
def test_builds_evla_sdm_manifest():
def test_builds_evla_sdm_manifest(tmpdir):
"""
Have we built a well-formed EVLA SDM ingestion manifest?
:return:
"""
parameters = Parameters(Telescope.EVLA, Path("/path/TODO"), None, None)
ingest_path = tmpdir / "ingestion"
ingest_path.mkdir()
# no reingest, ngas ingest, additional metadata nor collection metadata
parameters = Parameters(Telescope.EVLA, ingest_path, None, None)
manifest = IngestionManifestBuilder(parameters).build()
assert manifest.input_group is not None
assert manifest.output_group is not None
assert len(manifest.science_products) > 0
assert len(manifest.ancillary_products) > 0
# TODO: tests for manifest parameters
# TODO: will need ingestion files
assert manifest.input_group is None
assert manifest.output_group["science_products"] == {
"type": "execution_block",
"filename": "X_osro_000.59368.65423814815",
}
assert manifest.ancillary_products == {
"type": "ingestion_artifacts",
"filename": "ingestion_artifacts_2021_06_03_T15_52_35.031.tar",
}
@pytest.mark.skip("TODO: test_builds_evla_bdf_manifest")
......
......@@ -11,6 +11,7 @@
-e ../packages/apps/cli/executables/pexable/productfetcher
-e ../packages/apps/cli/executables/pexable/delivery
-e ../packages/apps/cli/executables/pexable/casa_envoy
-e ../packages/apps/cli/executables/pexable/ingest_envoy
-e ../packages/apps/cli/executables/pexable/conveyor
-e ../packages/apps/cli/executables/pexable/null
-e ../packages/apps/cli/executables/pexable/vela
......
......@@ -44,6 +44,7 @@ requires = [
"mock_alchemy==0.2.1",
"pytest-resource-path",
"requests-mock",
"astropy",
]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment