Skip to content
Snippets Groups Projects
Commit ab3cca96 authored by Daniel Nemergut's avatar Daniel Nemergut
Browse files

Merge branch 'ws1808-ingest_obs_staging_changes' into '2.8.2-DEVELOPMENT'

Using the default WS staging area instead of the capo property

See merge request !1446
parents d11c1cef 85143fb1
No related branches found
No related tags found
2 merge requests!1452Merge 2.8.2 to main,!1446Using the default WS staging area instead of the capo property
Pipeline #11547 passed
......@@ -144,7 +144,7 @@ class SECICollector(CollectorIF):
:return:
"""
self.logger.info("Collecting SECI products for staging...")
cache_path = self.parameters["seciCachePath"]
cache_path = self.parameters["sourcePath"]
workflow_dir = self.parameters["workflowDir"]
staging_dir = self.parameters["staging_area"]
tar_name = self.create_artifacts_name()
......@@ -159,3 +159,31 @@ class SECICollector(CollectorIF):
exit(1)
aux_file_to_staging(self.staging_source_dir)
class ObservationCollector(CollectorIF):
"""Collect Observation Products for Ingestion"""
def __init__(self, parameters):
self.logger = logging.getLogger("ingest_envoy")
self.parameters = parameters
self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"]
def collect_products(self):
"""
Run observation product collection script
"""
self.logger.info("Collecting observation products for staging...")
source_path = self.parameters["sourcePath"]
workflow_dir = self.parameters["workflowDir"]
staging_dir = self.parameters["staging_area"]
# run script
collector = subprocess.run(
["./observation-product-collector.sh", source_path, workflow_dir, staging_dir],
stdout=sys.stdout,
stderr=sys.stderr,
)
if collector.returncode != 0:
self.logger.error("ERROR: Observation product collection failed!")
exit(1)
......@@ -65,12 +65,9 @@ def _get_settings(
parameters["calSpl"] = cal_spl
if source_dir is not None:
parameters["workflowDir"] = pathlib.Path(source_dir).name
parameters["seciCachePath"] = pathlib.Path(source_dir).parent
# Use the default staging area if it wasn't set by the solicitor
if not parameters["staging_area"]:
parameters["staging_area"] = ingestion_settings.stagingDirectory
parameters["sourcePath"] = pathlib.Path(source_dir).parent
parameters["staging_area"] = ingestion_settings.stagingDirectory
parameters["storage_area"] = ingestion_settings.storageDirectory
parameters["useIngest"] = strtobool(ingestion_settings.useIngest)
parameters["workflowUrl"] = workflow_url
......
......@@ -24,6 +24,7 @@ from typing import Union
from ingest_envoy.collectors import (
ImageCollector,
SECICollector,
ObservationCollector,
collect_image_metadata,
)
from ingest_envoy.ingestion_manifest import IngestionManifestBuilder
......@@ -216,6 +217,7 @@ class IngestObservationLauncher(IngestLauncherIF):
self.sci_product_type = "execution_block"
self.parameters = parameters
self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"]
self.collector = ObservationCollector(self.parameters)
def launch_ingestion(self) -> int:
"""
......@@ -236,12 +238,19 @@ class IngestObservationLauncher(IngestLauncherIF):
:return:
"""
self.logger.info("Preparing for ingest...")
# 1. run collection script to create calibration tarfile
# self.run_collection_script()
# 1. run collection script to move observation data to the staging area
self.run_collector()
# 2. create ingestion manifest
self.create_manifest()
def run_collector(self):
"""
Run ObservationCollector
"""
# 1. collect products to staging area
self.collector.collect_products()
def create_manifest(self, additional_file=None):
"""
Create the observation ingestion manifest
......
......@@ -25,8 +25,7 @@ import pathlib
from typing import List, Union
import requests
from ingest_envoy.utilities import IngestType, Telescope, VLASSIngestType
from pycapo import CapoConfig
from ingest_envoy.utilities import IngestType, VLASSIngestType
INVALID_INITIAL_VERSION = "Initial version not valid for ingest"
......@@ -218,14 +217,6 @@ class Solicitor:
"project": self.metadata["projectMetadata"]["projectCode"], # needed for post ingestion messaging
}
# VLBA and GMVA share a non-default staging directory
if obs["telescope"].upper() in [Telescope.VLBA.value, Telescope.GMVA.value]:
try:
obs["staging_area"] = \
CapoConfig().settings("edu.nrao.workspaces.IngestionSettings").vlbiStagingDirectory
except KeyError:
self.logger.info("Couldn't retrieve VLBI staging area from capo, using default staging directory")
return {**obs}
def solicit_seci_params(self) -> dict:
......
#!/usr/bin/env bash
#
# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
#
# A file to copy all files from a provided source directory to
# the Workspaces staging directory for ingestion.
#
# Arguments:
# 1: The path to the source directory
# 2: The source directory name
# 3: The staging directory root path
#
set -o errexit -o nounset -o xtrace
SRC_PATH=$1;shift
SRC_NAME=$1;shift
STAGE_DIR=$1;shift
SRC_DIR=${SRC_PATH}/${SRC_NAME}
# Create the staging directory carefully:
mkdir -p ${STAGE_DIR}/${SRC_NAME}
# Link all the files over to the staging directory (Create hard link, and be insistent)
#
# We just want the file name, so we'll work in the source directory
pushd ${SRC_DIR}
# link the files
for srcFile in $(ls);do cp ${SRC_DIR}/${srcFile} ${STAGE_DIR}/${SRC_NAME};done
# move back to the working directory
popd
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment