Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Commits on Source (3)
#!/usr/bin/env bash
#
# A script from the archive repo adapted for Workspaces
#
# Old archive description:
# A replacement for the old qaarchive script used by the
# data analysts. This script will tar & zip the pipeline
# weblog into WEBLOG.tar.gz and then collect everything but
# FITS files in the products directory from a CIPL run into
# a single tar file (naming convention TBD) that is created
# in a storage directory for ingestion.
#
# Arguments:
# 1: Working directory to be worked upon (relative to the spool directory)
# 2: SDM id of the observation (used in the output tar's name)
# 3: Calibration processing datetime (used in the output tar's name)
#
# Environment:
# CAPO_PROFILE: CAPO profile used to resolve the spool/staging/storage paths
#
# Basics: fail fast (errexit), reject unset variables (nounset), trace commands (xtrace)
set -o errexit -o nounset -o xtrace
#
# Command-line arguments
#
# Dir that identifies particular workflow
WORKFLOW_DIR=$1;shift
# Metadata for the output tar's naming scheme
SDM_ID=$1;shift
CAL_PROCESSING_DATETIME=$1;shift
# Get the spool, staging and storage paths from CAPO
SPOOL_DIR=$(pycapo -P "${CAPO_PROFILE}" -q edu.nrao.archive.workspaces.ProcessingSettings.rootDirectory)
STAGING_DIR=$(pycapo -P "${CAPO_PROFILE}" -q edu.nrao.archive.workspaces.IngestionSettings.stagingDirectory)
STORAGE_DIR=$(pycapo -P "${CAPO_PROFILE}" -q edu.nrao.archive.workspaces.IngestionSettings.storageDirectory)
# File name for output tar of calibration products
#
# Naming scheme: <sdmId>.<calProcessingDatetime>.caltables.tar
# a.k.a. <project_code>.<sb>.<eb>.<obs_time_in_mjd>.<cal_processing_timestamp>.caltables.tar
FILENAME="${SDM_ID}"."${CAL_PROCESSING_DATETIME}".caltables
#
# For the ability to reproduce results, we'll want the PPR.xml file. Ensure it is
# included in the products/ directory:
#
# Check for spool/<workflow_dir>/products/*.pprequest.xml. If it exists, do nothing.
# If not, copy base_dir/PPR.xml to products.
# BUGFIX: this test was inverted (test -n) and copied PPR.xml only when a
# *.pprequest.xml was ALREADY present, contradicting the intent stated above.
if test -z "$(find "${SPOOL_DIR}"/"${WORKFLOW_DIR}"/products -maxdepth 1 -name '*.pprequest.xml' -print -quit)"
then
    cp "${SPOOL_DIR}"/"${WORKFLOW_DIR}"/PPR.xml "${SPOOL_DIR}"/"${WORKFLOW_DIR}"/products/PPR.xml
fi
#
# The VLASS project wants the flux.csv file. Check if it's here, if not, check for it in
# the working directory parallel to this one. Don't fail if we can't find it, however (so
# we minimize the disruption to the CIPL system).
#
# HERE USED TO LIE VLASS-SPECIFIC CODE
#
# Both the pipeline-YYYMMDDTHHMMSS directory and weblog.tgz should exist. We prefer the
# directory (in case of updates/edits), but fall back on the tgz file.
#
# Check that they're both home, as we expect
WEBLOG_DIR=$(ls -t "${SPOOL_DIR}"/"${WORKFLOW_DIR}"/products | grep pipeline- | head -1)
if [ -n "$WEBLOG_DIR" ]
then
    # Remove any stale weblog.tgz so it is rebuilt fresh from the directory
    # (rm -f is a no-op when the file is absent, so no existence check needed)
    rm -f "${SPOOL_DIR}"/"${WORKFLOW_DIR}"/products/weblog.tgz
    # Tar & Zip the weblog.
    # NOTE: under errexit a separate "$?" check after the command is dead
    # code (the shell exits first), so test the command directly.
    if ! tar -C "${SPOOL_DIR}"/"${WORKFLOW_DIR}"/products -czf "${SPOOL_DIR}"/"${WORKFLOW_DIR}"/products/weblog.tgz "${WEBLOG_DIR}"
    then
        echo "Creation of weblog.tgz failed, exiting"
        exit 1
    fi
else
    # no weblog directory. If there's no weblog.tgz file, there's an issue: Issue a warning
    if [ ! -e "${SPOOL_DIR}"/"${WORKFLOW_DIR}"/products/weblog.tgz ]
    then
        echo "Neither weblog.tgz or the weblog directory exist, continuing"
    fi
fi
#
# Sanity checks: create a staging subdirectory for this cal, and if the file already exists, remove it.
#
mkdir -p "${STAGING_DIR}"/"${WORKFLOW_DIR}"
if [ -e "${STAGING_DIR}"/"${WORKFLOW_DIR}"/"${FILENAME}".tar ]
then
    echo "Calibration Tar File Already Exists! Removing the file for recreation"
    /bin/rm -f "${STAGING_DIR}"/"${WORKFLOW_DIR}"/"${FILENAME}".tar
fi
#
# tar all non-fits and non-weblog-related files into a tar archive in the staging path
# SSA-6115: Don't exclude the weblog.tgz: Users and DAs prefer it bundled in.
#
if ! tar --exclude=\*.fits --exclude=pipeline-\* -C "${SPOOL_DIR}"/"${WORKFLOW_DIR}" -cvf "${STAGING_DIR}"/"${WORKFLOW_DIR}"/"${FILENAME}".tar products
then
    echo "Creation of main tar file failed, exiting"
    exit 1
fi
#
# Copy the weblog over, for ingestion as an ancillary file
#
if ! /bin/cp -f "${SPOOL_DIR}"/"${WORKFLOW_DIR}"/products/weblog.tgz "${STAGING_DIR}"/"${WORKFLOW_DIR}"
then
    echo "Copy of the weblog to staging location failed. Exiting."
    exit 1
fi
#
# To stay consistent with current working methods: Copy from STAGING_DIR to STORAGE_DIR
#
mkdir -p "${STORAGE_DIR}"/"${WORKFLOW_DIR}"
# If the archive copy fails, make some noise but continue on (the staged copy
# is still intact). BUGFIX: this branch previously ran `$? = 0`, which is not
# an assignment -- it expanded to e.g. "1 = 0" and tried to execute that as a
# command. Testing the cp directly keeps errexit from aborting the script.
if ! cp "${STAGING_DIR}"/"${WORKFLOW_DIR}"/"${FILENAME}".tar "${STORAGE_DIR}"/"${WORKFLOW_DIR}"/"${FILENAME}".tar
then
    echo "Failed to copy the calibration to ${STORAGE_DIR}, continuing."
fi
#!/usr/bin/env bash
#
# Kick off a Workspaces ingestion workflow of the requested type for the
# given workflow request id.
set -o errexit -o nounset
# Print usage/help text for this script.
function usage {
echo "Usage: ingest-request [-c] [workflow_request_id]
This script sets off a Workspaces ingestion workflow of the specified type,
given by the chosen option, for the provided workflow request id.
Options:
-c, --calibration run the ingest_cal workflow for the specified request
-h, --help display this help and exit
"
}
# With nounset enabled, touching a missing $1/$2 aborts with a cryptic error;
# check argument counts explicitly and show usage instead.
if [ $# -lt 1 ]
then
    usage
    exit 1
fi
# Normalize the option to lowercase for matching
option=$(echo "$1" | tr A-Z a-z)
case $option in
    --calibration|-c)
        action="ingest_cal"
        ;;
    --help|-h)
        # BUGFIX: previously fell through after printing help and crashed on
        # the unset $action below; exit cleanly after displaying help.
        usage
        exit 0
        ;;
    *)
        # BUGFIX: unknown options previously crashed on the unset $action;
        # report usage and fail explicitly.
        usage
        exit 1
        ;;
esac
# The request id is mandatory once an action was selected
if [ $# -lt 2 ]
then
    usage
    exit 1
fi
# NOTE(review): sibling scripts resolve CAPO settings via `pycapo -P "$CAPO_PROFILE"`;
# confirm that a bare `capo` lookup resolves the intended profile here.
WORKFLOW_SERVICE=$(capo -q edu.nrao.archive.workspaces.WorkflowSettings.serviceUrl)
if [ "$action" = "ingest_cal" ]; then
    curl -X POST "$WORKFLOW_SERVICE/workflows/std_calibration/requests/$2/ingest"
fi
import os
import shutil
import subprocess
import tarfile
from pathlib import Path
import pytest
@pytest.fixture
def stub_calibration_area() -> Path:
    """
    Fixture that creates a temporary mimic of a calibration run's working area.

    Builds spool/<workflow_dir> with products/, rawdata/ and working/
    subdirectories plus a representative set of empty product files, yields
    the base of the area, then removes this workflow's spool, staging and
    storage subdirectories.

    :return: Path to the base of the working area
    """
    path_to_lustre = Path("/lustre/aoc/cluster/pipeline/docker/workspaces")

    # Populate spool dir
    workflow_dir = "tmp0123456"
    workflow_path = path_to_lustre / "spool" / workflow_dir
    for d in ("products", "rawdata", "working"):
        (workflow_path / d).mkdir(parents=True)
    for f in ("metadata.json", "PPR.xml"):
        (workflow_path / f).touch()

    # Populate spool/products with a realistic mix of calibration products,
    # FITS images (which the collector must exclude) and the weblog
    products_path = workflow_path / "products"
    products = [
        "16B-069_sb32814386_1_001.57685.66193635417.ms.calapply.txt",
        "16B-069_sb32814386_1_001.57685.66193635417.ms.flagversions.tgz",
        "casa_commands.log",
        "casa_pipescript.py",
        "oussid.3C286_bp.X_band.cont.I.alpha.error.fits",
        "oussid.3C286_bp.X_band.cont.I.alpha.fits",
        "oussid.3C286_bp.X_band.cont.I.mask.fits",
        "unknown.hifv_cal.auxproducts.tgz",
        "unknown.hifv_cal.pprequest.xml",
        "unknown.session_1.caltables.tgz",
        "weblog.tgz",
    ]
    for f in products:
        (products_path / f).touch()

    yield path_to_lustre

    # Teardown: staging/storage may never have been created if the test
    # failed before the collector ran; ignore_errors keeps the teardown from
    # masking the real test failure with a FileNotFoundError.
    for area in ("spool", "staging", "storage"):
        shutil.rmtree(path_to_lustre / area / workflow_dir, ignore_errors=True)
        assert not (path_to_lustre / area / workflow_dir).exists()
def test_calibration_table_collector(stub_calibration_area: Path):
    """
    Test that `calibration-table-collector.sh` does indeed collect the calibration tables and place them in the right
    places in the right forms
    """
    # check=True: fail here with the script's exit status instead of via
    # confusing assertion failures further down.
    subprocess.run(
        ["sh/calibration-table-collector.sh", "tmp0123456", "test_sdm_id", "cal_date_time"],
        check=True,
    )
    staging = stub_calibration_area / "staging" / "tmp0123456"
    tar_path = staging / "test_sdm_id.cal_date_time.caltables.tar"

    # Ensure tar exists in staging dir
    assert tar_path.exists()

    # Products the collector should have bundled: everything in products/
    # except the *.fits files and the pipeline-* weblog directory
    expected_products = {
        "16B-069_sb32814386_1_001.57685.66193635417.ms.calapply.txt",
        "16B-069_sb32814386_1_001.57685.66193635417.ms.flagversions.tgz",
        "casa_commands.log",
        "casa_pipescript.py",
        "unknown.hifv_cal.auxproducts.tgz",
        "unknown.hifv_cal.pprequest.xml",
        "unknown.session_1.caltables.tgz",
        "weblog.tgz",
    }
    # Inspect the archive members directly. BUGFIX: the old extractall() call
    # dumped the tar into the current working directory and the extracted
    # files were never actually checked.
    with tarfile.open(tar_path) as products_tar:
        archived = {
            Path(member.name).name for member in products_tar.getmembers() if member.isfile()
        }
    assert archived == expected_products

    # The staging dir itself should hold only the tar and the weblog copy
    for f in staging.iterdir():
        assert f.name in expected_products | {"test_sdm_id.cal_date_time.caltables.tar"}

    # Ensure weblog exists in staging dir
    assert (staging / "weblog.tgz").exists()

    # Ensure tar exists in storage dir
    assert (
        stub_calibration_area / "storage" / "tmp0123456" / "test_sdm_id.cal_date_time.caltables.tar"
    ).exists()
<div class="container-fluid py-3">
<div class="container border border-dark rounded py-3 mt-2">
<h4>Null Capability</h4>
<div id="button-container" class="d-flex justify-content-left py-2">
<div class="d-flex px-2">
<button
type="button"
id="launchNullCapabilityBtn"
class="btn btn-secondary btn-lg"
(click)="nullButtonOnClick()"
>
Launch null capability
</button>
</div>
</div>
</div>
<div class="container border border-dark rounded py-3 mt-2">
<h4>Download or Calibration</h4>
<div class="row p-3">
<div class="col-6">
<div class="md-form">
......@@ -39,17 +55,7 @@
</div>
</div>
</div>
<div id="button-container" class="d-flex justify-content-center py-2">
<div class="d-flex px-2">
<button
type="button"
id="launchNullCapabilityBtn"
class="btn btn-secondary btn-lg"
(click)="nullButtonOnClick()"
>
Launch null capability
</button>
</div>
<div id="button-container" class="d-flex justify-content-left py-2">
<div class="d-flex px-2">
<button
type="button"
......@@ -72,3 +78,52 @@
</div>
</div>
</div>
<!-- Standard CMS Imaging: collects the CMS path, SDM id and notification
     email, then launches the std_cms_imaging capability. -->
<div class="container border rounded border-dark py-3 my-2">
  <h4>Standard CMS Imaging</h4>
  <div class="row p-3 mt-4">
    <div class="col-4">
      <div class="md-form">
        <label for="cmsPathInput">CMS Path</label>
        <input
          type="text"
          id="cmsPathInput"
          (change)="setCmsPath($event.target.value)"
          class="form-control"
        />
      </div>
    </div>
    <div class="col-3">
      <div class="md-form">
        <!-- BUGFIX: label previously pointed at for="sdmId", which matches no
             element; the input's id is "sdmIdInput". -->
        <label for="sdmIdInput">SDM ID</label>
        <input
          type="text"
          id="sdmIdInput"
          (change)="setSdmId($event.target.value)"
          class="form-control"
        />
      </div>
    </div>
    <div class="col-3">
      <div class="md-form">
        <label for="userEmail">Email Address</label>
        <input
          type="text"
          id="userEmail"
          [value]="userEmail"
          (change)="setUserEmail($event.target.value)"
          class="form-control"
        />
      </div>
    </div>
  </div>
  <div id="button-container" class="d-flex justify-content-left py-2">
    <div class="d-flex px-2">
      <button
        class="btn btn-lg btn-warning"
        id="cms-imaging-submit"
        (click)="LaunchImagingCapabilityOnClick('std_cms_imaging')">
        Launch standard CMS imaging
      </button>
    </div>
  </div>
</div>
......@@ -15,6 +15,8 @@ export class WorkspacesComponent implements OnInit {
public productLocator: string;
public userEmail: string;
public inputFileList: FileList;
public cmsPath: string;
public sdmId: string;
constructor(
private capabilityLauncher: CapabilityLauncherService,
......@@ -41,12 +43,27 @@ export class WorkspacesComponent implements OnInit {
* FIXME: Remove check for calibration once it is implemented
*/
standardLaunchCapabilityOnClick(capabilityName: string): void {
  // Imaging requests are presumably keyed by CMS path rather than a product
  // locator (see LaunchImagingCapabilityOnClick), so clear any stale locator.
  // BUGFIX: use strict equality instead of type-coercing ==.
  if (capabilityName === "std_cms_imaging") {
    this.productLocator = undefined;
  }
  this.launchCapability(capabilityName, {
    product_locator: this.productLocator,
    user_email: this.userEmail,
  });
}
/**
 * OnClick method that creates a capability request for an imaging capability and submits it with the CMS imaging parameters:
 * - User email
 * - CMS Path
 * - SDM ID
 *
 * NOTE(review): the payload mixes snake_case keys (user_email, cms_path)
 * with camelCase (sdmId) -- confirm which key name the capability backend
 * actually expects before relying on this.
 *
 * @param capabilityName name of the imaging capability to launch
 */
LaunchImagingCapabilityOnClick(capabilityName: string): void {
  this.launchCapability(capabilityName, {
    user_email: this.userEmail,
    cms_path: this.cmsPath,
    sdmId: this.sdmId,
  });
}
/**
* method that sets the user input Science Product Locator for the download capability
* @param spl the Science Product Locator to download
......@@ -71,6 +88,22 @@ export class WorkspacesComponent implements OnInit {
this.inputFileList = inputFileList;
}
/**
 * Method that sets the CMS path used when launching Standard Imaging
 * @param path CMS path for imaging
 */
setCmsPath(path: string): void {
  this.cmsPath = path;
}
/**
 * Method that sets the SDM ID used when launching Standard Imaging
 * @param id SDM ID for imaging
 */
setSdmId(id: string): void {
  this.sdmId = id;
}
/**
* Method that uses the capabilityLauncher service to launch a capability
* @param capabilityName Name of capability
......
......@@ -41,10 +41,7 @@ services:
dockerfile: ./config/htcondor/execute/Dockerfile.local
volumes:
- ./delivery_root:/tmp/delivery_root
- ./lustre/aoc/cluster/pipeline/docker/workspaces/spool:/lustre/aoc/cluster/pipeline/docker/workspaces/spool
- ./lustre/aoc/cluster/pipeline/docker/workspaces/qa2:/lustre/aoc/cluster/pipeline/docker/workspaces/qa2
- ./lustre/aoc/cluster/pipeline/docker/workspaces/cache/weblog:/lustre/aoc/cluster/pipeline/docker/workspaces/cache/weblog
- ./lustre/aoc/cluster/pipeline/docker/workspaces/image-qa:/lustre/aoc/cluster/pipeline/docker/workspaces/image-qa
- ./lustre/aoc/cluster/pipeline/docker/workspaces:/lustre/aoc/cluster/pipeline/docker/workspaces
schema:
build:
......
......@@ -41,6 +41,12 @@ edu.nrao.archive.workflow.config.DeliverySettings.nraoDownloadUrl = http://local
edu.nrao.archive.workspaces.DeliverySettings.ciplDelivery = /lustre/aoc/cluster/pipeline/docker/workspaces/qa2
edu.nrao.archive.workspaces.DeliverySettings.cacheWeblogDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/cache/weblog
#
# Ingestion Settings
#
edu.nrao.archive.workspaces.IngestionSettings.stagingDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/staging
edu.nrao.archive.workspaces.IngestionSettings.storageDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/storage
#
# Standard Imaging Settings
#
......
......@@ -23,14 +23,19 @@ SPOOL_DIR=/lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/spool
QA_DIR=/lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/qa2
WEBLOG_DIR=/lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/cache
IMAGE_QA_DIR=/lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/image-qa
# Staging and storage areas used by ingestion.
# BUGFIX: both paths were missing their leading "/" (making them relative,
# unlike every other /lustre path above), and STORAGE_DIR pointed at
# .../staging instead of .../storage (copy-paste error; the CAPO
# IngestionSettings declare distinct staging and storage directories).
STAGING_DIR=/lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/staging
STORAGE_DIR=/lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/storage

mkdir -p "$SPOOL_DIR"
mkdir -p "$QA_DIR"
mkdir -p "$WEBLOG_DIR"
mkdir -p "$WEBLOG_DIR/weblog"
mkdir -p "$IMAGE_QA_DIR"
mkdir -p "$STAGING_DIR"
mkdir -p "$STORAGE_DIR"
# Copy wf_framework shell scripts to workflow dir
cp /packages/apps/cli/executables/wf_framework/sh/framework.sh "$WORKFLOW_DIR"
cp /packages/apps/cli/executables/wf_framework/sh/calibration-table-collector.sh "$WORKFLOW_DIR"
cp -R /packages/apps/cli/executables/wf_framework/casa_requirements/.matplotlib "$WORKFLOW_DIR"
......@@ -41,6 +46,8 @@ chown vlapipe:vlapipe "$SPOOL_DIR"
chown vlapipe:vlapipe "$QA_DIR"
chown -R vlapipe:vlapipe "$WEBLOG_DIR"
chown vlapipe:vlapipe "$IMAGE_QA_DIR"
chown vlapipe:vlapipe "$STAGING_DIR"
chown vlapipe:vlapipe "$STORAGE_DIR"
chmod 777 "$WORKFLOW_DIR"
chmod 777 "$WORKFLOW_DIR"/*
......
......@@ -238,6 +238,29 @@ class WorkflowRequestRestService:
return self.request.workflows.announce_qa_ready(self.request.matchdict["request_id"])
@view_config(request_method="POST", route_name="ingest_workflow_result")
def ingest(self):
    """
    Ingest specified workflow request's results into NGAS and archive.

    Creates and submits a separate ingestion workflow request ("ingest_cal"
    when the originating workflow's name contains "calibration", plain
    "ingest" otherwise) with the original request's metadata.json attached.

    :return: None
    """
    print(f"Ingesting results for workflow request {self.request.context}")

    # 1. retrieve metadata.json for workflow request
    # (lookup_file reads the filename from matchdict, so set it first)
    self.request.matchdict["filename"] = "metadata.json"
    file = lookup_file(request=self.request)

    # 2. create ingestion workflow request
    ingest_type = "ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest"
    ingest_request = self.request.info.create_workflow_request(ingest_type)

    # 3. attach metadata.json to ingestion wf request
    self.request.workflows.attach_file_to_request(
        request=ingest_request, filename=file.filename, content=file.content
    )

    # 4. submit ingestion workflow request
    self.request.workflows.execute(ingest_request)
@view_defaults(route_name="workflow_request_files", renderer="json")
class WorkflowFilesRestService:
"""
......@@ -417,12 +440,15 @@ def main(global_config, **settings):
"/workflows/{name}/requests/{request_id}/submit",
factory=lookup_request,
)
config.add_route(
"ingest_workflow_result",
"/workflows/{name}/requests/{request_id}/ingest",
factory=lookup_request,
)
# yes I know this doesn't match pattern, it's a stopgap.
config.add_route(
"announce_qa_ready",
"/workflows/requests/{request_id}/qa",
factory=lookup_request
"announce_qa_ready", "/workflows/requests/{request_id}/qa", factory=lookup_request
)
config.include("pyramid_beaker")
config.scan(".")
......
......@@ -98,12 +98,21 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
Announce results are available for QA
THIS IS A WORKAROUND PENDING THE IMPLEMENTATION OF THE QA SYSTEM!
<insert unhappy tears here>
:param workflow_request: completed workflow request
:param workflow_request_id: id of completed workflow
:return:
"""
requests.post(f"{self.url}/workflows/requests/{workflow_request_id}/qa")
def ingest(self, request: WorkflowRequestIF):
    """
    Ingest results for previously run workflow into NGAS and archive.

    Fire-and-forget: the HTTP response is neither checked nor returned,
    matching the sibling announce_qa_ready call above.
    NOTE(review): requests.post is called without a timeout and can block
    indefinitely if the service hangs -- consider adding one; confirm
    against callers' expectations.

    :param request: completed workflow request to ingest
    :return: None
    """
    requests.post(
        f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest"
    )
class WorkflowService(WorkflowServiceIF):
"""
......