Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Commits on Source (8)
Showing
with 566 additions and 3 deletions
# Ingestion Envoy
This is a Python port of the ingestion manifest builder in archive-metaproject.
See https://open-confluence.nrao.edu/display/SSA/Ingestion+Manifests.
""" Version information for this package, don't put anything else here. """
# Keep this assignment as the final line of the file, with no trailing comment:
# the packaging script shown alongside this module takes the last
# whitespace-separated token of the last line of ingest_envoy/_version.py
# (quotes stripped) as the package version.
# NOTE(review): the name carries three underscores on each side; the
# conventional spelling is __version__ -- confirm whether this is intentional.
___version___ = '4.0.0a1.dev1'
""" The ingestion manifest """
from pathlib import Path
from .utilities import (
Telescope,
ScienceProduct,
)
class Parameters:
    """Holds the inputs from which an ingestion manifest is produced.

    Every constructor argument is stored, unchanged, on the attribute of the
    same name.
    """

    def __init__(
        self,
        telescope: Telescope,
        ingestion_path: Path,
        additional_metadata: str,
        collection_metadata: str,
        reingest: bool = False,
        ngas_ingest: bool = False,
    ):
        """
        :param telescope: telescope the manifest is for
        :param ingestion_path: path handed in for the ingestion
        :param additional_metadata: additional metadata value
        :param collection_metadata: collection metadata value
        :param reingest: reingest flag; False unless given
        :param ngas_ingest: NGAS ingest flag; False unless given
        """
        # Paired assignments; attributes are still created in the declared order.
        self.telescope, self.ingestion_path = telescope, ingestion_path
        self.additional_metadata, self.collection_metadata = (
            additional_metadata,
            collection_metadata,
        )
        self.reingest, self.ngas_ingest = reingest, ngas_ingest
class InputGroup:
    """
    The starting point of the processing that generated a science product.

    Not every output group has a matching input group (raw data, for instance).
    Initial assumption: an input group holds science products only.
    """

    def __init__(self):
        # Starts empty; each instance gets its own list.
        self.science_products: list = []
class IngestionManifest:
    """In-memory form of the JSON ingestion manifest, covering several potential scenarios.

    see ingest_envoy/test/examples, nicked from https://open-confluence.nrao.edu/x/roPCAQ
    """

    def __init__(self, parameters: Parameters):
        """
        :param parameters: the inputs this manifest is built from
        """
        self.parameters = parameters

        # Group slots start unset: input_group is to be an InputGroup,
        # output_group an OutputGroup, and associate_group an AssociateGroup
        # (not required?).
        self.input_group = self.output_group = self.associate_group = None

        # Two separate, initially empty product lists.
        self.science_products, self.ancillary_products = [], []
class OutputGroup:
    """What data processing produced."""

    def __init__(self):
        # Two separate, initially empty lists.
        self.science_products, self.ancillary_products = [], []
class AssociateGroup:
    """
    Science products that belong to different input or output groups yet are
    still fundamentally linked.

    Introduced for the RealFast project, to tie the RealFast-specific execution
    block and image to the execution block during which a transient was
    discovered.

    By definition an associate group also includes any science product(s) of the
    output group being ingested. Locators newly generated at ingestion time are
    added to whichever products make up an associate group in the manifest.
    """

    def __init__(self):
        # Starts empty; each instance gets its own list.
        self.science_products: list = []
"""Build an ingestion manifest file"""
from .ingestion_manifest import (
Parameters,
IngestionManifest,
)
class IngestionManifestBuilder:
    """Builds ingestion manifests, for the various types of ingestion,
    from the parameters it is given."""

    def __init__(self, parameters: Parameters):
        """
        :param parameters: inputs describing the manifest to build
        """
        self.parameters = parameters

    def build(self) -> IngestionManifest:
        """
        Produce the ingestion manifest that the stored parameters call for.

        Not implemented yet: every call raises NotImplementedError.

        :return: the ingestion manifest constructed from the parameters
        """
        raise NotImplementedError()
def main():
    """Entry-point stub: print a greeting to standard output."""
    greeting = "Hello, world!"
    print(greeting)
\ No newline at end of file
""" Objects pertaining to the various ingestion manifests """
from enum import Enum
class Telescope(Enum):
    """Names our telescopes as enum members, so that callers need not pass
    magic strings around."""

    # Values are consecutive integers in declaration order.
    VLA = 1
    EVLA = 2
    ALMA = 3
    VLBA = 4
    GBT = 5
    NONE = 6
class IngestionType(Enum):
    """Types of ingestion we'll have to do.

    Each member is declared with the telescope whose products it ingests; that
    telescope is available as ``member.telescope``.

    Several ingestion types share a telescope. Were the telescope itself the
    enum value, ``Enum`` would turn every name that repeats a value into an
    *alias* of the first member declared with it (``ALMA_CAL is ALMA_SDM``),
    leaving a single distinct member per telescope. ``__new__`` therefore gives
    each member its own ordinal value and keeps the telescope separately.

    NOTE: ``member.value`` is consequently a unique integer, not the telescope;
    read ``member.telescope`` for the latter.
    """

    def __new__(cls, telescope):
        """Create a member with a unique value and remember its telescope.

        :param telescope: the Telescope given on the member's declaration line
        :return: the new enum member
        """
        member = object.__new__(cls)
        # Ordinal of this member among those declared so far (1-based).
        member._value_ = len(cls.__members__) + 1
        member.telescope = telescope
        return member

    # ALMA products
    ALMA_SDM = Telescope.ALMA
    ALMA_CAL = Telescope.ALMA
    ALMA_AUDI = Telescope.ALMA

    # EVLA products
    EVLA_SDM = Telescope.EVLA
    EVLA_BDF = Telescope.EVLA
    EVLA_CAL = Telescope.EVLA

    # RealFast projects
    REALFAST_SDM = Telescope.EVLA

    # VLASS projects
    VLASS_QUICKLOOK = Telescope.EVLA

    # VLBA ingestion. (IDI and UVFITS products are treated the same.)
    VLBA_FITS = Telescope.VLBA

    # Coming Real Soon Now: VLBA Mark 4 product ingestion
    VLBA_MARK4 = Telescope.VLBA

    # Also coming Real Soon: GBT execution block ingestion
    GBT_EB = Telescope.GBT

    # Hot on its heels: LVLA execution block ingestion
    LVLA_EB = Telescope.VLA

    # When we just don't know what we're dealing with
    UNKNOWN = Telescope.NONE
class ScienceProductType(Enum):
    """The canonical set of science product types that can be ingested."""

    # Each value is the type's string form.
    EXEC_BLOCK = "execution_block"
    CAL = "calibration"
    CATALOG = "catalog"
    IMAGE = "image"
class ScienceProduct:
    """One science product entry of an ingestion manifest."""

    def __init__(self, sp_type: ScienceProductType, filename: str, locator: str, group_with: str):
        """
        :param sp_type: which kind of science product this is
        :param filename: the product's file name
        :param locator: product locator, used for input groups; identifies a
            science product that is already known
        :param group_with: for "late" science products, which get added to an
            existing output group
        """
        self.sp_type, self.filename = sp_type, filename
        self.locator, self.group_with = locator, group_with
class AncillaryProductType(str, Enum):
    """The various types of ancillary products we'll encounter.

    This is a real enumeration, like ScienceProductType, so a value annotated
    as ``AncillaryProductType`` can actually be an instance of it and the set
    of types can be iterated and looked up by value. The ``str`` mixin keeps
    every member equal to (and usable as) the plain string it used to be, e.g.
    ``AncillaryProductType.FITS == "fits_image"``.
    """

    INGESTION_ARTIFACTS = "ingestion_artifacts"
    PIPELINE_ARTIFACTS = "pipeline_artifacts"
    PIPELINE_WEBLOG = "pipeline_weblog"
    LOG = "log_file"

    ### Images ###

    # our default FITS type
    FITS = "fits_image"
    VLASS_QUICKLOOK = "quicklook_rms_image"
    AUDI_FITS_MASK = "clean_mask"
    AUDI_PB_FITS = "primary_beam"
    ALPHA_FITS = "spectral_index"
    CANDIDATE_IMG = "candidate_image"
    THUMBNAIL_IMG = "thumbnail_image"
class AncillaryProduct:
    """One ancillary product entry of an ingestion manifest."""

    def __init__(
        self, type: AncillaryProductType, filename: str, science_associate: str, group_with: str
    ):
        """
        :param type: which kind of ancillary product this is
        :param filename: the product's file name
        :param science_associate: makes this an ancillary to one particular
            science product (assumes locator string)
        :param group_with: makes this an ancillary to the group of a science
            product (assumes locator string)
        """
        self.type, self.filename = type, filename
        # TODO: enum? (for science_associate)
        self.science_associate, self.group_with = science_associate, group_with
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pathlib import Path
from setuptools import find_packages, setup
# The version is the last whitespace-separated token on the last line of
# _version.py, with any surrounding quotes removed. The file is read inside a
# `with` block so the handle is closed as soon as the lines are in memory.
with open("ingest_envoy/_version.py") as version_file:
    VERSION = version_file.readlines()[-1].split()[-1].strip("\"'")

# The README doubles as the long description.
README = Path("README.md").read_text()

# Runtime dependencies; pex is pinned to an exact release.
requires = ["pycapo", "pex==2.1.41"]

setup(
    # Distribution name is "ssa-" plus the name of the current working directory.
    name="ssa-" + Path().absolute().name,
    version=VERSION,
    description="Workspaces ingestion functionality bridge",
    long_description=README,
    author="NRAO SSA Team",
    author_email="dms-ssa@nrao.edu",
    url="TBD",
    license="GPL",
    install_requires=requires,
    tests_require=["pytest"],
    keywords=[],
    packages=find_packages(),
    classifiers=["Programming Language :: Python :: 3.8"],
    # Installs an `ingest_envoy` command that calls ingest_envoy.main:main.
    entry_points={"console_scripts": ["ingest_envoy = ingest_envoy.main:main"]},
)
{
"parameters": {
"reingest": "false",
"ngas-ingest": "false",
"calibrate": "false",
"ingestion_path": "/lustre/...../"
},
"input-group": {
"science_products": [
{
"type": "execution-block",
"locator": "......"
}
]
},
"output-group": {
"science_products": [
{
"type": "calibration",
"filename": "19A-321_2019......tar"
}
],
"ancillary_products": [
{
"type": "weblog",
"filename": "weblog.tgz"
}
]
}
}
{
"parameters": {
"reingest": false,
"ngas_ingest": false,
"telescope": "EVLA",
"ingestion_path": "/home/mchammer/evla/parallel-prod"
},
"output_group": {
"science_products": [
{
"type": "execution_block",
"filename": "X_osro_000.59368.65423814815"
}
],
"ancillary_products": [
{
"type": "ingestion_artifacts",
"filename": "ingestion_artifacts_2021_06_03_T15_52_35.031.tar"
}
]
}
}
{
"parameters": {
"reingest": "false",
"ngas-ingest": "false",
"calibrate": "false",
"ingestion_path": "/lustre/.."
},
"input-group": {
"science_products": [
{
"type": "calibration",
"locator": "...."
}
]
},
"output-group": {
"science_products": [
{
"type": "image",
"filename": "image1.fits",
"ancillary-products": [
{
"type": "image_mask",
"filename": "image1.mask.fits"
}
]
},
{
"type": "image",
"filename": "image2.fits",
"ancillary-products": [
{
"type": "image_mask",
"filename": "image2.mask.fits"
}
]
},
{
"type": "image",
"filename": "image3.fits",
"ancillary-products": [
{
"type": "image_mask",
"filename": "image3.mask.fits"
}
]
}
],
"ancillary-products": [
{
"type": "weblog",
"filename": "weblog.tgz"
},
{
"type": "tar",
"filename": "imaging_extras.tar"
}
]
}
}
{
"parameters": {
"reingest": "false",
"ngas-ingest": "false",
"calibrate": "false",
"ingestion_path": "/lustre/aoc/cluster/pipeline/vlass_auto/cache/quicklook/VLASS1.1/T29t04/VLASS1.1.ql.T29t04.J094850+743000.10.2048.v5"
},
"input-group": {
"science_products": [
{
"type": "calibration",
"locator": "uid://evla/calibration/a47c2e78-4f4e-4516-ab95-8bbb4057e9bb"
},
{
"type": "execution_block",
"locator": "uid://evla/execblock/52dd9e10-63fb-4fa8-b6ff-fcf6240b97f4"
}
]
},
"output-group": {
"science_products": [
{
"type": "quicklook_image",
"filename": "VLASS1.1.ql.T29t04.J094850+743000.10.2048.v5.I.iter1.image.pbcor.tt0.subim.fits",
"ancillary-products": [
{
"type": "quicklook_rms",
"filename": "VLASS1.1.ql.T29t04.J094850+743000.10.2048.v5.I.iter1.image.pbcor.tt0.rms.subim.fits"
}
]
}
],
"ancillary-products": [
{
"type": "tar",
"filename": "VLASS1.1.ql.T29t04.J094850+743000.10.2048.v5.tar"
}
]
}
}
{
"parameters": {
"reingest": "false",
"ngas-ingest": "false",
"calibrate": "false",
"ingestion_path": "/lustre/...../"
},
"science-products": [
{
"type": "vlass_catalog",
"filename": "imaging_catalog_example.cat",
"group_with": ".... (locator for an image)"
}
]
}
""" Test for the various types of ALMA ingestion manifests """
import pytest
@pytest.mark.skip(reason="TODO: test_builds_alma_sdm_manifest")
def test_builds_alma_sdm_manifest():
    """
    Is the ALMA SDM ingestion manifest we build well formed?

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
@pytest.mark.skip(reason="TODO: test_builds_alma_cal_manifest")
def test_builds_alma_cal_manifest():
    """
    Is the ALMA calibration ingestion manifest we build well formed?

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
# The skip reason names this test; it previously repeated the calibration
# test's name, which made skip reports point at the wrong test.
@pytest.mark.skip("TODO: test_builds_alma_audi_manifest")
def test_builds_alma_audi_manifest():
    """
    Have we built a well-formed ALMA image ingestion manifest?

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError
""" Test for the various types of EVLA ingestion manifests """
from pathlib import Path
import pytest
from ..ingest_envoy.ingestion_manifest import Parameters
from ..ingest_envoy.ingestion_manifest_builder import IngestionManifestBuilder
from apps.cli.executables.pexable.ingest_envoy.ingest_envoy.utilities import Telescope
@pytest.mark.skip(reason="TODO: test_builds_evla_sdm_manifest")
def test_builds_evla_sdm_manifest():
    """
    Is the EVLA SDM ingestion manifest we build well formed?

    Builds a manifest from EVLA parameters, then checks that both groups are
    set and that both product lists are non-empty. Currently marked as skipped.
    """
    params = Parameters(Telescope.EVLA, Path("/path/TODO"), None, None)
    builder = IngestionManifestBuilder(params)
    built = builder.build()

    # NOTE(review): InputGroup's docstring says raw data may have no input
    # group -- confirm that an EVLA SDM manifest is expected to carry one.
    assert built.input_group is not None
    assert built.output_group is not None

    assert len(built.science_products) > 0
    assert len(built.ancillary_products) > 0

    # TODO: tests for manifest parameters
@pytest.mark.skip(reason="TODO: test_builds_evla_bdf_manifest")
def test_builds_evla_bdf_manifest():
    """
    Is the EVLA BDF ingestion manifest we build well formed?

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
@pytest.mark.skip(reason="TODO: test_builds_evla_cal_manifest")
def test_builds_evla_cal_manifest():
    """
    Is the EVLA calibration ingestion manifest we build well formed?

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
""" Tests for all the other types of ingestion manifests """
import pytest
@pytest.mark.skip(reason="TODO: test_builds_realfast_sdm_manifest")
def test_builds_realfast_sdm_manifest():
    """
    Is the RealFast product ingestion manifest we build well formed?

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
@pytest.mark.skip(reason="TODO: test_builds_vlass_quicklook_manifest")
def test_builds_vlass_quicklook_manifest():
    """
    Is the VLASS quicklook product ingestion manifest we build well formed?

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
@pytest.mark.skip(reason="TODO: test_builds_gbt_manifest")
def test_builds_gbt_manifest():
    """
    Manifests for ingesting GBT products are not in use yet, but will be
    needed sooner or later.

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
@pytest.mark.skip(reason="TODO: test_builds_lvla_eb_manifest")
def test_builds_lvla_eb_manifest():
    """
    Manifests for ingesting LVLA execution blocks are not in use yet, but will
    be needed one of these days.

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
""" Tests for VLBA product ingestion manifests """
import pytest
@pytest.mark.skip(reason="TODO: test_builds_vlba_fits_manifest")
def test_builds_vlba_fits_manifest():
    """
    Is the VLBA IDI/UVFITS ingestion manifest we build well formed?

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
@pytest.mark.skip(reason="TODO: test_builds_vlba_mark4_manifest")
def test_builds_vlba_mark4_manifest():
    """
    Unused, but coming: is the VLBA Mark 4 ingestion manifest we build well formed?

    Placeholder: marked as skipped, and the body only raises NotImplementedError.
    """
    raise NotImplementedError()
......@@ -28,8 +28,8 @@ services:
parallelism: 0
order: stop-first
volumes:
- /lustre/aoc/cluster/pipeline/dsoc-dev/downloads/nrao:/lustre/aoc/cluster/pipeline/dsoc-dev/downloads/nrao
- /lustre/aoc/cluster/pipeline/dsoc-dev/workspaces:/lustre/aoc/cluster/pipeline/dsoc-dev/workspaces
- /lustre/aoc/cluster/pipeline/dsoc-${ENV}/downloads/nrao:/lustre/aoc/cluster/pipeline/dsoc-${ENV}/downloads/nrao
- /lustre/aoc/cluster/pipeline/dsoc-${ENV}/workspaces:/lustre/aoc/cluster/pipeline/dsoc-${ENV}/workspaces
- /home/ssa/bin/python3.8:/home/ssa/bin/python3.8
- /home/casa/capo:/home/casa/capo
......
......@@ -42,9 +42,14 @@ edu.nrao.archive.workspaces.DeliverySettings.ciplDelivery = /lustre/aoc/cluster
edu.nrao.archive.workspaces.DeliverySettings.cacheWeblogDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/cache/weblog
#
# Data fetcher settings
# Standard Imaging Settings
#
edu.nrao.archive.workspaces.DeliverySettings.standardImageDelivery = /lustre/aoc/cluster/pipeline/docker/workspaces/image-qa
#
# Data fetcher settings
#
edu.nrao.archive.workflow.config.DataFetcherSettings.ramInGb = 16
edu.nrao.archive.workflow.config.DataFetcherSettings.clusterTimeout = 01:00:00:00
edu.nrao.archive.datafetcher.DataFetcherSettings.locatorServiceUrlPrefix = https://webtest.aoc.nrao.edu/archiveServices/location?locator
......