Commit 4f7fb647 authored by Charlotte Hausman

Revert "WS-508: ingest_envoy structure and integration"

This reverts commit 4af90a05.
parent de08150e
1 merge request: !338 WS-540: Create blank qa-notes.html with correct permissions
Showing changes with 8 additions and 513 deletions
@@ -5,6 +5,8 @@ import argparse
from unittest.mock import patch, MagicMock
import conveyor.conveyor as con
from conveyor.deliver import DeliveryConveyor
from conveyor.retrieve import RetrievalConveyor
expected_settings = {
"qa_delivery_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/qa2",
@@ -24,17 +26,14 @@ class TestConveyor:
settings = con._get_settings("test/test.json")
assert settings["qa_delivery_area"] == expected_settings["qa_delivery_area"]
assert settings["weblog_cache_area"] == expected_settings["weblog_cache_area"]
assert (
settings["workspaces_lustre_root_dir"]
== expected_settings["workspaces_lustre_root_dir"]
)
assert settings["workspaces_lustre_root_dir"] == expected_settings["workspaces_lustre_root_dir"]
# mock calls to cwd and count
assert cwd.call_count == 2
assert settings["destination_dir"] == expected_settings["destination_dir"]
assert settings["destination_subdir"] == expected_settings["destination_subdir"]
def test_main_deliver(self):
args.deliver = ["test/test.json"]
args.deliver = ['test/test.json']
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
assert args.deliver[0] == "test/test.json"
@@ -46,7 +45,7 @@ class TestConveyor:
args.deliver = None
def test_main_retrieve(self):
args.retrieve = ["test/test.json"]
args.retrieve = ['test/test.json']
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch("conveyor.retrieve.RetrievalConveyor.convey") as ret_convey:
import argparse
import logging
import sys
from pycapo import CapoConfig
from ingest_envoy.launchers import IngestCalibrationLauncher
from ingest_envoy.solicitor import Solicitor
"""
Setup and Launch ingestion via Workspaces
"""
logger = logging.getLogger("ingest_envoy")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
def _get_settings(filename: str) -> dict:
ingestion_settings = CapoConfig().settings("edu.nrao.archive.workspaces.IngestionSettings")
staging_root_dir = ingestion_settings.stagingDirectory
storage_root_dir = ingestion_settings.storageDirectory
parameters = Solicitor(filename).solicit_parameters()
parameters["staging_area"] = staging_root_dir
parameters["storage_area"] = storage_root_dir
return parameters
def arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Workspaces Ingestion handler", formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
"--calibration",
nargs=1,
action="store",
required=False,
help="run ingestion for a calibration product",
)
parser.add_argument(
"--image",
nargs=1,
action="store",
required=False,
help="run ingestion for an image product (anticipated functionality)",
)
return parser
def check_ingest_type(args_type: str, parameters: dict) -> bool:
if args_type in parameters["workflowName"]:
return True
return False
def main():
args = arg_parser().parse_args()
if args.calibration is not None:
arg_type = "calibration"
parameters = _get_settings(args.calibration[0])
if check_ingest_type(args_type=arg_type, parameters=parameters):
IngestCalibrationLauncher(parameters).launch_ingestion()
else:
logger.error(
f"ERROR: The workflow request to be ingested does not match specified ingestion type {type}."
)
import logging
import subprocess
import sys
from ingest_envoy.ingestion_manifest import IngestionManifest
from ingest_envoy.interfaces import IngestLauncherIF
class IngestCalibrationLauncher(IngestLauncherIF):
def __init__(self, parameters: dict):
self.logger = logging.getLogger("ingest_envoy")
self.ingestion_type = "calibration"
self.parameters = parameters
self.staging_source_dir = (
self.parameters["staging_area"] + "/" + self.parameters["workflowDir"]
)
def launch_ingestion(self):
self.prepare_for_ingest()
self.run_ingest()
def run_ingest(self):
subprocess.run(
["./ingest", "--json", self.staging_source_dir],
stdout=sys.stdout,
stderr=sys.stderr,
)
def prepare_for_ingest(self):
# 1. run collection script to create calibration tarfile
self.run_collection_script()
# 2. create ingestion manifest
self.create_manifest()
def run_collection_script(self):
workflow_dir = self.parameters["workflowDir"]
sdm_id = self.parameters["sdmId"]
cal_processing_datetime = self.parameters["processingStart"]
# run script
subprocess.run(
["./calibration-table-collector.sh", workflow_dir, sdm_id, cal_processing_datetime],
stdout=sys.stdout,
stderr=sys.stderr,
)
def create_manifest(self):
spl = self.parameters["spl"]
IngestionManifest(self.staging_source_dir, self.ingestion_type, spl).create()
def main():
print("Hello, world!")
\ No newline at end of file
"""
Solicitor: uses metadata.json to determine input parameters for ingestion
"""
import json
import pathlib
class Solicitor:
def __init__(self, filename: str):
self.filename = filename
self.metadata = self.solicit_contents()
def solicit_contents(self) -> dict:
with open(self.filename) as file:
metadata = json.loads(file.read())
return metadata
def solicit_workflow_directory_name(self):
destination_path = pathlib.Path(self.metadata["destinationDirectory"])
return destination_path.stem
def solicit_processing_creation_time(self):
# incoming datetime: 2021-07-06T21:50:48
# transformation for NGAS friendliness: 2021_07_06T21_50_48
creation_time = self.metadata["creationTime"].replace("-", "_").replace(":", "_")
return creation_time
def solicit_parameters(self):
metadata = self.solicit_contents()
return {
"sdmId": metadata["fileSetIds"],
"workflowName": metadata["workflowName"],
"spl": metadata["productLocator"],
"processingStart": self.solicit_processing_creation_time(),
"destinationDir": metadata["destinationDirectory"],
"workflowDir": self.solicit_workflow_directory_name(),
}
@@ -25,5 +25,5 @@ setup(
keywords=[],
packages=find_packages(),
classifiers=["Programming Language :: Python :: 3.8"],
entry_points={"console_scripts": ["ingest_envoy = ingest_envoy.ingest:main"]},
entry_points={"console_scripts": ["ingest_envoy = ingest_envoy.main:main"]},
)
{
"fileSetIds": "16B-069_sb32814386_1_001.57685.66193635417",
"workflowName": "std_calibration",
"systemId": "2",
"creationTime": "2021-07-06T21:50:48",
"productLocator": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b",
"projectMetadata": {
"projectCode": "16B-069",
"title": "JVLA monitoring of an in-progress extreme scattering event",
"startTime": "57685.661952546296",
"observer": "Keith Bannister"
},
"destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp"
}
"""
Tests for ingest_envoy.ingest
"""
import argparse
from unittest.mock import patch, MagicMock
import ingest_envoy.ingest as ingest
expected_settings = {
"sdmId": "16B-069_sb32814386_1_001.57685.66193635417",
"workflowName": "std_calibration",
"spl": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b",
"processingStart": "2021_07_06T21_50_48",
"destinationDir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp",
"workflowDir": "tmp576dknjp",
"staging_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/staging",
"storage_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/storage",
}
args = argparse.Namespace
filename = "test/input_files/test-metadata.json"
class TestIngest:
def test_get_settings(self):
settings = ingest._get_settings(filename)
assert settings["sdmId"] == expected_settings["sdmId"]
assert settings["workflowName"] == expected_settings["workflowName"]
assert settings["spl"] == expected_settings["spl"]
assert settings["processingStart"] == expected_settings["processingStart"]
assert settings["destinationDir"] == expected_settings["destinationDir"]
assert settings["workflowDir"] == expected_settings["workflowDir"]
assert settings["staging_area"] == expected_settings["staging_area"]
assert settings["storage_area"] == expected_settings["storage_area"]
def test_check_ingest_type(self):
arg_type = "calibration"
arg_type2 = "image"
check = ingest.check_ingest_type(arg_type, expected_settings)
assert check is True
check2 = ingest.check_ingest_type(arg_type2, expected_settings)
assert check2 is False
def test_parser(self):
parser = ingest.arg_parser()
assert isinstance(parser, argparse.ArgumentParser)
def test_main_calibration(self):
args.calibration = [filename]
with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args:
with patch(
"ingest_envoy.launchers.IngestCalibrationLauncher.launch_ingestion"
) as cal_ingest:
ingest.main()
assert cal_ingest.call_count == 1
# reset for other testing
args.calibration = None
"""
Tests for ingest_envoy.launchers
"""
from unittest.mock import patch
import pytest
from ingest_envoy.launchers import IngestCalibrationLauncher
parameters = {
"sdmId": "16B-069_sb32814386_1_001.57685.66193635417",
"workflowName": "std_calibration",
"spl": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b",
"processingStart": "2021_07_06T21_50_48",
"destinationDir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp",
"workflowDir": "tmp576dknjp",
"staging_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/staging",
"storage_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/storage",
}
class TestIngestCalibrationLauncher:
@pytest.mark.skip("Skip until manifest builder is complete")
@patch("ingest_envoy.ingestion_manifest.IngestionManifest.create")
@patch("subprocess.run")
def test_launch_ingestion(self, mock_run, mock_manifest):
IngestCalibrationLauncher(parameters).launch_ingestion()
assert mock_run.call_count == 2
assert mock_manifest.call_count == 1
@patch("subprocess.run")
def test_run_ingest(self, mock_run):
IngestCalibrationLauncher(parameters).run_ingest()
assert mock_run.call_count == 1
@pytest.mark.skip("Skip until manifest builder is complete")
@patch("ingest_envoy.ingestion_manifest.IngestionManifest.create")
@patch("subprocess.run")
def test_prepare_for_ingest(self, mock_run, mock_manifest):
IngestCalibrationLauncher(parameters).prepare_for_ingest()
assert mock_run.call_count == 1
assert mock_manifest.call_count == 1
@patch("subprocess.run")
def test_run_collection_script(self, mock_run):
IngestCalibrationLauncher(parameters).run_collection_script()
assert mock_run.call_count == 1
@pytest.mark.skip("Skip until manifest builder is complete")
@patch("ingest_envoy.ingestion_manifest.IngestionManifest.create")
def test_create_manifest(self, mock_manifest):
IngestCalibrationLauncher(parameters).create_manifest()
assert mock_manifest.call_count == 1
"""
Tests for solicitor.py
"""
from ingest_envoy.solicitor import Solicitor
solicitor = Solicitor("test/input_files/test-metadata.json")
expected_metadata = {
"fileSetIds": "16B-069_sb32814386_1_001.57685.66193635417",
"workflowName": "std_calibration",
"systemId": "2",
"creationTime": "2021-07-06T21:50:48",
"productLocator": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b",
"projectMetadata": {
"projectCode": "16B-069",
"title": "JVLA monitoring of an in-progress extreme scattering event",
"startTime": "57685.661952546296",
"observer": "Keith Bannister",
},
"destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp",
}
class TestSolicitor:
def test_solicit_contents(self):
metadata = solicitor.solicit_contents()
assert metadata == expected_metadata
def test_solicit_workflow_directory_name(self):
expected_workflow_dir_name = "tmp576dknjp"
name = solicitor.solicit_workflow_directory_name()
assert name == expected_workflow_dir_name
def test_solicit_processing_creation_time(self):
expected_corrected_time = "2021_07_06T21_50_48"
time_string = solicitor.solicit_processing_creation_time()
assert time_string == expected_corrected_time
def test_solicit_parameters(self):
expected_parameters = {
"sdmId": "16B-069_sb32814386_1_001.57685.66193635417",
"workflowName": "std_calibration",
"spl": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b",
"processingStart": "2021_07_06T21_50_48",
"destinationDir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp",
"workflowDir": "tmp576dknjp",
}
parameters = solicitor.solicit_parameters()
assert parameters == expected_parameters
@@ -35,12 +35,6 @@ edu.nrao.archive.workflow.config.DeliverySettings.nraoDownloadDirectory = /tmp/d
edu.nrao.archive.workflow.config.DeliverySettings.nraoDownloadUrl = http://localhost:4444/dl
#
# Ingestion Settings
#
edu.nrao.archive.workspaces.IngestionSettings.stagingDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/staging
edu.nrao.archive.workspaces.IngestionSettings.storageDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/storage
#
# Standard Calibration Settings
#
"""add workflow creation time to metadata.json
Revision ID: a7c2b4682aae
Revises: 758525350770
Create Date: 2021-07-07 15:42:31.380435
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "a7c2b4682aae"
down_revision = "758525350770"
branch_labels = None
depends_on = None
def upgrade():
cal_content = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_calibration",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
}
"""
cms_content = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_cms_imaging",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}",
"calibrationSourceDirectory":"{{cms_path}}",
"cmsName":"{{sdmId}}.ms"
}
"""
condor_content = """executable = ingest_cal.sh
arguments = metadata.json
output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/ingest, $(SBIN_PATH)/calibration-table-collector.sh, ./metadata.json
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
op.execute(
f"""
UPDATE workflow_templates
SET content = E'{cal_content}' WHERE filename='metadata.json' AND workflow_name='std_calibration'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content = E'{cms_content}' WHERE filename='metadata.json' AND workflow_name='std_cms_imaging'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content = E'{condor_content}' WHERE filename='ingest_cal.condor'
"""
)
def downgrade():
old_cal_content = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_calibration",
"systemId": "{{request_id}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
}
"""
old_cms_content = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_cms_imaging",
"systemId": "{{request_id}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}",
"calibrationSourceDirectory":"{{cms_path}}",
"cmsName":"{{sdmId}}.ms"
}
"""
old_condor_content = """executable = ingest_cal.sh
arguments = metadata.json
output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/calibration-table-collector.sh, ./metadata.json
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
op.execute(
f"""
UPDATE workflow_templates
SET content = E'{old_cal_content}' WHERE filename='metadata.json' AND workflow_name='std_calibration'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content = E'{old_cms_content}' WHERE filename='metadata.json' AND workflow_name='std_cms_imaging'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content = E'{old_condor_content}' WHERE filename='ingest_cal.condor'
"""
)