Skip to content
Snippets Groups Projects
Commit 93d49368 authored by Daniel Nemergut's avatar Daniel Nemergut
Browse files

Made an observation collector that moves everything from the drop location to the staging area

parent c0a90ece
No related branches found
No related tags found
2 merge requests!1452Merge 2.8.2 to main,!1446Using the default WS staging area instead of the capo property
......@@ -159,3 +159,31 @@ class SECICollector(CollectorIF):
exit(1)
aux_file_to_staging(self.staging_source_dir)
class ObservationCollector(CollectorIF):
"""Collect Observation Products for Ingestion"""
def __init__(self, parameters):
self.logger = logging.getLogger("ingest_envoy")
self.parameters = parameters
self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"]
def collect_products(self):
"""
Run observation product collection script
"""
self.logger.info("Collecting observation products for staging...")
cache_path = self.parameters["seciCachePath"]
workflow_dir = self.parameters["workflowDir"]
staging_dir = self.parameters["staging_area"]
# run script
collector = subprocess.run(
["./observation-product-collector.sh", cache_path, workflow_dir, staging_dir],
stdout=sys.stdout,
stderr=sys.stderr,
)
if collector.returncode != 0:
self.logger.error("ERROR: Observation product collection failed!")
exit(1)
......@@ -24,6 +24,7 @@ from typing import Union
from ingest_envoy.collectors import (
ImageCollector,
SECICollector,
ObservationCollector,
collect_image_metadata,
)
from ingest_envoy.ingestion_manifest import IngestionManifestBuilder
......@@ -216,6 +217,7 @@ class IngestObservationLauncher(IngestLauncherIF):
self.sci_product_type = "execution_block"
self.parameters = parameters
self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"]
self.collector = ObservationCollector(self.parameters)
def launch_ingestion(self) -> int:
"""
......@@ -236,12 +238,19 @@ class IngestObservationLauncher(IngestLauncherIF):
:return:
"""
self.logger.info("Preparing for ingest...")
# 1. run collection script to create calibration tarfile
# self.run_collection_script()
# 1. run collection script to move observation data to the staging area
self.run_collector()
# 2. create ingestion manifest
self.create_manifest()
def run_collector(self):
"""
Run ObservationCollector
"""
# 1. collect products to staging area
self.collector.collect_products()
def create_manifest(self, additional_file=None):
"""
Create the observation ingestion manifest
......
#!/usr/bin/env bash
#
# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
#
# A file to copy all files from a provided source directory to
# the Workspaces staging directory for ingestion.
#
# Arguments:
# 1: The path to the source directory
# 2: The source directory name
# 3: The staging directory root path
#
set -o errexit -o nounset -o xtrace
SRC_PATH=$1;shift
SRC_NAME=$1;shift
STAGE_DIR=$1;shift
SRC_DIR=${SRC_PATH}/${SRC_NAME}
# Create the staging directory carefully:
mkdir -p ${STAGE_DIR}/${SRC_NAME}
# Link all the files over to the staging directory (Create hard link, and be insistent)
#
# We just want the file name, so we'll work in the source directory
pushd ${SRC_DIR}
# link the files
for srcFile in $(ls);do cp ${SRC_DIR}/${srcFile} ${STAGE_DIR}/${SRC_NAME};done
# move back to the working directory
popd
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment