diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/collectors.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/collectors.py index a9545581ca316e0e9ddbbe64a877a34af5db3dce..6a9a98fe70139b99f4d58cbd7e36a5d6e877bf86 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/collectors.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/collectors.py @@ -177,10 +177,11 @@ class ObservationCollector(CollectorIF): source_path = self.parameters["sourcePath"] workflow_dir = self.parameters["workflowDir"] staging_dir = self.parameters["staging_area"] + sbin_dir = self.parameters["script_location"] # run script collector = subprocess.run( - ["./observation-product-collector.sh", source_path, workflow_dir, staging_dir], + [f"{sbin_dir}/observation-product-collector.sh", source_path, workflow_dir, staging_dir], stdout=sys.stdout, stderr=sys.stderr, ) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py index 3962883fc4b34bf8389d581e0a1a570284715551..b0b33f26959d6438bcc23c26fa278ff1a18867d5 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py @@ -56,6 +56,7 @@ def _get_settings( :return: a dict containing required settings to run the provided ingestion type """ ingestion_settings = CapoConfig().settings("edu.nrao.workspaces.IngestionSettings") + script_location = CapoConfig().settings("edu.nrao.workspaces.ProcessingSettings").scriptLocation capability_url = CapoConfig().settings("edu.nrao.workspaces.CapabilitySettings").externalServiceUrl workflow_url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl @@ -70,6 +71,7 @@ def _get_settings( parameters["staging_area"] = ingestion_settings.stagingDirectory parameters["storage_area"] = ingestion_settings.storageDirectory parameters["useIngest"] = strtobool(ingestion_settings.useIngest) + parameters["script_location"] = script_location parameters["workflowUrl"] = workflow_url return parameters diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py index b871bf64eb59c22e8596afb624528fe452a7093e..866d679f6e2daa7ff5e39d85314c6a5446fe8d6a 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py @@ -33,17 +33,19 @@ from ingest_envoy.schema import AbstractTextFile from ingest_envoy.utilities import IngestType, VLASSIngestType -def trigger_ingest(real_ingest: bool, staging_dir: str) -> int: +def trigger_ingest(real_ingest: bool, staging_dir: str, bin_dir: str = ".") -> int: """ Run ingest :param real_ingest: real ingestion vs. testing flag :param staging_dir: staging directory to ingest from + :param bin_dir: directory containing the ingestion script :return: return code """ if real_ingest: + # TODO: This should be cleaned up when cal and image ingests don't transfer files into condor and use sbin ingest_process = subprocess.run( - ["./ingest", "--json", "-p", staging_dir], + [f"{bin_dir}/ingest", "--json", "-p", staging_dir], stdout=sys.stdout, stderr=sys.stderr, ) @@ -229,7 +231,7 @@ class IngestObservationLauncher(IngestLauncherIF): self.prepare_for_ingest() self.logger.info("Running ingest!") - return trigger_ingest(self.parameters["useIngest"], self.staging_source_dir) + return trigger_ingest(self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"]) def prepare_for_ingest(self): """ diff --git a/shared/workspaces/alembic/versions/491102d56809_add_ingest_obs_workflow.py b/shared/workspaces/alembic/versions/491102d56809_add_ingest_obs_workflow.py index e52410afdb132611d6caca99ce87d9c1621e85e4..2835fc9ad457cda9b5e789287954873eae4aa5cf 100644 --- a/shared/workspaces/alembic/versions/491102d56809_add_ingest_obs_workflow.py +++ b/shared/workspaces/alembic/versions/491102d56809_add_ingest_obs_workflow.py @@ -24,16 +24,11 @@ output = ingest.out error = ingest.err log = condor.log -SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin -SPOOL_DIR = {{spool_dir}} -should_transfer_files = yes -transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/ingest_envoy, nraorsync://$(SBIN_PATH)/ingest, nraorsync://$(SBIN_PATH)/observation-product-collector.sh, ./metadata.json +should_transfer_files = NO +WantIOProxy = True - request_memory = {{ramInGb}} getenv = True environment = "CAPO_PATH=/home/casa/capo" - requirements = HasLustre == True queue @@ -43,7 +38,9 @@ queue ingest_obs_sh = """#!/bin/sh set -o errexit -./ingest_envoy --observation $1 $2 +SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin + +${SBIN_PATH}/ingest_envoy --observation $1 $2 """ diff --git a/shared/workspaces/alembic/versions/569416c40ca8_ingest_obs_workflow_renaming.py b/shared/workspaces/alembic/versions/569416c40ca8_ingest_obs_workflow_renaming.py index d2e3775fa664abe5ce3446014ea766a520b143a3..49308d020701dd4897c531819f87b3f616b31533 100644 --- a/shared/workspaces/alembic/versions/569416c40ca8_ingest_obs_workflow_renaming.py +++ b/shared/workspaces/alembic/versions/569416c40ca8_ingest_obs_workflow_renaming.py @@ -18,7 +18,9 @@ depends_on = None old_ingest_obs_sh = """#!/bin/sh set -o errexit -./ingest_envoy --observation $1 $2 +SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin + +${SBIN_PATH}/ingest_envoy --observation $1 $2 """ @@ -30,9 +32,11 @@ running=$2.running ingested=$2.ingested failed=$2.failed +SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin + mv $2 $running -./ingest_envoy --observation $1 $running +${SBIN_PATH}/ingest_envoy --observation $1 $running status=$? [ $status -eq 0 ] && mv $running $ingested || mv $running $failed