Skip to content
Snippets Groups Projects
Commit 90df69af authored by Daniel Lyons's avatar Daniel Lyons
Browse files

ingestion manifest creation for EVLA CAL ingestion only

parent 1739141e
No related branches found
No related tags found
1 merge request!337ingestion manifest creation for EVLA CAL ingestion only
Pipeline #2167 failed
Showing
with 1029 additions and 383 deletions
""" The entrypoint (to be renamed from "main" at some point"""
import sys
from pathlib import Path
from ingest_envoy.ingestion_manifest import IngestionManifest
from ingest_envoy.utilities import ScienceProductType
def main():
    """
    Build an ingestion manifest from three positional command-line arguments:
    the staging (spool) directory containing the files to be ingested, the
    science product type, and the science product locator.

    :return:
    """
    # arguments are read positionally; a missing one surfaces as an IndexError
    staging_dir, ingestion_type, locator = sys.argv[1], sys.argv[2], sys.argv[3]

    source_dir = Path(staging_dir)

    # ValueError will be thrown if ingestion_type isn't a known ScienceProductType
    ingestion_type = ScienceProductType.from_str(ingestion_type)

    # Launch the manifest builder
    IngestionManifest(source_dir, ingestion_type, locator).create()
""" The ingestion manifest """ """ This is the entrypoint for ingestion launching """
import json
# pylint: disable=E0402, R0903, R0913 import logging
from typing import Dict import sys
from pathlib import Path
from .utilities import Telescope # pylint: disable=R0903
from ingest_envoy.ingestion_manifest_writer import EvlaCalIngestionManifestWriter
from ingest_envoy.utilities import ScienceProductType
class Parameters: logger = logging.getLogger(__name__)
"""a manifest's various input parameters""" logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
def __init__(
self,
telescope: Telescope,
# for JSON parsing, path must be a string
ingestion_path: str,
additional_metadata: str,
collection_metadata: str,
reingest: bool = False,
ngas_ingest: bool = False,
):
self.telescope = telescope
self.ingestion_path = ingestion_path
self.additional_metadata = additional_metadata
self.collection_metadata = collection_metadata
self.reingest = reingest
self.ngas_ingest = ngas_ingest
def __repr__(self):
return repr(self.__dict__)
class InputGroup:
"""
This represents the starting point for processing which generated a science product.
There is not always an input group for every output group (rawdata, for instance).
Initial assumption: Input groups consist only of science products.
"""
def __init__(self):
self.science_products = []
def __repr__(self) -> str:
return repr(self.__dict__)
class IngestionManifest: class IngestionManifest:
"""Represents JSON layout of ingestion information, encompassing several potential scenarios. """needed for ingestion-launching interface"""
see ingest_envoy/test/examples, nicked from https://open-confluence.nrao.edu/x/roPCAQ
"""
def __init__(self, parameters: Parameters):
self.parameters = parameters
self.input_group = InputGroup()
if self.parameters.additional_metadata:
self.input_group.science_products.append(
json.loads(self.parameters.additional_metadata)
)
self.output_group = OutputGroup() def __init__(self, staging_source_dir: str, ingestion_type: str, locator: str):
if self.parameters.collection_metadata: self.ingest_path = Path(staging_source_dir)
self.output_group.ancillary_products.append( self.sp_type = ScienceProductType.from_str(ingestion_type)
json.loads(self.parameters.collection_metadata) self.locator = locator
)
self.ingestion_path = self.parameters.ingestion_path
# TODO: what is this, and how do we use it?
self.associate_group = AssociateGroup()
def content(self) -> Dict: def create(self):
""" """
Accessor for manifest content Create the ingestion manifest in this directory for a product of this type,
identified by this locator.
:return: manifest as dict :return:
""" """
return dict(
input_group=repr(self.input_group),
output_group=repr(self.output_group),
associate_group=repr(self.associate_group),
ingestion_path=repr(self.ingestion_path),
science_products=repr(self.input_group.science_products),
ancillary_products=repr(self.output_group.ancillary_products),
)
class OutputGroup:
"""Represents result of data processing"""
def __init__(self):
self.science_products = []
self.ancillary_products = []
def __repr__(self): if self.sp_type != ScienceProductType.EVLA_CAL:
return repr(self.__dict__) return NotImplementedError(
f"Don't yet know how to handle {self.sp_type.value} science product"
)
class AssociateGroup:
"""
A representation of Science Products which are not part of the same Input or Output groups
but are still fundamentally linked. Created for RealFast project, to link the RealFast
specific execution block & image to the execution block during which a transient was
discovered.
Associate groups also, by definition, include any science product(s) within the output
group to be ingested. The new locators generated at ingestion time will be added to any
which compose an associate group in the manifest.
"""
def __init__(self):
self.science_products = []
def __repr__(self): writer = EvlaCalIngestionManifestWriter(self.ingest_path)
return repr(self.__dict__) writer.write_evla_cal_manifest(self.locator)
"""Build an ingestion manifest file""" """Build an ingestion manifest file"""
import json import json
import logging import logging
import shutil import re
import sys import sys
import tarfile
from pathlib import Path from pathlib import Path
from typing import Tuple, List from typing import List, Tuple, Dict
# pylint: disable=E0401, E0402 # pylint: disable=E0401, E0402,R1721
import pendulum import pendulum
from pendulum import DateTime from pendulum import DateTime
from .ingestion_manifest import IngestionManifest from .utilities import (
MANIFEST_NAME_BASE,
MANIFEST_NAME_EXT,
ARTIFACT_NAME,
ARTIFACT_EXT,
WEBLOG,
SCIENCE_PRODUCT_PATTERN,
EvlaCalInputScienceProduct,
EvlaCalInputGroup,
EvlaCalOutputScienceProduct,
EvlaCalOutputGroup,
)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout)) logger.addHandler(logging.StreamHandler(sys.stdout))
MANIFEST_NAME_BASE = "ingestion_manifest_"
MANIFEST_NAME_EXT = ".json"
ARTIFACT_NAME = "ingestion_artifacts_"
ARTIFACT_EXT = ".tar"
class EvlaCalIngestionManifestWriter:
"""For building ingestion manifests"""
class IngestionManifestWriter: # (science product type is always EVLA_CAL)
"""Uses supplied parameters to build ingestion manifest files def __init__(self, ingest_path: Path):
for the various types of ingestion""" self.ingest_path = ingest_path
self.input_group = self.output_group = self.infiles = None
def __init__( def write_evla_cal_manifest(self, locator: str) -> Tuple[Path, List[Path]]:
self,
manifest: IngestionManifest,
working_dir: Path,
):
self.manifest = manifest
self.working_dir = working_dir
self.parameters = manifest.parameters
self.manifest_filename, self.artifact_filename = self.build_filenames()
def build(self) -> Path:
""" """
Write the ingestion manifest indicated by the parameters. Write the manifest and associated ingestion files
:return: the ingestion manifest constructed from the parameters :param locator: science product locator
:return:
""" """
self.infiles = [file for file in self.ingest_path.iterdir()]
self.input_group = EvlaCalInputGroup(EvlaCalInputScienceProduct(locator))
self.output_group = self._make_evla_cal_output_group()
# Pull out the manifest content and stringify it # Pull out the manifest content and stringify it
manifest_content = json.dumps(self.manifest.content()) manifest_content = json.dumps(self.content(), indent=4)
# Write the manifest to the staging area, where other files may be. manifest_filename = self.manifest_filename()
ingestion_location = self.parameters.ingestion_path # Write the manifest to the staging area
staging_manifest = Path(ingestion_location) / self.manifest_filename staging_manifest = Path(self.ingest_path / manifest_filename)
with open(staging_manifest, "w") as out: with open(staging_manifest, "w") as out:
out.write(manifest_content) out.write(f"{manifest_content}\n")
# Get all the files we'll need
addl_ingestion_files = self.find_additional_ingestion_files()
# # Open up permissions so we can delete the manifest file later. # Write the artifacts tar.
# rw_mode = 0o666 artifacts_tar = self.write_ingestion_artifacts_tar(staging_manifest, addl_ingestion_files)
# staging_manifest.chmod(rw_mode) addl_ingestion_files.append(artifacts_tar)
# Get all the files we'll need.... # return a Path explicitly; LocalPath won't work
ingestion_files = self.find_ingestion_files() return Path(staging_manifest), addl_ingestion_files
# there should be at least one file, the manifest
if len(ingestion_files) < 1:
raise ValueError(f"no ingestion files nor manifest found at {ingestion_location}")
# ....and put them in both places, with rw permissions def write_ingestion_artifacts_tar(
for filename in ingestion_files: self, manifest_file: Path, ingestion_files: List[Path]
staging_dir_copy = ingestion_location / filename ) -> Path:
working_dir_copy = self.working_dir / filename """
shutil.copy(str(staging_manifest), str(working_dir_copy)) Take the list of files and build a tar for inclusion into the archive.
This happens in the staging area for ingestion.
The EVLA CAL tar will contain just the manifest
# Rename the manifest to the shared name decided on for ingestion invocation. :param manifest_file: the ingestion manifest
# Path() cast "shouldn't" be necessary, but if we don't do it, :param ingestion_files: all the files needed for ingestion
# file is a LocalPath and we can't create the symlink :return: a .tar archive of the ingestion artifacts
generalized = Path(self.working_dir / "ingestion-manifest.json") """
generalized.symlink_to(working_dir_copy, target_is_directory=False) ingestion_artifacts_fn = self.ingest_path / "ingestion_artifacts.tar"
with tarfile.open(ingestion_artifacts_fn, "w") as ingestion_artifacts_tar:
for file in ingestion_files:
ingestion_artifacts_tar.add(file)
# Now that all the loose files are copied, create the ingestion artifacts tar # include the manifest
self.write_ingestion_artifact_tar(ingestion_files) if manifest_file not in ingestion_files:
ingestion_artifacts_tar.add(manifest_file)
# again: return a Path explicitly, for a LocalPath won't work return ingestion_artifacts_fn
return Path(staging_manifest)
@staticmethod def _make_evla_cal_output_group(self):
def format_timestamp(start_time: DateTime) -> str:
""" """
Format the current time as follows: Create the JSON for the "output-group" section of the manifest.
input format: An EVLA CAL OutputGroup contains a science product(s) and an ancillary product
2021-07-01T13:49:17.237119+00:00 (the weblog)
desired output format as yyyy_MM_dd_'T'HH_mm_ss.SSS:
2021_07_01'T'13_49_17.237
:param start_time: current pendulum timestamp :return: manifest output group
:return:
""" """
sp_tar = self._find_science_product_tar()
science_product = EvlaCalOutputScienceProduct(str(sp_tar))
weblog = Path(self.ingest_path / WEBLOG)
if weblog.exists():
return EvlaCalOutputGroup(science_product, weblog)
time_str = str(start_time) return EvlaCalOutputGroup(science_product, None)
# change hyphens and colons to underscores
timestamp = time_str.replace("-", "_", len(time_str))
timestamp = timestamp.replace(":", "_", len(timestamp))
# chop off the last bit
timestamp = timestamp.split("+")[0]
# now timestamp ends with ss.###....; round to 3 places
ts_parts = timestamp.split("_")
seconds = float(ts_parts[len(ts_parts) - 1])
rounded = round(seconds, 3)
timestamp = timestamp.replace(str(seconds), str(rounded))
# finally, the T in single quotes
timestamp = timestamp.replace("T", "'T'")
return timestamp def content(self) -> Dict[str, str]:
"""
Accessor for manifest content
def build_filenames(self) -> Tuple[str, str]: :return: manifest as dict
""" """
Build manifest and artifacts filenames.
:return: the filenames params = {
"reingest": "false",
"ngas_ingest": "false",
"calibrate": "false",
"ingestion_path": str(self.ingest_path),
}
json_out = dict(
parameters=params,
input_group=self.input_group.__json__(),
output_group=self.output_group.__json__(),
# associate_group=self.associate_group.__json__(), # TODO when we need it
ingestion_path=str(self.ingest_path),
)
return json_out
def _find_science_product_tar(self) -> Path:
""" """
current_time = pendulum.now() A calibration ingestion staging dir should have ONE science product tar; ignore any others
timestamp = self.format_timestamp(current_time)
manifest_filename = f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}"
artifact_filename = f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}"
return manifest_filename, artifact_filename
def find_ingestion_files(self) -> List[Path]: :return:
""" """
Gather the files required for ingestion files = [file for file in self.ingest_path.iterdir() if file.is_file]
for file in files:
if re.match(SCIENCE_PRODUCT_PATTERN, file.name):
return file
:return: ingestion inputs raise FileNotFoundError(f"no science product found at {self.ingest_path}")
@staticmethod
def format_timestamp(datetime: DateTime) -> str:
""" """
Format the current time as
2021_07_01T13_49_17.237
coll_files = aux_files = [] :param datetime: current timestamp
:return: timestamp suitable for ingestion manifest filename
"""
if self.parameters.additional_metadata is not None: return datetime.format("YYYY_MM_DDThh_mm_ss.SSS")
aux_string = self.parameters.additional_metadata
aux_files = aux_string.split(",")
if self.parameters.collection_metadata is not None: @staticmethod
coll_str = self.parameters.collection_metadata def manifest_filename() -> str:
coll_files = coll_str.split(",") """
Build unique manifest filename in standard format.
aux_files += coll_files :return: the filename
"""
current_time = pendulum.now()
timestamp = EvlaCalIngestionManifestWriter.format_timestamp(current_time)
return f"{MANIFEST_NAME_BASE}{timestamp}{MANIFEST_NAME_EXT}"
# be sure to add the manifest itself @staticmethod
aux_files.append(self.manifest_filename) def artifact_filename() -> str:
"""
Build manifest and artifacts filenames.
return aux_files :return: the filenames
"""
current_time = pendulum.now()
timestamp = EvlaCalIngestionManifestWriter.format_timestamp(current_time)
artifact_filename = f"{ARTIFACT_NAME}{timestamp}{ARTIFACT_EXT}"
return artifact_filename
def write_ingestion_artifact_tar(self, ingestion_files: List[Path]) -> Path: def find_additional_ingestion_files(self) -> List[Path]:
""" """
Take the list of files and build a tar for inclusion into the archive. Gather the files required for ingestion.
This happens in the staging area for ingestion. There won't be any for EVLA CAL ingestion; this is just a placeholder.
:param ingestion_files: all the files needed for ingestion :return: ingestion inputs
:return: a .tar archive of the ingestion artifacts
""" """
return []
# TODO: we'll have extra information for other ingestion types
# coll_files = aux_files = []
#
# if self.additional_metadata is not None:
# addl_md = json.loads(self.additional_metadata)
# aux_files.append(addl_md["filename"])
#
# if self.collection_metadata is not None:
# coll_md = json.loads(self.collection_metadata)
# coll_files.append(coll_md["filename"])
#
# aux_files += coll_files
#
# # be sure to add the manifest itself
# aux_files.append(self.manifest_filename)
#
# return list(set(aux_files))
def main():
    """Placeholder entry point: write a greeting to standard output."""
    print("Hello, world!")
\ No newline at end of file
""" Objects pertaining to the various ingestion manifests """ """ Objects pertaining to the various ingestion manifests """
from __future__ import annotations
import json
import re
import tarfile
from enum import Enum from enum import Enum
from pathlib import Path
from typing import List, Dict
# pylint: disable=E0401, R0903, R1721
MANIFEST_NAME_BASE = "ingestion_manifest_"
MANIFEST_NAME_EXT = ".json"
ARTIFACT_NAME = "ingestion_artifacts_"
ARTIFACT_EXT = ".tar"
WEBLOG = "weblog.tgz"
# pylint: disable=R0903 SCIENCE_PRODUCT_PATTERN = re.compile("[a-zA-Z0-9._\\-+]*\\.tar")
class Telescope(Enum): class Telescope(Enum):
"""Codifying the names of our telescopes, because Janet can't abide magic strings""" """Codifying the names of our telescopes, because Janet can't abide magic strings"""
VLA = 1 VLA = "VLA"
EVLA = 2 EVLA = "EVLA"
ALMA = 3 ALMA = "ALMA"
VLBA = 4 VLBA = "VLBA"
GBT = 5 GBT = "GBT"
NONE = 6 NONE = "NONE"
class IngestionType(Enum): class IngestionType(Enum):
...@@ -55,24 +71,35 @@ class ScienceProductType(Enum): ...@@ -55,24 +71,35 @@ class ScienceProductType(Enum):
"""Canonical collection of ingestible types of science products""" """Canonical collection of ingestible types of science products"""
EXEC_BLOCK = "execution_block" EXEC_BLOCK = "execution_block"
CAL = "calibration" EVLA_CAL = "calibration"
CATALOG = "catalog" CATALOG = "catalog"
IMAGE = "image" IMAGE = "image"
def __str__(self):
return f'"{str(self.value)}"'
class ScienceProduct: @staticmethod
"""Represents a science product in an ingestion manifest""" def from_str(sp_type_in) -> ScienceProductType:
"""
In comes a string; out comes the corresponding ScienceProductType, if any.
Or maybe it's already a ScienceProductType, in which case we can just return it.
def __init__(self, sp_type: ScienceProductType, filename: str, locator: str, group_with: str): :param sp_type_in: a string that "should" represent a ScienceProductType
self.sp_type = sp_type :return:
self.filename = filename """
# product locator, used for input groups; locator for a known science product if isinstance(sp_type_in, ScienceProductType):
self.locator = locator return sp_type_in
# for "late" science products; they get added to an existing output group
self.group_with = group_with for spt in ScienceProductType:
if spt.value == sp_type_in:
return spt
raise ValueError(
f"unrecognized ScienceProductType: {sp_type_in}; it's a {type(sp_type_in)}"
)
class AncillaryProductType:
class AncillaryProductType(Enum):
"""The various types of ancillary products we'll encounter""" """The various types of ancillary products we'll encounter"""
INGESTION_ARTIFACTS = "ingestion_artifacts" INGESTION_ARTIFACTS = "ingestion_artifacts"
...@@ -92,6 +119,9 @@ class AncillaryProductType: ...@@ -92,6 +119,9 @@ class AncillaryProductType:
CANDIDATE_IMG = "candidate_image" CANDIDATE_IMG = "candidate_image"
THUMBNAIL_IMG = "thumbnail_image" THUMBNAIL_IMG = "thumbnail_image"
def __str__(self):
return f'"{self.value}"'
class AncillaryProduct: class AncillaryProduct:
"""Represents an ancillary product in an ingestion manifest""" """Represents an ancillary product in an ingestion manifest"""
...@@ -105,3 +135,189 @@ class AncillaryProduct: ...@@ -105,3 +135,189 @@ class AncillaryProduct:
self.science_associate = science_associate # TODO, possibly: enum? self.science_associate = science_associate # TODO, possibly: enum?
# make this an ancillary to the group of a science product (assumes locator string) # make this an ancillary to the group of a science product (assumes locator string)
self.group_with = group_with self.group_with = group_with
def write_ingestion_artifact_tar(ingestion_location: Path, ingestion_files: List[Path]) -> Path:
    """
    Bundle the given files into "ingestion_artifacts.tar" inside the ingestion location.

    :param ingestion_location: directory in which the tar is created
    :param ingestion_files: the files to add to the tar
    :return: path of the tar that was written, as reported by the tarfile object
    """
    tar_path = ingestion_location / "ingestion_artifacts.tar"
    with tarfile.open(tar_path, "w") as archive:
        for member in ingestion_files:
            archive.add(member)
        written_name = archive.name

    return Path(written_name)
class EvlaCalInputScienceProduct:
    """Represents the "science_product" in EvlaCalInputGroup"""

    def __init__(self, locator: str):
        self.locator = locator
        self.type = ScienceProductType.EVLA_CAL

    def __json__(self) -> Dict[str, str]:
        """
        Build the dict form of this science product: its locator plus the
        string value of its science product type.

        :return: dict with "locator" and "type" entries
        """
        # work on a copy: writing into self.__dict__ itself would replace
        # self.type (a ScienceProductType) with a plain string
        json_out = dict(self.__dict__)
        json_out["type"] = ScienceProductType.EVLA_CAL.value
        return json_out
class EvlaCalInputGroup:
    """
    This represents the starting point for processing which generated a science product.
    There is not always an input group for every output group (rawdata, for instance).
    Initial assumption: Input groups consist only of science products.
    """

    def __init__(self, science_product: EvlaCalInputScienceProduct):
        # the group holds exactly the one science product it was given
        self.science_products = [science_product]

    def __json__(self) -> Dict[str, str]:
        """
        Create the "input-group" section of the manifest.

        Only the first science product is rendered; a product that is already
        a string is used as is, anything else is rendered via its __json__().

        NOTE(review): the "science_products" value is a bracketed *string*,
        not a list -- confirm that is what consumers of the manifest expect.

        :return: dict with a single "science_products" entry
        """
        # copy first: assigning into self.__dict__ itself would overwrite
        # self.science_products and corrupt every later call
        json_out = dict(self.__dict__)

        sci_prod = self.science_products[0]
        sp_str = sci_prod if isinstance(sci_prod, str) else sci_prod.__json__()
        json_out["science_products"] = f"[{sp_str}]"

        return json_out
class EvlaCalOutputScienceProduct:
    """The science product in the output group"""

    def __init__(self, filename: str):
        self.filename = filename
        self.type = ScienceProductType.EVLA_CAL

    def __json__(self) -> Dict[str, str]:
        """
        Build the dict form of this science product: its filename plus the
        string value of its science product type.

        :return: dict with "filename" and "type" entries
        """
        # work on a copy: writing into self.__dict__ itself would replace
        # self.type (a ScienceProductType) with a plain string
        json_out = dict(self.__dict__)
        json_out["type"] = ScienceProductType.EVLA_CAL.value
        return json_out
class Weblog:
    """An ancillary product of type "weblog", identified by the path of the weblog file"""

    def __init__(self, weblog_path: Path):
        # the path is stored in string form
        self.ancillary_product = dict(type="weblog", filename=str(weblog_path))

    def __json__(self) -> Dict[str, str]:
        """
        :return: a shallow copy of this object's attributes
        """
        return {**self.__dict__}
class EvlaCalOutputGroup:
    """Represents result of data processing. Will have a single calibration tar
    plus, when one was supplied, a weblog.
    """

    def __init__(self, science_product: EvlaCalOutputScienceProduct, weblog: Path):
        """
        :param science_product: the calibration science product
        :param weblog: path to the weblog, or None if there isn't one
        """
        self.science_products = [science_product]
        # with no weblog there is no ancillary product; wrapping None in a
        # Weblog would list a product whose filename is the string "None"
        self.ancillary_products = [Weblog(weblog)] if weblog is not None else []

    def __json__(self) -> Dict[str, str]:
        """
        Create the "output-group" section of the manifest.

        NOTE(review): both values are bracketed *strings*, not lists --
        confirm that is what consumers of the manifest expect.

        :return: dict with "science_products" and "ancillary_products" entries
        """
        # copy first: assigning into self.__dict__ itself would overwrite the
        # product lists held by this instance
        json_out = dict(self.__dict__)

        json_out[IngestionManifestKey.ANCILLARY_PRODUCTS.value] = self._stringify(
            self.ancillary_products
        )
        json_out[IngestionManifestKey.SCIENCE_PRODUCTS.value] = self._stringify(
            self.science_products
        )

        return json_out

    @staticmethod
    def _stringify(products) -> str:
        """Render products (strings as is, others via __json__()) as a bracketed string."""
        rendered = [prod if isinstance(prod, str) else prod.__json__() for prod in products]
        return "[" + ", ".join(str(item) for item in rendered) + "]"
class EvlaCalIngestionManifest:
    """TODO: this is JUST the ingestion manifest JSON, not a bespoke object"""

    def __init__(self, ingestion_path: Path, spl: str):
        """
        :param ingestion_path: staging directory
        :param spl: science product locator
        """
        self.ingestion_path = ingestion_path
        self.locator = spl
        self.sp_type = ScienceProductType.EVLA_CAL
        # snapshot of the staging directory's entries, taken at construction time
        self.infiles = list(self.ingestion_path.iterdir())
        self.input_group = EvlaCalInputGroup(EvlaCalInputScienceProduct(self.locator))
        self.output_group = self._make_output_group()

    def __str__(self):
        """
        :return: parameters, input group and output group, one per line
        """
        params = self._make_params_section()
        input_group = self.input_group.__json__()
        output_group = self.output_group.__json__()

        return f"{params}\n{input_group}\n{output_group}"

    def _make_params_section(self) -> str:
        """
        Create the JSON for the "parameters" section of the manifest.
        It's always the same for any EVLA CAL ingestion manifest, except for the ingestion path.

        :return: stringified JSON
        """
        params = {
            "parameters": {
                "reingest": "false",
                "ngas-ingest": "false",
                "calibrate": "false",
                "ingestion_path": str(self.ingestion_path),
            }
        }

        return json.dumps(params)

    def _make_output_group(self) -> EvlaCalOutputGroup:
        """
        Create the "output-group" section of the manifest.
        An EVLA CAL OutputGroup contains a science product(s) and an ancillary product
        (the weblog)

        :return: manifest output group
        :raises FileNotFoundError: if the staging directory holds no science product tar
        """
        sp_tar = self._find_science_product_tar()
        science_product = EvlaCalOutputScienceProduct(str(sp_tar))

        weblog = Path(self.ingestion_path / WEBLOG)
        if weblog.exists():
            return EvlaCalOutputGroup(science_product, weblog)

        return EvlaCalOutputGroup(science_product, None)

    def _find_science_product_tar(self) -> Path:
        """
        A calibration ingestion staging dir should have ONE science product tar; ignore any others

        :return: the first staged file whose name matches SCIENCE_PRODUCT_PATTERN
        :raises FileNotFoundError: if no staged file matches
        """
        for file in self.infiles:
            if re.match(SCIENCE_PRODUCT_PATTERN, file.name):
                return file

        # fail loudly rather than return None, which the caller would turn
        # into a science product whose filename is the string "None"
        raise FileNotFoundError(f"no science product found at {self.ingestion_path}")
class IngestionManifestKey(Enum):
    """Names of the sections (keys) that make up an ingestion manifest"""

    # top-level sections
    INPUT_GROUP = "input_group"
    OUTPUT_GROUP = "output_group"
    INGESTION_PATH = "ingestion_path"

    # product lists
    SCIENCE_PRODUCTS = "science_products"
    ANCILLARY_PRODUCTS = "ancillary_products"
...@@ -22,7 +22,6 @@ setup( ...@@ -22,7 +22,6 @@ setup(
url="TBD", url="TBD",
license="GPL", license="GPL",
install_requires=requires, install_requires=requires,
tests_require=["pytest", "astropy", "pendulum"],
keywords=[], keywords=[],
packages=find_packages(), packages=find_packages(),
classifiers=["Programming Language :: Python :: 3.8"], classifiers=["Programming Language :: Python :: 3.8"],
......
#!/usr/bin/env bash
#
# A replacement for the old qaarchive script used by the
# data analysts. This script will tar & zip the pipeline
# weblog into WEBLOG.tar.gz and then collect everything but
# FITS files in the products directory from a CIPL run into
# a single tar file (naming convention TBD) that is created
# in a storage directory for ingestion.
#
# Arguments:
# 1: Subdirectory (relative to the qa2 directory) to be worked upon
# 2: CAPO profile used to look up the spool, staging and storage paths
#
#
#Basics: Path modification
set -o errexit -o nounset -o xtrace
#
# command line argument
#
# The tar file will be named after the working directory it came from
# which preserves the processing time information.
#
SUBDIRECTORY=$1;shift
PROFILE=$1;shift
# Get the qa2, spool, and staging paths from CAPO
SPOOL_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.spoolDirectory)
STAGE_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.stageDirectory)
STORE_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.storageDirectory)
#Yet More VLASS Specialness
if [[ ${PROFILE} != vlass* ]]
then
QA2_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.qaDirectory)
FILENAME=${SUBDIRECTORY}
else
# For VLASS, they don't want the data moved between qa2/ and spool/
QA2_DIR=${SPOOL_DIR}
# They also provide an extra layer of directory within the filename.
# Split on '/' for this one read only; the shell-wide IFS stays untouched
# (resetting it to a single space afterwards would not restore the default).
IFS='/' read -ra COMPONENTS <<< "${SUBDIRECTORY}"
# We get: calibration/VLASS1.1_stuff --> FILENAME becomes VLASS1.1_stuff (in line with CIPL)
FILENAME=${COMPONENTS[1]}
fi
# Get the weblog caching directory from CAPO
WEBLOG_CACHE=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.cacheWeblogDirectory)
#
# For the ability to reproduce results, we'll want the PPR.xml file. Ensure it is
# included in the products/ directory:
#
# TODO: Check for base_dir/products/*.pprequest.xml. If it exists, do nothing. If not, use base_dir/PPR.xml
if [ ! -e ${QA2_DIR}/${SUBDIRECTORY}/products/PPR.xml ]
then
cp ${QA2_DIR}/${SUBDIRECTORY}/working/PPR.xml ${QA2_DIR}/${SUBDIRECTORY}/products/PPR.xml
fi
#
# The VLASS project wants the flux.csv file. Check if it's here, if not, check for it in
# the working directory parallel to this one. Don't fail if we can't find it, however (so
# we minimize the disruption to the CIPL system).
#
if [ ! -e ${QA2_DIR}/${SUBDIRECTORY}/products/flux.csv ]
then
if [ -e ${QA2_DIR}/${SUBDIRECTORY}/working/flux.csv ]
then
cp ${QA2_DIR}/${SUBDIRECTORY}/working/flux.csv ${QA2_DIR}/${SUBDIRECTORY}/products/flux.csv
else
echo "No flux.csv file found here or in parallel working directory. Continuing."
fi
fi
#
# Both the pipeline-YYYYMMDDTHHMMSS directory and weblog.tgz should exist. We prefer the
# directory (in case of updates/edits), but fall back on the tgz file.
#
# Check that they're both home, as we expect
WEBLOG_DIR=$(ls -t ${QA2_DIR}/${SUBDIRECTORY}/products | grep pipeline- | head -1)
if [ -n "$WEBLOG_DIR" ]
then
# if weblog.tgz exists, we want to remove it
if [ -e ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ]
then
rm -f ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz
fi
# Tar & Zip the weblog
# errexit is on, so tar must be tested inside `if`: a separate `$?` check
# afterwards is never reached on failure and the message would not print.
if ! tar -C "${QA2_DIR}/${SUBDIRECTORY}/products" -czf "${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz" "${WEBLOG_DIR}"
then
    echo "Creation of weblog.tgz failed, exiting"
    exit 1
fi
else
# no weblog directory. If there's no weblog.tgz file, there's an issue: Issue a warning
if [ ! -e ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ]
then
echo "Neither weblog.tgz or the weblog directory exist, continuing"
fi
fi
#
# Sanity checks: create a staging subdirectory for this cal, and if the file already exists, remove it.
#
mkdir -p ${STAGE_DIR}/${SUBDIRECTORY}
if [ -e ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar ]
then
echo "Calibration Tar File Already Exists! Removing the file for recreation"
#We could rename them with a version ...
#FILENAME=${SUBDIRECTORY}.$(ls -1 ${STAGE_DIR}/${SUBDIRECTORY} | wc -l)
# if we rename it... how do we tell the workflow?
/bin/rm -f ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar
fi
#
# tar all non-fits and non-weblog-related files into a tar archive in the storage path
# SSA-6115: Don't exclude the weblog.tgz: Users and DAs prefer it bundled in.
#
# errexit is on, so tar must be tested inside `if`: a separate `$?` check
# afterwards is never reached on failure and the message would not print.
if ! tar --exclude=\*.fits --exclude=pipeline-\* -C "${QA2_DIR}/${SUBDIRECTORY}" -cvf "${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar" products
then
    echo "Creation of main tar file failed, exiting"
    exit 1
fi
#
# Copy the weblog over, for ingestion as an ancillary file
#
# errexit is on, so the copy must be tested inside `if`: a separate `$?` check
# afterwards is never reached on failure and the message would not print.
if ! /bin/cp -f "${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz" "${STAGE_DIR}/${SUBDIRECTORY}"
then
    echo "Copy of the weblog to staging location failed. Exiting."
    exit 1
fi
#
# To stay consistent with current working methods: Copy from STAGE_DIR to STORE_DIR
#
# errexit is on, so the copy must be tested inside `if` for its failure to be
# non-fatal; a bare `cp` followed by a `$?` check would abort the script.
if ! cp "${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar" "${STORE_DIR}"
then
    # If something goes wrong, make some noise, but continue on.
    echo "Failed to copy the calibration to ${STORE_DIR}, continuing."
fi
# Move subdirectories to the /spool/ copy of this directory
# if it exists, otherwise, just move what we have to /spool/
#
# If this is an ingestion for VLASS, don't move anything.
#
if [[ ${PROFILE} != vlass* ]]
then
if [ -e ${SPOOL_DIR}/${SUBDIRECTORY} ]
then
# Our base directory with the outputlogs is there, move our subdirectories back
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY}/products ${SPOOL_DIR}/${SUBDIRECTORY}
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY}/rawdata ${SPOOL_DIR}/${SUBDIRECTORY}
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY}/working ${SPOOL_DIR}/${SUBDIRECTORY}
# Cleanup the QA2 area
/bin/rm -rf ${QA2_DIR}/${SUBDIRECTORY}
else
#if no old directory exists, just move the whole thing back
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY} ${SPOOL_DIR}
fi
fi
""" Various 'n' sundry utilities for our tests """
# pylint: disable=E0401, R1721
from pathlib import Path
from typing import List
import pytest
from ingest_envoy.utilities import WEBLOG
WANTED_FILENAMES = ["my_science_products.tar", WEBLOG]
UNWANTED = ["ignore_me.fits", "just_a_lotta_nothing", "uninteresting_metadata.xml"]
@pytest.fixture(scope="function")
def ingest_path(tmpdir: Path):
    """
    Create an "ingestion" directory under the temporary directory, for use in tests

    :param tmpdir: temporary home for ingestion location
    :return: the newly created directory
    """
    # convert to a Path: what we're handed is a LocalPath, which doesn't work
    staging = Path(tmpdir) / "ingestion"
    staging.mkdir()
    return staging
def find_example_manifest(manifest_name: str) -> Path:
    """
    Search the current working directory, recursively, for the example manifest
    "<manifest_name>_manifest.json".

    :param manifest_name: unique file identifier
    :return: path of the first matching file found
    :raises FileNotFoundError: if there is no such file
    """
    filename = manifest_name + "_manifest.json"
    first_hit = next(Path.cwd().rglob(filename), None)
    if first_hit is None:
        raise FileNotFoundError(filename)
    return first_hit
def populate_fake_ingest_path(staging_dir: Path) -> List[Path]:
    """
    Fill a directory with empty stand-ins for calibration products, plus
    other stuff that we -don't- want to ingest.

    :param staging_dir: our temporary dir
    :return: paths of every file created, wanted files first, then unwanted
    """
    created = []
    for name in (*WANTED_FILENAMES, *UNWANTED):
        target = staging_dir / name
        target.touch()
        created.append(target)
    return created
...@@ -3,13 +3,13 @@ ...@@ -3,13 +3,13 @@
"reingest": "false", "reingest": "false",
"ngas-ingest": "false", "ngas-ingest": "false",
"calibrate": "false", "calibrate": "false",
"ingestion_path": "/lustre/...../" "ingestion_path": "/home/mchammer/evla/parallel-prod"
}, },
"input-group": { "input-group": {
"science_products": [ "science_products": [
{ {
"type": "execution-block", "type": "calibration",
"locator": "......" "locator": "uid://evla/calibration/long-freakin-uuid-22"
} }
] ]
}, },
...@@ -17,7 +17,7 @@ ...@@ -17,7 +17,7 @@
"science_products": [ "science_products": [
{ {
"type": "calibration", "type": "calibration",
"filename": "19A-321_2019......tar" "filename": "19A-321_2019_more_stuff.tar"
} }
], ],
"ancillary_products": [ "ancillary_products": [
......
""" Test for the various types of ALMA ingestion manifests """ """ Test for the various types of ALMA ingestion manifests """
# pylint: disable=E0401
import pytest import pytest
......
""" Tests for EVLA calibration ingestion manifest generation ONLY """
import json
import logging
import re
import shutil
import sys
from pathlib import Path
# pylint: disable=E0401, E0402, R0914
from ingest_envoy.ingestion_manifest import IngestionManifest
from ingest_envoy.ingestion_manifest_writer import (
EvlaCalIngestionManifestWriter,
)
from ingest_envoy.utilities import (
WEBLOG,
ScienceProductType,
MANIFEST_NAME_BASE,
MANIFEST_NAME_EXT,
EvlaCalOutputScienceProduct,
Weblog,
EvlaCalIngestionManifest,
IngestionManifestKey,
)
from .conftest import (
find_example_manifest,
populate_fake_ingest_path,
WANTED_FILENAMES,
UNWANTED,
)
# Logger named after the IngestionManifest class; INFO and above goes to stdout
logger = logging.getLogger(IngestionManifest.__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.INFO)
# pylint: disable=E0402, R1721
def test_creates_empty_evla_cal_manifest(ingest_path: Path):
    """
    A manifest built for an unpopulated staging directory and a blank locator
    should still have one input science product, one output science product
    and one weblog entry.

    :param ingest_path: ingestion location
    :return:
    """
    empty_manifest = EvlaCalIngestionManifest(ingest_path, "")

    # input group: a single EVLA calibration product with the blank locator
    input_products = empty_manifest.input_group.science_products
    assert len(input_products) == 1
    input_product = input_products[0]
    assert input_product.type == ScienceProductType.EVLA_CAL
    assert input_product.locator == ""

    # output group: a single science product of the same type as the input
    output_group = empty_manifest.output_group
    output_products = output_group.science_products
    assert len(output_products) == 1
    output_product = output_products[0]
    assert output_product.type == input_product.type

    # ...and a single weblog whose filename is the literal string "None"
    ancillaries = output_group.ancillary_products
    assert len(ancillaries) == 1
    weblog = ancillaries[0]
    assert isinstance(weblog, Weblog)
    weblog_entry = weblog.ancillary_product
    assert weblog_entry["type"] == "weblog"
    assert weblog_entry["filename"] == "None"

    # rmdir only succeeds on an empty directory
    ingest_path.rmdir()
def test_creates_expected_manifest(ingest_path: Path):
    """
    Build a manifest over a populated staging directory and check its input
    group, output science product and weblog, and that staging is untouched.

    :param ingest_path: ingestion location
    :return:
    """
    locator = "uid://evla/calibration/abcdefg_2_bubble_bubble"
    staged_files = populate_fake_ingest_path(ingest_path)
    manifest = EvlaCalIngestionManifest(ingest_path, locator)

    # input group carries exactly one calibration product with our locator
    input_products = manifest.input_group.science_products
    assert len(input_products) == 1
    input_product = input_products[0]
    assert input_product.type == ScienceProductType.EVLA_CAL
    assert input_product.locator == locator

    # output group carries exactly one calibration science product
    output_group = manifest.output_group
    output_products = output_group.science_products
    assert len(output_products) == 1
    output_product = output_products[0]
    assert isinstance(output_product, EvlaCalOutputScienceProduct)
    assert output_product.type == input_product.type

    remaining = list(ingest_path.iterdir())
    # nothing should have been removed from staging dir
    assert len(remaining) == len(staged_files)
    assert Path(output_product.filename) in remaining

    # the only ancillary product is the weblog, and it is still on disk
    ancillaries = output_group.ancillary_products
    assert len(ancillaries) == 1
    ancillary = ancillaries[0]
    assert isinstance(ancillary, Weblog)
    assert (ingest_path / WEBLOG).exists()

    shutil.rmtree(ingest_path)
def test_writes_real_manifest_to_file(ingest_path: Path):
    """
    The writer should produce a JSON file whose top level holds the
    parameters, input group and output group.

    :param ingest_path: the staging dir
    :return:
    """
    populate_fake_ingest_path(ingest_path)
    manifest_writer = EvlaCalIngestionManifestWriter(ingest_path=ingest_path)
    locator = "uid://evla/calibration/my_devastating_observation"
    manifest_file, _ = manifest_writer.write_evla_cal_manifest(locator)

    with open(manifest_file, "r") as manifest_in:
        parsed = json.load(manifest_in)
    manifest_content = dict(parsed.items())

    # at most one of the known manifest keys may be absent from the top level
    assert len(manifest_content.keys()) >= len(IngestionManifestKey) - 1
    for required in ("parameters", "input_group", "output_group"):
        assert required in manifest_content.keys()

    shutil.rmtree(ingest_path)
def test_builds_expected_manifest_filename(ingest_path: Path):
    """
    The manifest filename should be the fixed base name, then a timestamp,
    then the fixed extension. Judging by the pattern checked below, it looks
    like ingestion_manifest_2019_07_30T13_03_00.936.json

    :param ingest_path: ingestion location
    :return:
    """
    manifest_name = EvlaCalIngestionManifestWriter.manifest_filename()
    assert manifest_name.startswith(MANIFEST_NAME_BASE)
    assert manifest_name.endswith(MANIFEST_NAME_EXT)

    underscore_parts = manifest_name.split("_")
    assert len(underscore_parts) == 7

    # strip the fixed prefix and extension to get just the timestamp
    timestamp = manifest_name.replace(MANIFEST_NAME_BASE, "").replace(MANIFEST_NAME_EXT, "")

    # year, month, day, hours, minutes, seconds with up to 3 decimal places
    timestamp_pattern = r"\d{4}_\d{2}_\d{2}T\d{2}_\d{2}_\d{2}\.\d{0,3}"
    assert re.match(timestamp_pattern, timestamp)

    shutil.rmtree(ingest_path)
def test_writes_expected_output_files(ingest_path: Path):
    """
    Writing a manifest should leave us with the manifest file itself plus
    exactly one additional file: the ingestion artifacts tar.

    :param ingest_path: ingestion location
    :return:
    """
    populate_fake_ingest_path(ingest_path)

    # take the locator from the checked-in example manifest
    example_manifest = find_example_manifest("evla_calibration")
    with open(example_manifest, "r") as example_in:
        example_json = json.load(example_in)
    example_input_group = example_json["input-group"]
    example_science_product = example_input_group["science_products"][0]
    example_ingestion_path = example_json["parameters"]["ingestion_path"]
    assert example_ingestion_path == "/home/mchammer/evla/parallel-prod"

    manifest_writer = EvlaCalIngestionManifestWriter(ingest_path=ingest_path)
    manifest_file, more_ingestion_files = manifest_writer.write_evla_cal_manifest(
        example_science_product["locator"]
    )
    assert manifest_file.exists()
    for extra_file in more_ingestion_files:
        assert extra_file.exists()
    assert len(more_ingestion_files) == 1

    # make sure that one file is the artifacts tar
    artifacts = more_ingestion_files[0]
    assert artifacts.exists()
    assert artifacts.name.startswith("ingestion_artifacts") and artifacts.name.endswith(".tar")

    shutil.rmtree(ingest_path)
def test_filters_cal_input_files(ingest_path: Path):
    """
    We'll be getting calibration products from a directory under
    /lustre/aoc/cluster/pipeline/{CAPO_PROFILE}/workspaces/staging

    Make sure we take -only- the files to be ingested.

    :param ingest_path: our temporary dir
    :return:
    """
    # stage wanted and unwanted files alike, then write a manifest over them
    populate_fake_ingest_path(ingest_path)
    writer = EvlaCalIngestionManifestWriter(ingest_path=ingest_path)
    locator = "uid://evla/calibration/im_a_one-touch_espresso_machine"
    manifest_file, _ = writer.write_evla_cal_manifest(locator)

    with open(manifest_file, "r") as mf_in:
        manifest_content = dict(json.load(mf_in).items())

    # input group: one entry whose value is a string holding a one-element list
    input_group = manifest_content[IngestionManifestKey.INPUT_GROUP.value]
    assert isinstance(input_group, dict)
    assert len(input_group) == 1
    for val in input_group.values():
        # the string uses single quotes; swap them for double quotes so that
        # it can be parsed as JSON
        sci_prod = val.replace("'", '"', len(val))
        sps = json.loads(sci_prod)
        assert isinstance(sps, list)
        assert len(sps) == 1
        sci_prod = sps[0]
        assert sci_prod["type"] == ScienceProductType.EVLA_CAL.value
        assert sci_prod["locator"] == locator

    # output group: exactly two entries, science products and ancillary products,
    # each with one element once parsed the same way as above
    output_group = manifest_content[IngestionManifestKey.OUTPUT_GROUP.value]
    assert isinstance(output_group, dict)
    assert len(output_group) == 2
    for key, val in output_group.items():
        if key == IngestionManifestKey.SCIENCE_PRODUCTS.value:
            sci_prod = val.replace("'", '"', len(val))
            sps = json.loads(sci_prod)
            assert len(sps) == 1
        else:
            assert key == IngestionManifestKey.ANCILLARY_PRODUCTS.value
            anc_prod = val.replace("'", '"', len(val))
            aps = json.loads(anc_prod)
            assert len(aps) == 1

    # every output science product must be a wanted file sitting directly in
    # the staging dir
    for sci_prod in sps:
        filename = Path(sci_prod["filename"]).name
        file = Path(ingest_path / filename)
        assert file.exists()
        assert file.parent == ingest_path
        assert filename in WANTED_FILENAMES
        assert filename not in UNWANTED

    # the ancillary product is a dict wrapping an inner dict that holds only
    # "type" (the weblog) and "filename" (which must exist in the staging dir)
    anc_prod_dict1 = aps[0]
    for key, val in anc_prod_dict1.items():
        anc_prod_dict2 = val
        assert isinstance(anc_prod_dict2, dict)
        for key, val in anc_prod_dict2.items():
            if key == "type":
                assert val == "weblog"
            else:
                assert key == "filename"
                file = Path(ingest_path / val)
                assert file.exists()

    shutil.rmtree(ingest_path)
""" Test for the various types of EVLA ingestion manifests """
import json
import logging
import re
import sys
from pathlib import Path
import pytest
from ingest_envoy.ingestion_manifest import IngestionManifest, Parameters
from ingest_envoy.ingestion_manifest_writer import IngestionManifestWriter
from ingest_envoy.ingestion_manifest_writer import (
MANIFEST_NAME_BASE,
MANIFEST_NAME_EXT,
)
from ingest_envoy.utilities import Telescope
# pylint: disable=E0401, W1203
# Module-level logger; INFO and above goes to stdout
logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.INFO)
def test_minimal_manifest_is_as_expected(tmpdir):
    """
    A bare-bones manifest should keep the parameters it was given, start with
    no output science products, and hold whatever ancillary product we append.

    :return:
    """
    fake_ancillary_product = {
        "type": "obslog",
        "filename": "my_fabulous_obs.log",
        "group_with": "uid://evla/execblock/ABCDEFGHIJKLMNOP",
    }

    staging = tmpdir / "ingestion"
    staging.mkdir()

    # no reingest, ngas ingest, additional metadata nor collection metadata
    params = Parameters(Telescope.EVLA, staging, None, None)
    manifest = IngestionManifest(params)
    manifest.output_group.ancillary_products.append(fake_ancillary_product)

    assert manifest.parameters == params
    assert len(manifest.output_group.science_products) == 0
    assert manifest.output_group.ancillary_products == [fake_ancillary_product]
def test_generates_correct_filename(tmpdir):
    """
    The written manifest's filename should be the fixed base name, then a
    timestamp, then the fixed extension.

    :return:
    """
    staging = tmpdir / "ingestion"
    staging.mkdir()
    scratch = tmpdir / "working"
    scratch.mkdir()

    # no reingest, ngas ingest, additional metadata nor collection metadata
    params = Parameters(Telescope.EVLA, Path(staging), None, None)
    manifest = IngestionManifest(params)
    manifest_file = IngestionManifestWriter(manifest, scratch).build()

    manifest_name = manifest_file.name
    assert manifest_name.startswith(MANIFEST_NAME_BASE)
    assert manifest_name.endswith(MANIFEST_NAME_EXT)

    underscore_parts = manifest_name.split("_")
    assert len(underscore_parts) == 7

    # strip the fixed prefix and extension to get just the timestamp
    timestamp = manifest_name.replace(MANIFEST_NAME_BASE, "").replace(MANIFEST_NAME_EXT, "")

    # year, month, day, hours, minutes, seconds with up to 3 decimal places
    # NOTE(review): the pattern expects literal single quotes around the T;
    # confirm that matches what the writer actually emits
    timestamp_pattern = r"\d{4}_\d{2}_\d{2}'T'\d{2}_\d{2}_\d{2}\.\d{0,3}"
    assert re.match(timestamp_pattern, timestamp)
def test_builds_evla_sdm_manifest(tmpdir):
    """
    Have we built a well-formed EVLA SDM ingestion manifest?

    The JSON passed as additional metadata should come back as the single
    input science product, and the JSON passed as collection metadata as the
    single output ancillary product.

    :return:
    """
    staging = tmpdir / "ingestion"
    staging.mkdir()
    scratch = tmpdir / "working"
    scratch.mkdir()

    science_product = {
        "type": "execution_block",
        "filename": "X_osro_000.59368.65423814815",
    }
    fake_science_product = json.dumps(science_product)

    ancillary_product = {
        "type": "ingestion_artifacts",
        "filename": "ingestion_artifacts_2021_06_03_T15_52_35.031.tar",
    }
    fake_ancillary_product = json.dumps(ancillary_product)

    # no reingest, ngas ingest, additional metadata nor collection metadata
    params = Parameters(
        telescope=Telescope.EVLA,
        ingestion_path=str(staging),
        additional_metadata=fake_science_product,
        collection_metadata=fake_ancillary_product,
    )
    manifest = IngestionManifest(params)

    assert len(manifest.input_group.science_products) == 1
    assert manifest.input_group.science_products[0] == json.loads(fake_science_product)

    assert len(manifest.output_group.ancillary_products) == 1
    assert manifest.output_group.ancillary_products == [json.loads(fake_ancillary_product)]
@pytest.mark.skip("TODO: test_writes_evla_sdm_manifest")
def test_writes_evla_sdm_manifest():
    """
    Have we written an EVLA SDM ingestion manifest correctly?

    Placeholder: skipped until implemented.

    :return:
    """
    # TODO:
    #  manifest_file = IngestionManifestWriter(manifest, working_dir).build()
    raise NotImplementedError
@pytest.mark.skip("TODO: test_builds_evla_bdf_manifest")
def test_builds_evla_bdf_manifest():
    """
    Have we built a well-formed EVLA BDF ingestion manifest?

    Placeholder: skipped until implemented.

    :return:
    """
    raise NotImplementedError
@pytest.mark.skip("TODO: test_builds_evla_cal_manifest")
def test_builds_evla_cal_manifest():
    """
    Have we built a well-formed EVLA calibration ingestion manifest?

    Placeholder: skipped until implemented.

    :return:
    """
    raise NotImplementedError
""" Tests for all the other types of ingestion manifests """ """ Miscellaneous manifest-building tests """
import json
import logging
import shutil
import sys
from pathlib import Path
# pylint: disable=E0401, E0402, R1721
import pytest import pytest
from ingest_envoy.ingestion_manifest import IngestionManifest
from ingest_envoy.utilities import ScienceProductType, IngestionManifestKey
from .conftest import populate_fake_ingest_path, WANTED_FILENAMES, UNWANTED
# Logger named after the IngestionManifest class; INFO and above goes to stdout
logger = logging.getLogger(IngestionManifest.__name__)
logger.addHandler(logging.StreamHandler(sys.stdout))
logger.setLevel(logging.INFO)
def test_entry_point_for_evla_cal(ingest_path: Path):
    """
    Confirm that the ingestion launcher entrypoint functions as expected.

    :param ingest_path: fake tmp ingestion path
    :return:
    """
    # stage the fake files, then drive manifest creation with plain strings
    populate_fake_ingest_path(ingest_path)
    manifest = IngestionManifest(
        str(ingest_path), ScienceProductType.EVLA_CAL.value, "uid://evla/calibration/meeniemyniemoe"
    )
    manifest.create()

    files = [file for file in ingest_path.iterdir()]
    manifest_file = [file for file in files if file.name.endswith(".json")][0]
    assert manifest_file.exists()
    # two files beyond the staged ones; presumably the manifest and the
    # ingestion artifacts tar -- TODO confirm
    assert len(files) == len(WANTED_FILENAMES) + len(UNWANTED) + 2

    # make sure manifest_file contains an IngestionManifest
    with open(manifest_file, "r") as out:
        manifest_content = dict(json.load(out).items())
    for key in ["parameters", "input_group", "output_group", "ingestion_path"]:
        assert key in manifest_content.keys()

    input_group = manifest_content[IngestionManifestKey.INPUT_GROUP.value]
    assert isinstance(input_group, dict)
    assert len(input_group) == 1

    # every output-group value is a string holding a single-quoted, one-element
    # list; swap the quotes so that it can be parsed as JSON
    output_group = manifest_content[IngestionManifestKey.OUTPUT_GROUP.value]
    assert isinstance(output_group, dict)
    for key, val in output_group.items():
        val = val.replace("'", '"', len(val))
        sci_prods = json.loads(val)
        assert isinstance(sci_prods, list)
        assert len(sci_prods) == 1

    # check the raw ancillary-products string before parsing it
    ancillary_products = output_group[IngestionManifestKey.ANCILLARY_PRODUCTS.value]
    assert ancillary_products[0] == "["
    assert ancillary_products[-1] == "]"
    assert "weblog" in ancillary_products
    assert "type" in ancillary_products
    assert "filename" in ancillary_products

    # parsed, it is a list whose first element is a dict wrapping an inner
    # two-entry dict
    a_prods = json.loads(ancillary_products.replace("'", '"', len(ancillary_products)))
    assert isinstance(a_prods, list)
    a_prods = a_prods[0]
    assert isinstance(a_prods, dict)
    for key, val in a_prods.items():
        assert isinstance(val, dict)
        a_prods = val
        assert len(a_prods) == 2

    shutil.rmtree(ingest_path)
@pytest.mark.skip("TODO: test_builds_realfast_sdm_manifest") @pytest.mark.skip("TODO: test_builds_realfast_sdm_manifest")
def test_builds_realfast_sdm_manifest(): def test_builds_realfast_sdm_manifest():
......
""" Tests for VLBA product ingestion manifests """ """ Tests for VLBA product ingestion manifests """
# pylint: disable=E0401
import pytest import pytest
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment