From 4af90a056b7555ba4701c5d6bb43eb0721615090 Mon Sep 17 00:00:00 2001 From: chausman <chausman@nrao.edu> Date: Thu, 8 Jul 2021 13:10:06 -0600 Subject: [PATCH] WS-508: ingest_envoy structure and integration --- .../pexable/conveyor/test/test_conveyor.py | 11 +- .../ingest_envoy/ingest_envoy/ingest.py | 72 ++++++++ .../ingest_envoy/ingest_envoy/interfaces.py | 27 +++ .../ingest_envoy/ingest_envoy/launchers.py | 50 ++++++ .../pexable/ingest_envoy/ingest_envoy/main.py | 2 - .../ingest_envoy/ingest_envoy/solicitor.py | 37 ++++ .../executables/pexable/ingest_envoy/setup.py | 2 +- .../test/input_files/test-metadata.json | 14 ++ .../pexable/ingest_envoy/test/test_ingest.py | 61 +++++++ .../ingest_envoy/test/test_launchers.py | 53 ++++++ .../ingest_envoy/test/test_solicitor.py | 51 ++++++ docker.properties | 6 + ...add_workflow_creation_time_to_metadata_.py | 162 ++++++++++++++++++ 13 files changed, 540 insertions(+), 8 deletions(-) create mode 100644 apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py create mode 100644 apps/cli/executables/pexable/ingest_envoy/ingest_envoy/interfaces.py create mode 100644 apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py delete mode 100644 apps/cli/executables/pexable/ingest_envoy/ingest_envoy/main.py create mode 100644 apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py create mode 100644 apps/cli/executables/pexable/ingest_envoy/test/input_files/test-metadata.json create mode 100644 apps/cli/executables/pexable/ingest_envoy/test/test_ingest.py create mode 100644 apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py create mode 100644 apps/cli/executables/pexable/ingest_envoy/test/test_solicitor.py create mode 100644 schema/versions/a7c2b4682aae_add_workflow_creation_time_to_metadata_.py diff --git a/apps/cli/executables/pexable/conveyor/test/test_conveyor.py b/apps/cli/executables/pexable/conveyor/test/test_conveyor.py index ed4c83033..1794a37a3 100644 --- a/apps/cli/executables/pexable/conveyor/test/test_conveyor.py +++ b/apps/cli/executables/pexable/conveyor/test/test_conveyor.py @@ -5,8 +5,6 @@ import argparse from unittest.mock import patch, MagicMock import conveyor.conveyor as con -from conveyor.deliver import DeliveryConveyor -from conveyor.retrieve import RetrievalConveyor expected_settings = { "qa_delivery_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/qa2", @@ -26,14 +24,17 @@ class TestConveyor: settings = con._get_settings("test/test.json") assert settings["qa_delivery_area"] == expected_settings["qa_delivery_area"] assert settings["weblog_cache_area"] == expected_settings["weblog_cache_area"] - assert settings["workspaces_lustre_root_dir"] == expected_settings["workspaces_lustre_root_dir"] + assert ( + settings["workspaces_lustre_root_dir"] + == expected_settings["workspaces_lustre_root_dir"] + ) # mock calls to cwd and count assert cwd.call_count == 2 assert settings["destination_dir"] == expected_settings["destination_dir"] assert settings["destination_subdir"] == expected_settings["destination_subdir"] def test_main_deliver(self): - args.deliver = ['test/test.json'] + args.deliver = ["test/test.json"] with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args: assert args.deliver[0] == "test/test.json" @@ -45,7 +46,7 @@ class TestConveyor: args.deliver = None def test_main_retrieve(self): - args.retrieve = ['test/test.json'] + args.retrieve = ["test/test.json"] with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args: with patch("conveyor.retrieve.RetrievalConveyor.convey") as ret_convey: diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py new file mode 100644 index 000000000..970a38468 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py @@ -0,0 +1,72 @@ +import argparse +import logging +import sys + +from pycapo import CapoConfig + +from ingest_envoy.launchers import IngestCalibrationLauncher +from ingest_envoy.solicitor import Solicitor + +""" +Setup and Launch ingestion via Workspaces +""" + +logger = logging.getLogger("ingest_envoy") +logger.setLevel(logging.INFO) +logger.addHandler(logging.StreamHandler(sys.stdout)) + + +def _get_settings(filename: str) -> dict: + ingestion_settings = CapoConfig().settings("edu.nrao.archive.workspaces.IngestionSettings") + staging_root_dir = ingestion_settings.stagingDirectory + storage_root_dir = ingestion_settings.storageDirectory + + parameters = Solicitor(filename).solicit_parameters() + + parameters["staging_area"] = staging_root_dir + parameters["storage_area"] = storage_root_dir + + return parameters + + +def arg_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Workspaces Ingestion handler", formatter_class=argparse.RawTextHelpFormatter + ) + parser.add_argument( + "--calibration", + nargs=1, + action="store", + required=False, + help="run ingestion for a calibration product", + ) + parser.add_argument( + "--image", + nargs=1, + action="store", + required=False, + help="run ingestion for an image product (anticipated functionality)", + ) + return parser + + +def check_ingest_type(args_type: str, parameters: dict) -> bool: + if args_type in parameters["workflowName"]: + return True + + return False + + +def main(): + args = arg_parser().parse_args() + + if args.calibration is not None: + arg_type = "calibration" + parameters = _get_settings(args.calibration[0]) + + if check_ingest_type(args_type=arg_type, parameters=parameters): + IngestCalibrationLauncher(parameters).launch_ingestion() + else: + logger.error( + f"ERROR: The workflow request to be ingested does not match specified ingestion type {type}." + ) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/interfaces.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/interfaces.py new file mode 100644 index 000000000..ca02ee793 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/interfaces.py @@ -0,0 +1,27 @@ +""" +Interfaces for ingest_envoy +""" +from abc import ABC + + +class IngestLauncherIF(ABC): + """ + Generic Ingestion Launcher methods. + Should be implemented for any type of ingestion launcher + """ + + # launch all ingest steps, should be called in ingest.main + def launch_ingestion(self): + raise NotImplementedError + + # run ingest pex + def run_ingest(self): + raise NotImplementedError + + # setup workflow results for ingestion, ensure placement in staging area + def prepare_for_ingest(self): + raise NotImplementedError + + # create ingestion manifest + def create_manifest(self): + raise NotImplementedError diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py new file mode 100644 index 000000000..16c845344 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py @@ -0,0 +1,50 @@ +import logging +import subprocess +import sys + +from ingest_envoy.ingestion_manifest import IngestionManifest +from ingest_envoy.interfaces import IngestLauncherIF + + +class IngestCalibrationLauncher(IngestLauncherIF): + def __init__(self, parameters: dict): + self.logger = logging.getLogger("ingest_envoy") + self.ingestion_type = "calibration" + self.parameters = parameters + self.staging_source_dir = ( + self.parameters["staging_area"] + "/" + self.parameters["workflowDir"] + ) + + def launch_ingestion(self): + self.prepare_for_ingest() + self.run_ingest() + + def run_ingest(self): + subprocess.run( + ["./ingest", "--json", self.staging_source_dir], + stdout=sys.stdout, + stderr=sys.stderr, + ) + + def prepare_for_ingest(self): + # 1. run collection script to create calibration tarfile + self.run_collection_script() + + # 2. create ingestion manifest + self.create_manifest() + + def run_collection_script(self): + workflow_dir = self.parameters["workflowDir"] + sdm_id = self.parameters["sdmId"] + cal_processing_datetime = self.parameters["processingStart"] + # run script + subprocess.run( + ["./calibration-table-collector.sh", workflow_dir, sdm_id, cal_processing_datetime], + stdout=sys.stdout, + stderr=sys.stderr, + ) + + def create_manifest(self): + spl = self.parameters["spl"] + + IngestionManifest(self.staging_source_dir, self.ingestion_type, spl).create() diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/main.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/main.py deleted file mode 100644 index ccd65ac67..000000000 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/main.py +++ /dev/null @@ -1,2 +0,0 @@ -def main(): - print("Hello, world!") \ No newline at end of file diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py new file mode 100644 index 000000000..801cea22f --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py @@ -0,0 +1,37 @@ +""" +Solicitor: uses metadata.json to determine input parameters for ingestion +""" +import json +import pathlib + + +class Solicitor: + def __init__(self, filename: str): + self.filename = filename + self.metadata = self.solicit_contents() + + def solicit_contents(self) -> dict: + with open(self.filename) as file: + metadata = json.loads(file.read()) + return metadata + + def solicit_workflow_directory_name(self): + destination_path = pathlib.Path(self.metadata["destinationDirectory"]) + return destination_path.stem + + def solicit_processing_creation_time(self): + # incoming datetime: 2021-07-06T21:50:48 + # transformation for NGAS friendliness: 2021_07_06T21_50_48 + creation_time = self.metadata["creationTime"].replace("-", "_").replace(":", "_") + return creation_time + + def solicit_parameters(self): + metadata = self.solicit_contents() + return { + "sdmId": metadata["fileSetIds"], + "workflowName": metadata["workflowName"], + "spl": metadata["productLocator"], + "processingStart": self.solicit_processing_creation_time(), + "destinationDir": metadata["destinationDirectory"], + "workflowDir": self.solicit_workflow_directory_name(), + } diff --git a/apps/cli/executables/pexable/ingest_envoy/setup.py b/apps/cli/executables/pexable/ingest_envoy/setup.py index 6e6262006..4c18cc63b 100644 --- a/apps/cli/executables/pexable/ingest_envoy/setup.py +++ b/apps/cli/executables/pexable/ingest_envoy/setup.py @@ -26,5 +26,5 @@ setup( keywords=[], packages=find_packages(), classifiers=["Programming Language :: Python :: 3.8"], - entry_points={"console_scripts": ["ingest_envoy = ingest_envoy.main:main"]}, + entry_points={"console_scripts": ["ingest_envoy = ingest_envoy.ingest:main"]}, ) diff --git a/apps/cli/executables/pexable/ingest_envoy/test/input_files/test-metadata.json b/apps/cli/executables/pexable/ingest_envoy/test/input_files/test-metadata.json new file mode 100644 index 000000000..82782d96a --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/input_files/test-metadata.json @@ -0,0 +1,14 @@ +{ + "fileSetIds": "16B-069_sb32814386_1_001.57685.66193635417", + "workflowName": "std_calibration", + "systemId": "2", + "creationTime": "2021-07-06T21:50:48", + "productLocator": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b", + "projectMetadata": { + "projectCode": "16B-069", + "title": "JVLA monitoring of an in-progress extreme scattering event", + "startTime": "57685.661952546296", + "observer": "Keith Bannister" + }, + "destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp" +} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_ingest.py b/apps/cli/executables/pexable/ingest_envoy/test/test_ingest.py new file mode 100644 index 000000000..9243bf5e3 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_ingest.py @@ -0,0 +1,61 @@ +""" +Tests for ingest_envoy.ingest +""" +import argparse +from unittest.mock import patch, MagicMock + +import ingest_envoy.ingest as ingest + +expected_settings = { + "sdmId": "16B-069_sb32814386_1_001.57685.66193635417", + "workflowName": "std_calibration", + "spl": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b", + "processingStart": "2021_07_06T21_50_48", + "destinationDir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp", + "workflowDir": "tmp576dknjp", + "staging_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/staging", + "storage_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/storage", +} + +args = argparse.Namespace +filename = "test/input_files/test-metadata.json" + + +class TestIngest: + def test_get_settings(self): + settings = ingest._get_settings(filename) + assert settings["sdmId"] == expected_settings["sdmId"] + assert settings["workflowName"] == expected_settings["workflowName"] + assert settings["spl"] == expected_settings["spl"] + assert settings["processingStart"] == expected_settings["processingStart"] + assert settings["destinationDir"] == expected_settings["destinationDir"] + assert settings["workflowDir"] == expected_settings["workflowDir"] + assert settings["staging_area"] == expected_settings["staging_area"] + assert settings["storage_area"] == expected_settings["storage_area"] + + def test_check_ingest_type(self): + arg_type = "calibration" + arg_type2 = "image" + + check = ingest.check_ingest_type(arg_type, expected_settings) + assert check is True + + check2 = ingest.check_ingest_type(arg_type2, expected_settings) + assert check2 is False + + def test_parser(self): + parser = ingest.arg_parser() + assert isinstance(parser, argparse.ArgumentParser) + + def test_main_calibration(self): + args.calibration = [filename] + + with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)) as mock_args: + with patch( + "ingest_envoy.launchers.IngestCalibrationLauncher.launch_ingestion" + ) as cal_ingest: + ingest.main() + assert cal_ingest.call_count == 1 + + # reset for other testing + args.calibration = None diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py b/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py new file mode 100644 index 000000000..a734a8fec --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py @@ -0,0 +1,53 @@ +""" +Tests for ingest_envoy.launchers +""" +from unittest.mock import patch + +import pytest + +from ingest_envoy.launchers import IngestCalibrationLauncher + +parameters = { + "sdmId": "16B-069_sb32814386_1_001.57685.66193635417", + "workflowName": "std_calibration", + "spl": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b", + "processingStart": "2021_07_06T21_50_48", + "destinationDir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp", + "workflowDir": "tmp576dknjp", + "staging_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/staging", + "storage_area": "/lustre/aoc/cluster/pipeline/docker/workspaces/storage", +} + + +class TestIngestCalibrationLauncher: + @pytest.mark.skip("Skip until manifest builder is complete") + @patch("ingest_envoy.ingestion_manifest.IngestionManifest.create") + @patch("subprocess.run") + def test_launch_ingestion(self, mock_run, mock_manifest): + IngestCalibrationLauncher(parameters).launch_ingestion() + assert mock_run.call_count == 2 + assert mock_manifest.call_count == 1 + + @patch("subprocess.run") + def test_run_ingest(self, mock_run): + IngestCalibrationLauncher(parameters).run_ingest() + assert mock_run.call_count == 1 + + @pytest.mark.skip("Skip until manifest builder is complete") + @patch("ingest_envoy.ingestion_manifest.IngestionManifest.create") + @patch("subprocess.run") + def test_prepare_for_ingest(self, mock_run, mock_manifest): + IngestCalibrationLauncher(parameters).prepare_for_ingest() + assert mock_run.call_count == 1 + assert mock_manifest.call_count == 1 + + @patch("subprocess.run") + def test_run_collection_script(self, mock_run): + IngestCalibrationLauncher(parameters).run_collection_script() + assert mock_run.call_count == 1 + + @pytest.mark.skip("Skip until manifest builder is complete") + @patch("ingest_envoy.ingestion_manifest.IngestionManifest.create") + def test_create_manifest(self, mock_manifest): + IngestCalibrationLauncher(parameters).create_manifest() + assert mock_manifest.call_count == 1 diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_solicitor.py b/apps/cli/executables/pexable/ingest_envoy/test/test_solicitor.py new file mode 100644 index 000000000..d7dd0fd19 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_solicitor.py @@ -0,0 +1,51 @@ +""" +Tests for solicitor.py +""" +from ingest_envoy.solicitor import Solicitor + +solicitor = Solicitor("test/input_files/test-metadata.json") +expected_metadata = { + "fileSetIds": "16B-069_sb32814386_1_001.57685.66193635417", + "workflowName": "std_calibration", + "systemId": "2", + "creationTime": "2021-07-06T21:50:48", + "productLocator": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b", + "projectMetadata": { + "projectCode": "16B-069", + "title": "JVLA monitoring of an in-progress extreme scattering event", + "startTime": "57685.661952546296", + "observer": "Keith Bannister", + }, + "destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp", +} + + +class TestSolicitor: + def test_solicit_contents(self): + metadata = solicitor.solicit_contents() + assert metadata == expected_metadata + + def test_solicit_workflow_directory_name(self): + expected_workflow_dir_name = "tmp576dknjp" + + name = solicitor.solicit_workflow_directory_name() + assert name == expected_workflow_dir_name + + def test_solicit_processing_creation_time(self): + expected_corrected_time = "2021_07_06T21_50_48" + + time_string = solicitor.solicit_processing_creation_time() + assert time_string == expected_corrected_time + + def test_solicit_parameters(self): + expected_parameters = { + "sdmId": "16B-069_sb32814386_1_001.57685.66193635417", + "workflowName": "std_calibration", + "spl": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b", + "processingStart": "2021_07_06T21_50_48", + "destinationDir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp", + "workflowDir": "tmp576dknjp", + } + + parameters = solicitor.solicit_parameters() + assert parameters == expected_parameters diff --git a/docker.properties b/docker.properties index d6238b371..87ef477da 100644 --- a/docker.properties +++ b/docker.properties @@ -35,6 +35,12 @@ edu.nrao.archive.workflow.config.DeliverySettings.nraoDownloadDirectory = /tmp/d edu.nrao.archive.workflow.config.DeliverySettings.nraoDownloadUrl = http://localhost:4444/dl +# +# Ingestion Settings +# +edu.nrao.archive.workspaces.IngestionSettings.stagingDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/staging +edu.nrao.archive.workspaces.IngestionSettings.storageDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/storage + # # Standard Calibration Settings # diff --git a/schema/versions/a7c2b4682aae_add_workflow_creation_time_to_metadata_.py b/schema/versions/a7c2b4682aae_add_workflow_creation_time_to_metadata_.py new file mode 100644 index 000000000..4bd0d383e --- /dev/null +++ b/schema/versions/a7c2b4682aae_add_workflow_creation_time_to_metadata_.py @@ -0,0 +1,162 @@ +"""add workflow creation time to metadata.json + +Revision ID: a7c2b4682aae +Revises: 758525350770 +Create Date: 2021-07-07 15:42:31.380435 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "a7c2b4682aae" +down_revision = "758525350770" +branch_labels = None +depends_on = None + + +def upgrade(): + cal_content = """{ + "fileSetIds": "{{sdmId}}", + "workflowName": "std_calibration", + "systemId": "{{request_id}}", + "creationTime": "{{created_at}}", + "productLocator": "{{product_locator}}", + "projectMetadata": { + "projectCode": "{{projectCode}}", + "title": "{{title}}", + "startTime": "{{startTime}}", + "observer": "{{observer}}" + }, + "destinationDirectory": "{{root_directory}}/{{relative_path}}" +} + """ + + cms_content = """{ + "fileSetIds": "{{sdmId}}", + "workflowName": "std_cms_imaging", + "systemId": "{{request_id}}", + "creationTime": "{{created_at}}", + "productLocator": "{{product_locator}}", + "projectMetadata": { + "projectCode": "{{projectCode}}", + "title": "{{title}}", + "startTime": "{{startTime}}", + "observer": "{{observer}}" + }, + "destinationDirectory": "{{root_directory}}/{{relative_path}}", + "calibrationSourceDirectory":"{{cms_path}}", + "cmsName":"{{sdmId}}.ms" +} + + """ + + condor_content = """executable = ingest_cal.sh +arguments = metadata.json + +output = ingest.out +error = ingest.err +log = condor.log + +SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin +should_transfer_files = yes +transfer_input_files = $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/ingest, $(SBIN_PATH)/calibration-table-collector.sh, ./metadata.json + +getenv = True +environment = "CAPO_PATH=/home/casa/capo" + +queue + +""" + + op.execute( + f""" + UPDATE workflow_templates + SET content = E'{cal_content}' WHERE filename='metadata.json' AND workflow_name='std_calibration' + """ + ) + op.execute( + f""" + UPDATE workflow_templates + SET content = E'{cms_content}' WHERE filename='metadata.json' AND workflow_name='std_cms_imaging' + """ + ) + op.execute( + f""" + UPDATE workflow_templates + SET content = E'{condor_content}' WHERE filename='ingest_cal.condor' + """ + ) + + +def downgrade(): + old_cal_content = """{ + "fileSetIds": "{{sdmId}}", + "workflowName": "std_calibration", + "systemId": "{{request_id}}", + "productLocator": "{{product_locator}}", + "projectMetadata": { + "projectCode": "{{projectCode}}", + "title": "{{title}}", + "startTime": "{{startTime}}", + "observer": "{{observer}}" + }, + "destinationDirectory": "{{root_directory}}/{{relative_path}}" +} + """ + + old_cms_content = """{ + "fileSetIds": "{{sdmId}}", + "workflowName": "std_cms_imaging", + "systemId": "{{request_id}}", + "productLocator": "{{product_locator}}", + "projectMetadata": { + "projectCode": "{{projectCode}}", + "title": "{{title}}", + "startTime": "{{startTime}}", + "observer": "{{observer}}" + }, + "destinationDirectory": "{{root_directory}}/{{relative_path}}", + "calibrationSourceDirectory":"{{cms_path}}", + "cmsName":"{{sdmId}}.ms" +} + + """ + + old_condor_content = """executable = ingest_cal.sh +arguments = metadata.json + +output = ingest.out +error = ingest.err +log = condor.log + +SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin +should_transfer_files = yes +transfer_input_files = $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/calibration-table-collector.sh, ./metadata.json + +getenv = True +environment = "CAPO_PATH=/home/casa/capo" + +queue + + """ + + op.execute( + f""" + UPDATE workflow_templates + SET content = E'{old_cal_content}' WHERE filename='metadata.json' AND workflow_name='std_calibration' + """ + ) + op.execute( + f""" + UPDATE workflow_templates + SET content = E'{old_cms_content}' WHERE filename='metadata.json' AND workflow_name='std_cms_imaging' + """ + ) + op.execute( + f""" + UPDATE workflow_templates + SET content = E'{old_condor_content}' WHERE filename='ingest_cal.condor' + """ + ) -- GitLab