From 3b7efd235b44d9209becb9fba72e53a9c333bc42 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Wed, 25 Sep 2024 14:32:32 -0400 Subject: [PATCH 01/17] Alembic migration for replacing ingest_cal and ingest_image workflows with ingest_products --- .../7ba77453966a_ingest_products_workflow.py | 155 ++++++++++++++++++ .../ingest_cal/ingest_cal_condor_2.8.5.txt | 17 ++ .../ingest_cal/ingest_cal_sh_2.8.5.txt | 7 + .../ingest_image_aux_image_metadata_2.8.5.txt | 13 ++ .../ingest_image_condor_2.8.5.txt | 17 ++ .../ingest_image/ingest_image_sh_2.8.5.txt | 7 + ...gest_products_aux_image_metadata_2.8.6.txt | 13 ++ .../ingest_products_condor_2.8.6.txt | 17 ++ .../ingest_products_sh_2.8.6.txt | 10 ++ 9 files changed, 256 insertions(+) create mode 100644 shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py create mode 100644 shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_condor_2.8.5.txt create mode 100644 shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_sh_2.8.5.txt create mode 100644 shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_aux_image_metadata_2.8.5.txt create mode 100644 shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_condor_2.8.5.txt create mode 100644 shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_sh_2.8.5.txt create mode 100644 shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_aux_image_metadata_2.8.6.txt create mode 100644 shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_condor_2.8.6.txt create mode 100644 shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_sh_2.8.6.txt diff --git a/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py b/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py new file mode 100644 index 000000000..ef29bae92 --- /dev/null +++ b/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py @@ -0,0 +1,155 @@ +# Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA. +# +# This file is part of NRAO Workspaces. +# +# Workspaces is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Workspaces is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Workspaces. If not, see <https://www.gnu.org/licenses/>. +# +"""ingest products workflow + +Revision ID: 7ba77453966a +Revises: 44041d0425b7 +Create Date: 2024-09-23 14:28:25.260873 + +""" +from pathlib import Path + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "7ba77453966a" +down_revision = "44041d0425b7" +branch_labels = None +depends_on = None + + +new_wf_name = "ingest_products" +old_wf_names = ["ingest_cal", "ingest_image"] + + +def set_content(wf_name: str, filename: str) -> str: + return (Path.cwd() / "versions" / "templates" / wf_name / filename).read_text() + + +def upgrade(): + """ + Deletes the old 'ingest_cal' and 'ingest_image' workflows in favor of the combined 'ingest_products' workflow. + """ + conn = op.get_bind() + + # Insert workflow + op.execute( + f""" + INSERT INTO workflows (workflow_name, requires_lustre) VALUES (E'{new_wf_name}', true) + """ + ) + + # Insert new templates + conn.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES (%s, %s, %s) + """, + new_wf_name + ".condor", + set_content(f"{new_wf_name}_condor_2.8.6.txt"), + new_wf_name, + ) + + conn.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES (%s, %s, %s) + """, + new_wf_name + ".sh", + set_content(f"{new_wf_name}_sh_2.8.6.txt"), + new_wf_name, + ) + + conn.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES (%s, %s, %s) + """, + "aux_image_metadata.json", + set_content(f"{new_wf_name}_aux_image_metadata_2.8.6.txt"), + new_wf_name, + ) + + # Delete old workflows and templates + for old_wf_name in old_wf_names: + op.execute( + f""" + DELETE FROM workflow_templates WHERE workflow_name = E'{old_wf_name}' + """ + ) + + op.execute( + f""" + DELETE FROM workflows WHERE workflow_name = E'{old_wf_name}' + """ + ) + + +def downgrade(): + conn = op.get_bind() + + op.execute( + f""" + DELETE FROM workflow_templates WHERE workflow_name = E'{new_wf_name}' + """ + ) + + op.execute( + f""" + DELETE FROM workflows WHERE workflow_name = E'{new_wf_name}' + """ + ) + + for old_wf_name in old_wf_names: + op.execute( + f""" + INSERT INTO workflows (workflow_name, requires_lustre) VALUES (E'{old_wf_name}', true) + """ + ) + + conn.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES (%s, %s, %s) + """, + old_wf_name + ".condor", + set_content(f"{old_wf_name}_condor_2.8.5.txt"), + old_wf_name, + ) + + conn.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES (%s, %s, %s) + """, + old_wf_name + ".sh", + set_content(f"{old_wf_name}_sh_2.8.5.txt"), + old_wf_name, + ) + + if "image" in old_wf_name: + conn.execute( + f""" + INSERT INTO workflow_templates (filename, content, workflow_name) + VALUES (%s, %s, %s) + """, + "aux_image_metadata.json", + set_content(f"{old_wf_name}_aux_image_metadata_2.8.5.txt"), + old_wf_name, + ) diff --git a/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_condor_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_condor_2.8.5.txt new file mode 100644 index 000000000..da8cbc741 --- /dev/null +++ b/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_condor_2.8.5.txt @@ -0,0 +1,17 @@ +executable = ingest_cal.sh +arguments = metadata.json + +output = ingest.out +error = ingest.err +log = condor.log + +should_transfer_files = NO ++WantIOProxy = True + +request_memory = {{ramInGb}} +getenv = True +environment = "CAPO_PATH=/home/casa/capo CFLAGS=-I/usr/include/mysql LDFLAGS=-L/usr/lib64/mysql ORACLE_HOME=/home/ssa/share/oracle/instantclient_21_1 LD_LIBRARY_PATH=/home/ssa/share/oracle/instantclient_21_1 PATH=/bin:/usr/bin:$(PATH)/home/ssa/share/oracle/instantclient_21_1" + +requirements = HasLustre == True + +queue diff --git a/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_sh_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_sh_2.8.5.txt new file mode 100644 index 000000000..a6dea7029 --- /dev/null +++ b/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_sh_2.8.5.txt @@ -0,0 +1,7 @@ +#!/bin/sh +set -o errexit + +SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin + +${SBIN_PATH}/conveyor --retrieve $1 +${SBIN_PATH}/ingest_envoy --calibration $1 diff --git a/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_aux_image_metadata_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_aux_image_metadata_2.8.5.txt new file mode 100644 index 000000000..4df18aaa4 --- /dev/null +++ b/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_aux_image_metadata_2.8.5.txt @@ -0,0 +1,13 @@ +{ + "project_code": "{{projectCode}}", + "band_code": "{{bands}}", + "configurations": "{{configurations}}", + "starttime": null, + "endtime": null, + "exposure_time": null, + "rms_noise": null, + "image_tags": "", + "product_tags": "", + "collection_name": "", + "calibration_level": 2 +} diff --git a/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_condor_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_condor_2.8.5.txt new file mode 100644 index 000000000..6583cc7d1 --- /dev/null +++ b/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_condor_2.8.5.txt @@ -0,0 +1,17 @@ +executable = ingest_image.sh +arguments = metadata.json + +output = ingest.out +error = ingest.err +log = condor.log + +should_transfer_files = NO ++WantIOProxy = True + +request_memory = {{ramInGb}} +getenv = True +environment = "CAPO_PATH=/home/casa/capo" + +requirements = HasLustre == True + +queue diff --git a/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_sh_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_sh_2.8.5.txt new file mode 100644 index 000000000..831142224 --- /dev/null +++ b/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_sh_2.8.5.txt @@ -0,0 +1,7 @@ +#!/bin/sh +set -o errexit + +SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin + +${SBIN_PATH}/conveyor --retrieve-img $1 +${SBIN_PATH}/ingest_envoy --image $1 diff --git a/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_aux_image_metadata_2.8.6.txt b/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_aux_image_metadata_2.8.6.txt new file mode 100644 index 000000000..4df18aaa4 --- /dev/null +++ b/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_aux_image_metadata_2.8.6.txt @@ -0,0 +1,13 @@ +{ + "project_code": "{{projectCode}}", + "band_code": "{{bands}}", + "configurations": "{{configurations}}", + "starttime": null, + "endtime": null, + "exposure_time": null, + "rms_noise": null, + "image_tags": "", + "product_tags": "", + "collection_name": "", + "calibration_level": 2 +} diff --git a/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_condor_2.8.6.txt b/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_condor_2.8.6.txt new file mode 100644 index 000000000..495ab844c --- /dev/null +++ b/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_condor_2.8.6.txt @@ -0,0 +1,17 @@ +executable = ingest_products.sh +arguments = metadata.json + +output = ingest.out +error = ingest.err +log = condor.log + +should_transfer_files = NO ++WantIOProxy = True + +request_memory = {{ramInGb}} +getenv = True +environment = "CAPO_PATH=/home/casa/capo CFLAGS=-I/usr/include/mysql LDFLAGS=-L/usr/lib64/mysql ORACLE_HOME=/home/ssa/share/oracle/instantclient_21_1 LD_LIBRARY_PATH=/home/ssa/share/oracle/instantclient_21_1 PATH=/bin:/usr/bin:$(PATH)/home/ssa/share/oracle/instantclient_21_1" + +requirements = HasLustre == True + +queue diff --git a/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_sh_2.8.6.txt b/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_sh_2.8.6.txt new file mode 100644 index 000000000..fe5467bf5 --- /dev/null +++ b/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_sh_2.8.6.txt @@ -0,0 +1,10 @@ +#!/bin/sh +set -o errexit + +SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin + +${SBIN_PATH}/conveyor --retrieve $1 + +{{#products_to_ingest}} +${SBIN_PATH}/ingest_envoy --{{.}} $1 +{{^products_to_ingest}} -- GitLab From eea71a6e7a53849df7ca70bb5ad40b1be4b4a14a Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Fri, 27 Sep 2024 09:10:51 -0400 Subject: [PATCH 02/17] Workflow's ingest endpoint now uses the single ingest_products workflow --- services/workflow/workflow/server.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index bc9003dd5..61457ae44 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -391,7 +391,7 @@ class WorkflowRequestRestService: @view_config(request_method="POST", route_name="ingest_workflow_result") def ingest(self): """ - Ingest specified workflow request's results into NGAS and archive + Ingest specified workflow request's results into NGAS and archive using the 'ingest_products' workflow. :return: """ @@ -402,9 +402,8 @@ class WorkflowRequestRestService: file = lookup_file(request=self.request) # 2. create ingestion workflow request - ingest_type = "ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest_image" ingest_request = self.request.info.create_workflow_request( - workflow=ingest_type, + workflow="ingest_products", argument={"parent_wf_request_id": self.request.matchdict["request_id"]}, ) # 3. attach metadata.json to ingestion wf request -- GitLab From ad9e5f71ce6a19b8fbe4a8b06ec99e29fb33a662 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Fri, 27 Sep 2024 09:15:59 -0400 Subject: [PATCH 03/17] Ingest envoy changes to support cal and imaging products --- apps/cli/executables/pexable/ingest_envoy/README.md | 2 +- .../pexable/ingest_envoy/ingest_envoy/ingest.py | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/apps/cli/executables/pexable/ingest_envoy/README.md b/apps/cli/executables/pexable/ingest_envoy/README.md index 2f35f1db3..43b11a0ce 100644 --- a/apps/cli/executables/pexable/ingest_envoy/README.md +++ b/apps/cli/executables/pexable/ingest_envoy/README.md @@ -20,7 +20,7 @@ options: ``` Ingest Envoy makes use of the existing *ingest* functionality of the AAT-PPI which simply takes an -*ingestion manifest* as input. While this is consistent regaurdless of ingestion type, the manifest itself, +*ingestion manifest* as input. While this is consistent regardless of ingestion type, the manifest itself, as well as the ingestion staging requirements differ between the types of files to be ingested. For this reason, Ingest Envoy's functionality can be broken into two underlying parts: Setup and Launch. diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py index 9dbad4589..caeaa6a2b 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py @@ -26,13 +26,13 @@ from typing import Tuple, Union import requests from ingest_envoy.interfaces import LauncherIF from ingest_envoy.launchers import ( + CuratorLauncher, IngestCalibrationLauncher, IngestImageLauncher, IngestObservationLauncher, - CuratorLauncher, ) from ingest_envoy.solicitor import Solicitor -from ingest_envoy.utilities import IngestType, VLASSIngestType, CuratorType +from ingest_envoy.utilities import CuratorType, IngestType, VLASSIngestType from pycapo import CapoConfig """ @@ -152,12 +152,14 @@ def check_ingest_type(args_type: Union[IngestType, VLASSIngestType], parameters: :return: boolean representing if the requested ingestion type matches the product type """ + # Allow observations and curations if "workflowName" not in parameters and (args_type == IngestType.OBS or isinstance(args_type, CuratorType)): return True - wf_name = parameters["workflowName"] - if args_type.value in wf_name or (args_type == IngestType.IMG and "imaging" in wf_name): + # Check valid product types for product ingestion + if "products" in parameters["workflowName"] and args_type in [IngestType.CAL, IngestType.IMG]: return True + return False -- GitLab From 5fe412cf7dff2f90f4ebe98cc1d1dfa333263802 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Tue, 1 Oct 2024 09:45:33 -0400 Subject: [PATCH 04/17] Fixed alembic migration for removing old workflows --- .../7ba77453966a_ingest_products_workflow.py | 51 ++++++++++++------- 1 file changed, 33 insertions(+), 18 deletions(-) diff --git a/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py b/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py index ef29bae92..c2e642797 100644 --- a/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py +++ b/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py @@ -62,7 +62,7 @@ def upgrade(): VALUES (%s, %s, %s) """, new_wf_name + ".condor", - set_content(f"{new_wf_name}_condor_2.8.6.txt"), + set_content(new_wf_name, f"{new_wf_name}_condor_2.8.6.txt"), new_wf_name, ) @@ -72,7 +72,7 @@ def upgrade(): VALUES (%s, %s, %s) """, new_wf_name + ".sh", - set_content(f"{new_wf_name}_sh_2.8.6.txt"), + set_content(new_wf_name, f"{new_wf_name}_sh_2.8.6.txt"), new_wf_name, ) @@ -82,12 +82,19 @@ def upgrade(): VALUES (%s, %s, %s) """, "aux_image_metadata.json", - set_content(f"{new_wf_name}_aux_image_metadata_2.8.6.txt"), + set_content(new_wf_name, f"{new_wf_name}_aux_image_metadata_2.8.6.txt"), new_wf_name, ) # Delete old workflows and templates for old_wf_name in old_wf_names: + # Need to update workflow_requests to link to the new workflow name + op.execute( + f""" + UPDATE workflow_requests SET workflow_name = E'{new_wf_name}' WHERE workflow_name = E'{old_wf_name}' + """ + ) + op.execute( f""" DELETE FROM workflow_templates WHERE workflow_name = E'{old_wf_name}' @@ -104,18 +111,7 @@ def upgrade(): def downgrade(): conn = op.get_bind() - op.execute( - f""" - DELETE FROM workflow_templates WHERE workflow_name = E'{new_wf_name}' - """ - ) - - op.execute( - f""" - DELETE FROM workflows WHERE workflow_name = E'{new_wf_name}' - """ - ) - + # Re-insert the old workflows and templates for old_wf_name in old_wf_names: op.execute( f""" @@ -129,7 +125,7 @@ def downgrade(): VALUES (%s, %s, %s) """, old_wf_name + ".condor", - set_content(f"{old_wf_name}_condor_2.8.5.txt"), + set_content(old_wf_name, f"{old_wf_name}_condor_2.8.5.txt"), old_wf_name, ) @@ -139,7 +135,7 @@ def downgrade(): VALUES (%s, %s, %s) """, old_wf_name + ".sh", - set_content(f"{old_wf_name}_sh_2.8.5.txt"), + set_content(old_wf_name, f"{old_wf_name}_sh_2.8.5.txt"), old_wf_name, ) @@ -150,6 +146,25 @@ def downgrade(): VALUES (%s, %s, %s) """, "aux_image_metadata.json", - set_content(f"{old_wf_name}_aux_image_metadata_2.8.5.txt"), + set_content(old_wf_name, f"{old_wf_name}_aux_image_metadata_2.8.5.txt"), old_wf_name, ) + + # Need to update workflow_requests to link to an old workflow. We'll lose whether this was a cal/image ingest + op.execute( + f""" + UPDATE workflow_requests SET workflow_name = E'{old_wf_name}' WHERE workflow_name = E'{new_wf_name}' + """ + ) + + op.execute( + f""" + DELETE FROM workflow_templates WHERE workflow_name = E'{new_wf_name}' + """ + ) + + op.execute( + f""" + DELETE FROM workflows WHERE workflow_name = E'{new_wf_name}' + """ + ) \ No newline at end of file -- GitLab From 97aca232d59e78c2dcf50b9c47b1bbb35037ffb2 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Tue, 1 Oct 2024 09:47:01 -0400 Subject: [PATCH 05/17] Corrected passing product names to the ingest endpoint and then retrieving them from the request body --- services/workflow/workflow/server.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index 61457ae44..ee7c2b727 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -401,16 +401,22 @@ class WorkflowRequestRestService: self.request.matchdict["filename"] = "metadata.json" file = lookup_file(request=self.request) - # 2. create ingestion workflow request + # 2. Collect ingest arguments from the request and its body + ingest_args = {"parent_wf_request_id": self.request.matchdict["request_id"]} + body = self.request.json_body + if "products" in body and body["products"]: + ingest_args["products_to_ingest"] = body["products"] + + # 3. create ingestion workflow request ingest_request = self.request.info.create_workflow_request( workflow="ingest_products", - argument={"parent_wf_request_id": self.request.matchdict["request_id"]}, + argument=ingest_args, ) - # 3. attach metadata.json to ingestion wf request + # 4. attach metadata.json to ingestion wf request self.request.workflows.attach_file_to_request( request=ingest_request, filename=file.filename, content=file.content ) - # 4. submit ingestion workflow request + # 5. submit ingestion workflow request self.request.workflows.execute(ingest_request) @view_config(request_method="POST", route_name="ingest_vlass_products") -- GitLab From bf4e653db4abe4767dc55a5fe477b6daabe9cb75 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Tue, 1 Oct 2024 09:48:05 -0400 Subject: [PATCH 06/17] Corrected template mustache closing tag --- .../templates/ingest_products/ingest_products_sh_2.8.6.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_sh_2.8.6.txt b/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_sh_2.8.6.txt index fe5467bf5..43406e814 100644 --- a/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_sh_2.8.6.txt +++ b/shared/workspaces/alembic/versions/templates/ingest_products/ingest_products_sh_2.8.6.txt @@ -7,4 +7,4 @@ ${SBIN_PATH}/conveyor --retrieve $1 {{#products_to_ingest}} ${SBIN_PATH}/ingest_envoy --{{.}} $1 -{{^products_to_ingest}} +{{/products_to_ingest}} -- GitLab From eeedd3d987e47317dc30990abccaa95bd4cf5fc7 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Wed, 2 Oct 2024 10:15:20 -0400 Subject: [PATCH 07/17] Passing only the product name to the ingest workflow, as that's all it needs --- shared/workspaces/workspaces/capability/schema.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/shared/workspaces/workspaces/capability/schema.py b/shared/workspaces/workspaces/capability/schema.py index 0c55a4a19..89ed09aac 100644 --- a/shared/workspaces/workspaces/capability/schema.py +++ b/shared/workspaces/workspaces/capability/schema.py @@ -776,7 +776,7 @@ class QaPass(Action): products = execution.capability.products if is_partial_pass is True: products = [p for p in products if p.is_included_in_partial_pass] - products_json = [p.__json__() for p in products] + products_json = [p.product_name for p in products] response = requests.post( f"{workflow_service_url}/workflows/{execution.capability_name}/requests/{wf_req_id}/ingest", json={"products": products_json}, -- GitLab From 2ba9230975079067f4ab57089feb74d057aba001 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Wed, 2 Oct 2024 10:43:24 -0400 Subject: [PATCH 08/17] Didn't realize workflowName here was the original workflow that calls ingest --- .../executables/pexable/ingest_envoy/ingest_envoy/ingest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py index caeaa6a2b..f7d50f7f7 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py @@ -157,7 +157,8 @@ def check_ingest_type(args_type: Union[IngestType, VLASSIngestType], parameters: return True # Check valid product types for product ingestion - if "products" in parameters["workflowName"] and args_type in [IngestType.CAL, IngestType.IMG]: + wf_name = parameters["workflowName"] + if args_type.value in wf_name or (args_type == IngestType.IMG and "imaging" in wf_name): return True return False -- GitLab From 06f11087375dce5e1d9ad13c031afc6618bf8822 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Wed, 2 Oct 2024 14:47:16 -0400 Subject: [PATCH 09/17] No longer deleting the old workflows --- .../7ba77453966a_ingest_products_workflow.py | 74 +------------------ 1 file changed, 2 insertions(+), 72 deletions(-) diff --git a/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py b/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py index c2e642797..572716dd0 100644 --- a/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py +++ b/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py @@ -35,7 +35,6 @@ depends_on = None new_wf_name = "ingest_products" -old_wf_names = ["ingest_cal", "ingest_image"] def set_content(wf_name: str, filename: str) -> str: @@ -44,7 +43,7 @@ def set_content(wf_name: str, filename: str) -> str: def upgrade(): """ - Deletes the old 'ingest_cal' and 'ingest_image' workflows in favor of the combined 'ingest_products' workflow. + Inserts a new 'ingest_products' workflow to replace both the `ingest_cal` and `ingest_image` workflows. """ conn = op.get_bind() @@ -86,77 +85,8 @@ def upgrade(): new_wf_name, ) - # Delete old workflows and templates - for old_wf_name in old_wf_names: - # Need to update workflow_requests to link to the new workflow name - op.execute( - f""" - UPDATE workflow_requests SET workflow_name = E'{new_wf_name}' WHERE workflow_name = E'{old_wf_name}' - """ - ) - - op.execute( - f""" - DELETE FROM workflow_templates WHERE workflow_name = E'{old_wf_name}' - """ - ) - - op.execute( - f""" - DELETE FROM workflows WHERE workflow_name = E'{old_wf_name}' - """ - ) - def downgrade(): - conn = op.get_bind() - - # Re-insert the old workflows and templates - for old_wf_name in old_wf_names: - op.execute( - f""" - INSERT INTO workflows (workflow_name, requires_lustre) VALUES (E'{old_wf_name}', true) - """ - ) - - conn.execute( - f""" - INSERT INTO workflow_templates (filename, content, workflow_name) - VALUES (%s, %s, %s) - """, - old_wf_name + ".condor", - set_content(old_wf_name, f"{old_wf_name}_condor_2.8.5.txt"), - old_wf_name, - ) - - conn.execute( - f""" - INSERT INTO workflow_templates (filename, content, workflow_name) - VALUES (%s, %s, %s) - """, - old_wf_name + ".sh", - set_content(old_wf_name, f"{old_wf_name}_sh_2.8.5.txt"), - old_wf_name, - ) - - if "image" in old_wf_name: - conn.execute( - f""" - INSERT INTO workflow_templates (filename, content, workflow_name) - VALUES (%s, %s, %s) - """, - "aux_image_metadata.json", - set_content(old_wf_name, f"{old_wf_name}_aux_image_metadata_2.8.5.txt"), - old_wf_name, - ) - - # Need to update workflow_requests to link to an old workflow. We'll lose whether this was a cal/image ingest - op.execute( - f""" - UPDATE workflow_requests SET workflow_name = E'{old_wf_name}' WHERE workflow_name = E'{new_wf_name}' - """ - ) - op.execute( f""" DELETE FROM workflow_templates WHERE workflow_name = E'{new_wf_name}' @@ -167,4 +97,4 @@ def downgrade(): f""" DELETE FROM workflows WHERE workflow_name = E'{new_wf_name}' """ - ) \ No newline at end of file + ) -- GitLab From 947aa669f148a818bfe241e4e6e05bf49592d2af Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Wed, 2 Oct 2024 14:48:23 -0400 Subject: [PATCH 10/17] Removed ingest_cal and ingest_image workflow templates, we'll grab them when we need them --- .../ingest_cal/ingest_cal_condor_2.8.5.txt | 17 ----------------- .../ingest_cal/ingest_cal_sh_2.8.5.txt | 7 ------- .../ingest_image_aux_image_metadata_2.8.5.txt | 13 ------------- .../ingest_image/ingest_image_condor_2.8.5.txt | 17 ----------------- .../ingest_image/ingest_image_sh_2.8.5.txt | 7 ------- 5 files changed, 61 deletions(-) delete mode 100644 shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_condor_2.8.5.txt delete mode 100644 shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_sh_2.8.5.txt delete mode 100644 shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_aux_image_metadata_2.8.5.txt delete mode 100644 shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_condor_2.8.5.txt delete mode 100644 shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_sh_2.8.5.txt diff --git a/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_condor_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_condor_2.8.5.txt deleted file mode 100644 index da8cbc741..000000000 --- a/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_condor_2.8.5.txt +++ /dev/null @@ -1,17 +0,0 @@ -executable = ingest_cal.sh -arguments = metadata.json - -output = ingest.out -error = ingest.err -log = condor.log - -should_transfer_files = NO -+WantIOProxy = True - -request_memory = {{ramInGb}} -getenv = True -environment = "CAPO_PATH=/home/casa/capo CFLAGS=-I/usr/include/mysql LDFLAGS=-L/usr/lib64/mysql ORACLE_HOME=/home/ssa/share/oracle/instantclient_21_1 LD_LIBRARY_PATH=/home/ssa/share/oracle/instantclient_21_1 PATH=/bin:/usr/bin:$(PATH)/home/ssa/share/oracle/instantclient_21_1" - -requirements = HasLustre == True - -queue diff --git a/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_sh_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_sh_2.8.5.txt deleted file mode 100644 index a6dea7029..000000000 --- a/shared/workspaces/alembic/versions/templates/ingest_cal/ingest_cal_sh_2.8.5.txt +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh -set -o errexit - -SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin - -${SBIN_PATH}/conveyor --retrieve $1 -${SBIN_PATH}/ingest_envoy --calibration $1 diff --git a/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_aux_image_metadata_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_aux_image_metadata_2.8.5.txt deleted file mode 100644 index 4df18aaa4..000000000 --- a/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_aux_image_metadata_2.8.5.txt +++ /dev/null @@ -1,13 +0,0 @@ -{ - "project_code": "{{projectCode}}", - "band_code": "{{bands}}", - "configurations": "{{configurations}}", - "starttime": null, - "endtime": null, - "exposure_time": null, - "rms_noise": null, - "image_tags": "", - "product_tags": "", - "collection_name": "", - "calibration_level": 2 -} diff --git a/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_condor_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_condor_2.8.5.txt deleted file mode 100644 index 6583cc7d1..000000000 --- a/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_condor_2.8.5.txt +++ /dev/null @@ -1,17 +0,0 @@ -executable = ingest_image.sh -arguments = metadata.json - -output = ingest.out -error = ingest.err -log = condor.log - -should_transfer_files = NO -+WantIOProxy = True - -request_memory = {{ramInGb}} -getenv = True -environment = "CAPO_PATH=/home/casa/capo" - -requirements = HasLustre == True - -queue diff --git a/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_sh_2.8.5.txt b/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_sh_2.8.5.txt deleted file mode 100644 index 831142224..000000000 --- a/shared/workspaces/alembic/versions/templates/ingest_image/ingest_image_sh_2.8.5.txt +++ /dev/null @@ -1,7 +0,0 @@ -#!/bin/sh -set -o errexit - -SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin - -${SBIN_PATH}/conveyor --retrieve-img $1 -${SBIN_PATH}/ingest_envoy --image $1 -- GitLab From 288869938861976d4e58155b131a24255d344e67 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Thu, 3 Oct 2024 10:43:33 -0400 Subject: [PATCH 11/17] Alembic down revision update --- .../alembic/versions/7ba77453966a_ingest_products_workflow.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py b/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py index 572716dd0..0435af6b0 100644 --- a/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py +++ b/shared/workspaces/alembic/versions/7ba77453966a_ingest_products_workflow.py @@ -18,7 +18,7 @@ """ingest products workflow Revision ID: 7ba77453966a -Revises: 44041d0425b7 +Revises: 58690eec160b Create Date: 2024-09-23 14:28:25.260873 """ @@ -29,7 +29,7 @@ from alembic import op # revision identifiers, used by Alembic. revision = "7ba77453966a" -down_revision = "44041d0425b7" +down_revision = "58690eec160b" branch_labels = None depends_on = None -- GitLab From faded82818995ce16eb2a82a5a45f522cc3dc98d Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Thu, 3 Oct 2024 16:28:41 -0400 Subject: [PATCH 12/17] Hitting up the archive-service for the calibration SPL if it's not in the metadata file --- .../ingest_envoy/ingest_envoy/ingest.py | 5 +++- .../ingest_envoy/ingest_envoy/solicitor.py | 26 ++++++++++++++++++- docker.properties | 1 + 3 files changed, 30 insertions(+), 2 deletions(-) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py index f7d50f7f7..56b5778a2 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py @@ -63,8 +63,11 @@ def _get_settings( script_location = CapoConfig().settings("edu.nrao.workspaces.ProcessingSettings").scriptLocation capability_url = CapoConfig().settings("edu.nrao.workspaces.CapabilitySettings").externalServiceUrl workflow_url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl + archive_url = ingestion_settings.findLocatorServiceUrlPrefix - parameters = Solicitor(arg_type, [capability_url, workflow_url], filename=filename).solicit_parameters() + parameters = Solicitor( + arg_type, [capability_url, workflow_url, archive_url], filename=filename + ).solicit_parameters() if cal_spl is not None: parameters["calSpl"] = cal_spl diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py index 0c488107e..476e36f61 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py @@ -40,6 +40,7 @@ class Solicitor: self.argument = arg_type self.capability_url = urls[0] self.workflow_url = urls[1] + self.archive_url = urls[2] @staticmethod def solicit_contents(filename: str) -> dict: @@ -150,6 +151,29 @@ class Solicitor: creation_time = self.metadata["creationTime"].replace("-", "_").replace(":", "_") return creation_time + def solicit_calibration_product_locator(self, sdm_id: str) -> str | None: + """ + Returns the calibration product locator from the metadata or requests it from the archive service if needed. + + :param sdm_id: SDM ID of the exec block + :return: Calibration product locator or None if not found + """ + cal_locator = None + + if "calProductLocator" in self.metadata: + cal_locator = self.metadata["calProductLocator"] + else: + # Make a call to the archive service to get the cal SPL if it's not in the metadata + get_cal_locator_url = f"{self.archive_url}={sdm_id}.%" + response = requests.get(get_cal_locator_url) + + if response.status_code == http.HTTPStatus.OK and "locator" in response.json(): + cal_locator = response.json()["locator"] + else: + self.logger.info("Failed to fetch calibration SPL from the archive service") + + return cal_locator + def get_general_params(self) -> dict: """ Determine general ingestion parameters (WS only) @@ -198,7 +222,7 @@ class Solicitor: general = self.get_general_params() img = { - "calSpl": self.metadata["calProductLocator"], # image only + "calSpl": self.solicit_calibration_product_locator(general["sdmId"]), # image only "spoolDir": self.metadata["destinationDirectory"], } diff --git a/docker.properties b/docker.properties index f77c5adcb..37c90a5c3 100644 --- a/docker.properties +++ b/docker.properties @@ -58,6 +58,7 @@ edu.nrao.workspaces.VlassDeliverySettings.cache = /lustre/aoc/cluster/pipeline/v edu.nrao.workspaces.IngestionSettings.useIngest = False edu.nrao.workspaces.IngestionSettings.stagingDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/staging edu.nrao.workspaces.IngestionSettings.storageDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/storage +edu.nrao.workspaces.IngestionSettings.findLocatorServiceUrlPrefix = https://webtest.aoc.nrao.edu/archive-service/find_locator?identifier archive-ingestion.ingestNGAS.observation = False archive-ingestion.ingestNGAS.calibration = False archive-ingestion.ingestNGAS.imaging = False -- GitLab From 4df8ddaf9145403d1d3d1dd2c6d63b5c773c669a Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Thu, 3 Oct 2024 17:02:22 -0400 Subject: [PATCH 13/17] Added logging --- .../executables/pexable/ingest_envoy/ingest_envoy/solicitor.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py index 476e36f61..d020e4c81 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py @@ -164,11 +164,13 @@ class Solicitor: cal_locator = self.metadata["calProductLocator"] else: # Make a call to the archive service to get the cal SPL if it's not in the metadata + self.logger.info("Cal SPL not in the metadata, requesting it from the archive service") get_cal_locator_url = f"{self.archive_url}={sdm_id}.%" response = requests.get(get_cal_locator_url) if response.status_code == http.HTTPStatus.OK and "locator" in response.json(): cal_locator = response.json()["locator"] + self.logger.info(f"Obtained cal SPL from the archive service: {cal_locator}") else: self.logger.info("Failed to fetch calibration SPL from the archive service") -- GitLab From 487dfd41da3c2617c3ed10db95fce73feb9d8cad Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Fri, 4 Oct 2024 09:11:46 -0400 Subject: [PATCH 14/17] Moved the cal spl lookup into its own function so it can be mocked, added a unit test for solicitor without a given cal spl --- .../ingest_envoy/ingest_envoy/solicitor.py | 28 ++++++----- .../test-image-metadata-nocal.json | 17 +++++++ .../ingest_envoy/test/test_solicitor.py | 49 +++++++++++++------ 3 files changed, 68 insertions(+), 26 deletions(-) create mode 100644 apps/cli/executables/pexable/ingest_envoy/test/input_files/test-image-metadata-nocal.json diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py index d020e4c81..395a52150 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py @@ -158,23 +158,29 @@ class Solicitor: :param sdm_id: SDM ID of the exec block :return: Calibration product locator or None if not found """ - cal_locator = None - if "calProductLocator" in self.metadata: - cal_locator = self.metadata["calProductLocator"] + return self.metadata["calProductLocator"] else: # Make a call to the archive service to get the cal SPL if it's not in the metadata self.logger.info("Cal SPL not in the metadata, requesting it from the archive service") - get_cal_locator_url = f"{self.archive_url}={sdm_id}.%" - response = requests.get(get_cal_locator_url) + return self.lookup_calibration_product_locator(sdm_id) - if response.status_code == http.HTTPStatus.OK and "locator" in response.json(): - cal_locator = response.json()["locator"] - self.logger.info(f"Obtained cal SPL from the archive service: {cal_locator}") - else: - self.logger.info("Failed to fetch calibration SPL from the archive service") + def lookup_calibration_product_locator(self, sdm_id: str) -> str | None: + """ + Requests the calibration product locator from the archive service using the given exec block SDM ID. - return cal_locator + :param sdm_id: SDM ID of the exec block + :return: Calibration product locator or None if not found + """ + get_cal_locator_url = f"{self.archive_url}={sdm_id}.%" + response = requests.get(get_cal_locator_url) + + if response.status_code == http.HTTPStatus.OK and "locator" in response.json(): + self.logger.info(f"Obtained cal SPL from the archive service: {response.json()['locator']}") + return response.json()["locator"] + else: + self.logger.info("Failed to fetch calibration SPL from the archive service") + return None def get_general_params(self) -> dict: """ diff --git a/apps/cli/executables/pexable/ingest_envoy/test/input_files/test-image-metadata-nocal.json b/apps/cli/executables/pexable/ingest_envoy/test/input_files/test-image-metadata-nocal.json new file mode 100644 index 000000000..54b7c2e41 --- /dev/null +++ b/apps/cli/executables/pexable/ingest_envoy/test/input_files/test-image-metadata-nocal.json @@ -0,0 +1,17 @@ +{ + "fileSetIds": "brain_000.58099.67095825232", + "workflowName": "std_cms_imaging", + "systemId": "12", + "creationTime": "2021-07-29T14:26:31", + "productLocator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36", + "projectMetadata": { + "projectCode": "Operations", + "title": "", + "telescope": "EVLA", + "startTime": "58099.6710792824", + "observer": "VLA Operations" + }, + "destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmprb1se376", + "calibrationSourceDirectory":"/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp7bmd17zp/working", + "cmsName":"brain_000.58099.67095825232.ms" +} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_solicitor.py b/apps/cli/executables/pexable/ingest_envoy/test/test_solicitor.py index 489226ee1..8d1f3b172 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/test_solicitor.py +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_solicitor.py @@ -27,7 +27,7 @@ from ingest_envoy.solicitor import Solicitor from ingest_envoy.utilities import CuratorType, IngestType SOLICIT_WORKFLOWS_PATCH = "ingest_envoy.solicitor.Solicitor.solicit_contents" -SOLICITOR_URLS = ["http://capability:3457", "http://workflow:3456"] +SOLICITOR_URLS = ["http://capability:3457", "http://workflow:3456", "fake-archive-service"] @pytest.fixture(scope="function") @@ -42,6 +42,12 @@ def image_solicitor() -> Solicitor: return Solicitor(IngestType.IMG, SOLICITOR_URLS, filename=image_filename) +@pytest.fixture(scope="function") +def image_solicitor_nocal() -> Solicitor: + image_filename = "test/input_files/test-image-nocal-metadata.json" + return Solicitor(IngestType.IMG, SOLICITOR_URLS, filename=image_filename) + + expected_metadata = { "fileSetIds": "16B-069_sb32814386_1_001.57685.66193635417", "workflowName": "std_calibration", @@ -58,6 +64,20 @@ expected_metadata = { "destinationDirectory": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp576dknjp", } +expected_image_metadata = { + "sdmId": "brain_000.58099.67095825232", + "telescope": "EVLA", + "workflowName": "std_cms_imaging", + "processingStart": "2021_07_29T14_26_31", + "workflowDir": "tmprb1se376", + "initialVersionDir": "iamthefirst", + "multiVersion": True, + "calSpl": "uid://evla/calibration/c78ccfd6-fe4e-43c6-a5c5-70e5bcfde16b", + "project": "Operations", + "requestId": "12", + "spoolDir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmprb1se376", +} + class TestSolicitor: def test_solicit_contents(self, solicitor: Solicitor): @@ -95,22 +115,21 @@ class TestSolicitor: def test_solicit_parameters_image(self, image_solicitor: Solicitor): initial_version_dir = "iamthefirst" - metadata = { - "sdmId": "brain_000.58099.67095825232", - "telescope": "EVLA", - "workflowName": "std_cms_imaging", - "processingStart": "2021_07_29T14_26_31", - "workflowDir": "tmprb1se376", - "initialVersionDir": "iamthefirst", - "multiVersion": True, - "calSpl": "uid://evla/calibration/c78ccfd6-fe4e-43c6-a5c5-70e5bcfde16b", - "project": "Operations", - "requestId": "12", - "spoolDir": "/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmprb1se376", - } with patch("ingest_envoy.solicitor.Solicitor.solicit_initial_directory_name", return_value=initial_version_dir): parameters = image_solicitor.solicit_parameters() - assert parameters == metadata + assert parameters == expected_image_metadata + + def test_solicit_parameters_image_without_cal_spl(self, image_solicitor_nocal: Solicitor): + initial_version_dir = "iamthefirst" + with ( + patch("ingest_envoy.solicitor.Solicitor.solicit_initial_directory_name", return_value=initial_version_dir), + patch( + "ingest_envoy.solicitor.Solicitor.lookup_calibration_product_locator", + return_value="uid://evla/calibration/c78ccfd6-fe4e-43c6-a5c5-70e5bcfde16b", + ), + ): + parameters = image_solicitor_nocal.solicit_parameters() + assert parameters == expected_image_metadata def test_solicit_parameters_full_curation_image_file_list(self): filename = "test/input_files/test-full-curation-image-evla-metadata.json" -- GitLab From 47d1319d5bcfb1126f0fc4fe0ce624d643485686 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Fri, 4 Oct 2024 09:15:09 -0400 Subject: [PATCH 15/17] Wrong filename --- ...t-image-metadata-nocal.json => test-image-nocal-metadata.json} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename apps/cli/executables/pexable/ingest_envoy/test/input_files/{test-image-metadata-nocal.json => test-image-nocal-metadata.json} (100%) diff --git a/apps/cli/executables/pexable/ingest_envoy/test/input_files/test-image-metadata-nocal.json b/apps/cli/executables/pexable/ingest_envoy/test/input_files/test-image-nocal-metadata.json similarity index 100% rename from apps/cli/executables/pexable/ingest_envoy/test/input_files/test-image-metadata-nocal.json rename to apps/cli/executables/pexable/ingest_envoy/test/input_files/test-image-nocal-metadata.json -- GitLab From 002f0f6787c786df28d2af33bef10563e9959b27 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Fri, 4 Oct 2024 12:35:54 -0400 Subject: [PATCH 16/17] Throwing an exception instead of letting ingest falsely run if we don't have a cal SPL --- .../ingest_envoy/ingest_envoy/solicitor.py | 16 ++++++++++------ .../ingest_envoy/ingest_envoy/utilities.py | 4 ++++ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py index 395a52150..8c7b66fae 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py @@ -25,7 +25,12 @@ import pathlib from typing import List, Union import requests -from ingest_envoy.utilities import CuratorType, IngestType, VLASSIngestType +from ingest_envoy.utilities import ( + CuratorType, + IngestType, + InvalidCalLocatorException, + VLASSIngestType, +) INVALID_INITIAL_VERSION = "Initial version not valid for ingest" @@ -151,7 +156,7 @@ class Solicitor: creation_time = self.metadata["creationTime"].replace("-", "_").replace(":", "_") return creation_time - def solicit_calibration_product_locator(self, sdm_id: str) -> str | None: + def solicit_calibration_product_locator(self, sdm_id: str) -> str: """ Returns the calibration product locator from the metadata or requests it from the archive service if needed. @@ -165,12 +170,12 @@ class Solicitor: self.logger.info("Cal SPL not in the metadata, requesting it from the archive service") return self.lookup_calibration_product_locator(sdm_id) - def lookup_calibration_product_locator(self, sdm_id: str) -> str | None: + def lookup_calibration_product_locator(self, sdm_id: str) -> str: """ Requests the calibration product locator from the archive service using the given exec block SDM ID. :param sdm_id: SDM ID of the exec block - :return: Calibration product locator or None if not found + :return: Calibration product locator """ get_cal_locator_url = f"{self.archive_url}={sdm_id}.%" response = requests.get(get_cal_locator_url) @@ -179,8 +184,7 @@ class Solicitor: self.logger.info(f"Obtained cal SPL from the archive service: {response.json()['locator']}") return response.json()["locator"] else: - self.logger.info("Failed to fetch calibration SPL from the archive service") - return None + raise InvalidCalLocatorException("Failed to fetch calibration SPL from the archive service") def get_general_params(self) -> dict: """ diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py index 34c620a44..39e439c03 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py @@ -105,6 +105,10 @@ class InvalidLocatorException(Exception): """Throw this if we're fed a bad science product locator""" +class InvalidCalLocatorException(Exception): + """Throw this if we weren't fed a calibration locator and weren't able to fetch one from the archive service""" + + class IngestionManifestException(Exception): """Throw this if we're unable to construct an ingestion manifest using supplied inputs""" -- GitLab From 5de85332737d2deebc10b4e1753ca29a4b90d030 Mon Sep 17 00:00:00 2001 From: Daniel Nemergut <dnemergu@nrao.edu> Date: Tue, 8 Oct 2024 09:24:29 -0400 Subject: [PATCH 17/17] Setting SELFCAL to an empty string rather than an error message if a selfcal.auxproducts doesn't exist (which we check below) --- .../wf_framework/ingest_requirements/image-product-collector.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/cli/executables/wf_framework/ingest_requirements/image-product-collector.sh b/apps/cli/executables/wf_framework/ingest_requirements/image-product-collector.sh index 935c35fca..1d878072f 100644 --- a/apps/cli/executables/wf_framework/ingest_requirements/image-product-collector.sh +++ b/apps/cli/executables/wf_framework/ingest_requirements/image-product-collector.sh @@ -96,7 +96,7 @@ fi # NOTE: This fails if we have no weblog, previously we allowed that # possibility. -SELFCAL=$(ls -t ${SOURCE_DIR}/*selfcal.auxproducts.tgz) +SELFCAL=$(ls -t ${SOURCE_DIR}/*selfcal.auxproducts.tgz 2>/dev/null || true) if [[ -n "$SELFCAL" ]] then -- GitLab