Skip to content
Snippets Groups Projects
Commit 1b87bc14 authored by Daniel Nemergut's avatar Daniel Nemergut
Browse files

Merge branch 'ingest_obs_on_lustre' into 'main'

Ingest obs run on lustre

See merge request !1480
parents 547994a5 480cff76
No related branches found
No related tags found
1 merge request!1480Ingest obs run on lustre
Pipeline #12296 passed
......@@ -177,10 +177,11 @@ class ObservationCollector(CollectorIF):
source_path = self.parameters["sourcePath"]
workflow_dir = self.parameters["workflowDir"]
staging_dir = self.parameters["staging_area"]
sbin_dir = self.parameters["script_location"]
# run script
collector = subprocess.run(
["./observation-product-collector.sh", source_path, workflow_dir, staging_dir],
[f"{sbin_dir}/observation-product-collector.sh", source_path, workflow_dir, staging_dir],
stdout=sys.stdout,
stderr=sys.stderr,
)
......
......@@ -56,6 +56,7 @@ def _get_settings(
:return: a dict containing required settings to run the provided ingestion type
"""
ingestion_settings = CapoConfig().settings("edu.nrao.workspaces.IngestionSettings")
script_location = CapoConfig().settings("edu.nrao.workspaces.ProcessingSettings").scriptLocation
capability_url = CapoConfig().settings("edu.nrao.workspaces.CapabilitySettings").externalServiceUrl
workflow_url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl
......@@ -70,6 +71,7 @@ def _get_settings(
parameters["staging_area"] = ingestion_settings.stagingDirectory
parameters["storage_area"] = ingestion_settings.storageDirectory
parameters["useIngest"] = strtobool(ingestion_settings.useIngest)
parameters["script_location"] = script_location
parameters["workflowUrl"] = workflow_url
return parameters
......
......@@ -33,17 +33,19 @@ from ingest_envoy.schema import AbstractTextFile
from ingest_envoy.utilities import IngestType, VLASSIngestType
def trigger_ingest(real_ingest: bool, staging_dir: str) -> int:
def trigger_ingest(real_ingest: bool, staging_dir: str, bin_dir: str = ".") -> int:
"""
Run ingest
:param real_ingest: real ingestion vs. testing flag
:param staging_dir: staging directory to ingest from
:param bin_dir: directory containing the ingestion script
:return: return code
"""
if real_ingest:
# TODO: This should be cleaned up when cal and image ingests don't transfer files into condor and use sbin
ingest_process = subprocess.run(
["./ingest", "--json", "-p", staging_dir],
[f"{bin_dir}/ingest", "--json", "-p", staging_dir],
stdout=sys.stdout,
stderr=sys.stderr,
)
......@@ -229,7 +231,7 @@ class IngestObservationLauncher(IngestLauncherIF):
self.prepare_for_ingest()
self.logger.info("Running ingest!")
return trigger_ingest(self.parameters["useIngest"], self.staging_source_dir)
return trigger_ingest(self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"])
def prepare_for_ingest(self):
"""
......
......@@ -24,16 +24,11 @@ output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/ingest_envoy, nraorsync://$(SBIN_PATH)/ingest, nraorsync://$(SBIN_PATH)/observation-product-collector.sh, ./metadata.json
should_transfer_files = NO
+WantIOProxy = True
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
......@@ -43,7 +38,9 @@ queue
ingest_obs_sh = """#!/bin/sh
set -o errexit
./ingest_envoy --observation $1 $2
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
${SBIN_PATH}/ingest_envoy --observation $1 $2
"""
......
......@@ -18,7 +18,9 @@ depends_on = None
old_ingest_obs_sh = """#!/bin/sh
set -o errexit
./ingest_envoy --observation $1 $2
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
${SBIN_PATH}/ingest_envoy --observation $1 $2
"""
......@@ -30,9 +32,11 @@ running=$2.running
ingested=$2.ingested
failed=$2.failed
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
mv $2 $running
./ingest_envoy --observation $1 $running
${SBIN_PATH}/ingest_envoy --observation $1 $running
status=$?
[ $status -eq 0 ] && mv $running $ingested || mv $running $failed
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment