diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/collectors.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/collectors.py index fa14fc19d2cecade6c9cc20d2d928b26a58b104b..b303bf3e1fa711ffb1cfb80e5a53af32988dc6d9 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/collectors.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/collectors.py @@ -159,3 +159,31 @@ class SECICollector(CollectorIF): exit(1) aux_file_to_staging(self.staging_source_dir) + + +class ObservationCollector(CollectorIF): + """Collect Observation Products for Ingestion""" + + def __init__(self, parameters): + self.logger = logging.getLogger("ingest_envoy") + self.parameters = parameters + self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"] + + def collect_products(self): + """ + Run observation product collection script + """ + self.logger.info("Collecting observation products for staging...") + cache_path = self.parameters["seciCachePath"] + workflow_dir = self.parameters["workflowDir"] + staging_dir = self.parameters["staging_area"] + + # run script + collector = subprocess.run( + ["./observation-product-collector.sh", cache_path, workflow_dir, staging_dir], + stdout=sys.stdout, + stderr=sys.stderr, + ) + if collector.returncode != 0: + self.logger.error("ERROR: Observation product collection failed!") + exit(1) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py index 1f1c2482ddb34790ecdd0928dc44b9fb915ac58d..b871bf64eb59c22e8596afb624528fe452a7093e 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py @@ -24,6 +24,7 @@ from typing import Union from ingest_envoy.collectors import ( ImageCollector, SECICollector, + ObservationCollector, collect_image_metadata, ) from ingest_envoy.ingestion_manifest import IngestionManifestBuilder @@ -216,6 +217,7 @@ class IngestObservationLauncher(IngestLauncherIF): self.sci_product_type = "execution_block" self.parameters = parameters self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"] + self.collector = ObservationCollector(self.parameters) def launch_ingestion(self) -> int: """ @@ -236,12 +238,19 @@ class IngestObservationLauncher(IngestLauncherIF): :return: """ self.logger.info("Preparing for ingest...") - # 1. run collection script to create calibration tarfile - # self.run_collection_script() + # 1. run collection script to move observation data to the staging area + self.run_collector() # 2. create ingestion manifest self.create_manifest() + def run_collector(self): + """ + Run ObservationCollector + """ + # 1. collect products to staging area + self.collector.collect_products() + def create_manifest(self, additional_file=None): """ Create the observation ingestion manifest diff --git a/apps/cli/executables/wf_framework/ingest_requirements/observation-product-collector.sh b/apps/cli/executables/wf_framework/ingest_requirements/observation-product-collector.sh new file mode 100644 index 0000000000000000000000000000000000000000..59407a43ac70926723550b30f3bf4af5e0a2c0b4 --- /dev/null +++ b/apps/cli/executables/wf_framework/ingest_requirements/observation-product-collector.sh @@ -0,0 +1,47 @@ +#!/usr/bin/env bash +# +# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA. +# +# This file is part of NRAO Workspaces. +# +# Workspaces is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Workspaces is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Workspaces. If not, see <https://www.gnu.org/licenses/>. + +# +# A file to copy all files from a provided source directory to +# the Workspaces staging directory for ingestion. +# +# Arguments: +# 1: The path to the source directory +# 2: The source directory name +# 3: The staging directory root path +# +set -o errexit -o nounset -o xtrace + +SRC_PATH=$1;shift +SRC_NAME=$1;shift +STAGE_DIR=$1;shift + +SRC_DIR=${SRC_PATH}/${SRC_NAME} + +# Create the staging directory carefully: +mkdir -p ${STAGE_DIR}/${SRC_NAME} + +# Link all the files over to the staging directory (Create hard link, and be insistent) +# +# We just want the file name, so we'll work in the source directory +pushd ${SRC_DIR} +# link the files +for srcFile in $(ls);do cp ${SRC_DIR}/${srcFile} ${STAGE_DIR}/${SRC_NAME};done +# move back to the working directory +popd