
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Project: ssa/workspaces
Commits on Source (4)
Showing 583 additions and 319 deletions
......@@ -182,6 +182,10 @@ class ImagingLauncher(LauncherIF):
)
if check_input:
wf_name = self.metadata.content["workflowName"]
if "restore" in wf_name:
RestoreFoundation(self.parameters, self.metadata).data_foundation()
self.run_audit(self.parameters)
CasaLauncher(self.parameters).run()
else:
......
......@@ -4,7 +4,6 @@ import sys
from ingest_envoy.launchers import IngestCalibrationLauncher
from ingest_envoy.solicitor import Solicitor
from messaging.router import Router
from pycapo import CapoConfig
"""
......@@ -57,20 +56,6 @@ def check_ingest_type(args_type: str, parameters: dict) -> bool:
return False
def send_ingestion_complete_message(workflow_request_id: str):
"""
Send AMQP message to notify the system that ingestion has succeeded
:param workflow_request_id: ID of workflow request that has had its products ingested
"""
ingestion_complete_msg = {
"type": "ingestion-complete",
"workflow_request_id": workflow_request_id,
}
router = Router("workflow")
router.send_message(**ingestion_complete_msg)
def main():
args = arg_parser().parse_args()
......@@ -82,7 +67,9 @@ def main():
ingest_result = IngestCalibrationLauncher(parameters).launch_ingestion()
if ingest_result == 0:
# Ingestion succeeded
send_ingestion_complete_message(parameters["workflowRequestId"])
logger.info("Ingestion finished successfully!")
else:
logger.error("ERROR: Ingestion failure! Please check logs")
else:
logger.error(
......
......@@ -70,15 +70,6 @@ class ManifestIF(ManifestComponentIF):
self.files_found = [file for file in self.staging_source_dir.iterdir()]
@abc.abstractmethod
def create(self):
"""
Build and write the manifest, which includes gathering various items in
ingestion_path to get info for the manifest.
:return:
"""
@abc.abstractmethod
def write(self):
"""
......@@ -112,13 +103,13 @@ class IngestionManifestBuilder:
def __init__(
self,
staging_source_dir: Path,
sp_type: ScienceProductType,
sp_type: str,
locator: str,
telescope: Telescope,
):
self.telescope = telescope
self.staging_source_dir = staging_source_dir
self.sp_type = sp_type
self.sp_type = ScienceProductType(sp_type)
self.locator = locator
self.files_found = [file for file in staging_source_dir.iterdir()]
if len(self.files_found) == 0:
......@@ -260,7 +251,7 @@ class IngestionManifestBuilder:
class IngestionManifest(ManifestIF):
"""needed for ingestion-launching interface"""
"""write ingestion manifest to file"""
def build_ingest_parameters(self):
"""
......@@ -279,34 +270,6 @@ class IngestionManifest(ManifestIF):
staging_source_dir=self.staging_source_dir,
)
def _build_input_group(self):
"""
Create the input group using the parameters.
:return:
"""
# N.B. this is sufficient for most types of ingestion,
# but ALMA CALs will have multiple EB SPs, identified only by locator,
# and VLBAs have no input group at all.
sp_in = InputScienceProduct(sp_type=self.sp_type, locator=self.locator)
return InputGroup([sp_in])
def _build_output_group(self) -> OutputGroup:
"""
Create the output group using the parameters.
:return:
"""
sp_tar = self._find_science_product_tar()
find_output_science_products(self.files_found, self.staging_source_dir)
sps_out = [OutputScienceProduct(self.sp_type, sp_tar.name)]
# find ancillary products, if any
ancillary_products = self._find_ancillary_products()
weblog = Path(self.ingestion_path / WEBLOG)
if weblog.exists():
ancillary_products.append(AncillaryProduct(type=WEBLOG, filename=str(weblog)))
return OutputGroup(sps_out)
# @property
def ingestion_path(self) -> Path:
return self.parameters.ingestion_path
......@@ -326,27 +289,6 @@ class IngestionManifest(ManifestIF):
return output_path
def create(self):
"""
Create the ingestion manifest in this directory for a product of this type,
identified by this locator.
:return:
"""
if self.sp_type != ScienceProductType.EVLA_CAL:
raise NotImplementedError(
f"Don't yet know how to handle {self.sp_type.value} ingestion"
)
builder = IngestionManifestBuilder(
staging_source_dir=Path(self.staging_source_dir),
sp_type=self.sp_type,
locator=self.locator,
telescope=self.telescope,
)
builder.build()
def to_json(self) -> JSON:
"""
Turn this object into a JSON string suitable for writing to a file
......
import logging
import subprocess
import sys
from pathlib import Path
from ingest_envoy.ingestion_manifest import IngestionManifest
from ingest_envoy.ingestion_manifest import IngestionManifest, IngestionManifestBuilder
from ingest_envoy.interfaces import IngestLauncherIF
......@@ -21,6 +22,8 @@ class IngestCalibrationLauncher(IngestLauncherIF):
:return: Return code of ingestion script process
"""
self.logger.info("RUNNING CALIBRATION INGESTION!")
self.prepare_for_ingest()
return self.run_ingest()
......@@ -30,14 +33,16 @@ class IngestCalibrationLauncher(IngestLauncherIF):
:return: Return code of ingestion script process
"""
self.logger.info("Running ingestion!")
ingest_process = subprocess.run(
["./ingest", "--json", self.staging_source_dir],
["./ingest", "--json", "-p", self.staging_source_dir],
stdout=sys.stdout,
stderr=sys.stderr,
)
return ingest_process.returncode
def prepare_for_ingest(self):
self.logger.info("Preparing for ingest...")
# 1. run collection script to create calibration tarfile
self.run_collection_script()
......@@ -45,6 +50,7 @@ class IngestCalibrationLauncher(IngestLauncherIF):
self.create_manifest()
def run_collection_script(self):
self.logger.info("Collecting calibration tables for staging...")
workflow_dir = self.parameters["workflowDir"]
sdm_id = self.parameters["sdmId"]
cal_processing_datetime = self.parameters["processingStart"]
......@@ -56,7 +62,10 @@ class IngestCalibrationLauncher(IngestLauncherIF):
)
def create_manifest(self):
self.logger.info("Creating ingestion manifest...")
spl = self.parameters["spl"]
telescope = self.parameters["telescope"]
IngestionManifest(self.staging_source_dir, self.sci_product_type, spl, telescope).create()
IngestionManifestBuilder(
Path(self.staging_source_dir), self.sci_product_type, spl, telescope
).build()
......@@ -27,8 +27,10 @@ class Solicitor:
def solicit_parameters(self):
metadata = self.solicit_contents()
project_info = metadata["projectMetadata"]
return {
"sdmId": metadata["fileSetIds"],
"telescope": project_info["telescope"],
"workflowName": metadata["workflowName"],
"workflowRequestId": metadata["systemId"],
"spl": metadata["productLocator"],
......
#!/usr/bin/env bash
#
# A replacement for the old qaarchive script used by the
# data analysts. This script will tar & zip the pipeline
# weblog into weblog.tgz and then collect everything but
# FITS files in the products directory from a CIPL run into
# a single tar file (naming convention TBD) that is created
# in a storage directory for ingestion.
#
# Arguments:
# 1: Working Directory in qa2 to be worked upon
# 2: Filename (if different from the above)
#
#
#Basics: strict shell options
set -o errexit -o nounset -o xtrace
#
# command line argument
#
# The tar file will be named after the working directory it came from
# which preserves the processing time information.
#
SUBDIRECTORY=$1;shift
PROFILE=$1;shift
# Get the qa2, spool, and staging paths from CAPO
SPOOL_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.spoolDirectory)
STAGE_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.stageDirectory)
STORE_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.storageDirectory)
#Yet More VLASS Specialness
if [[ ${PROFILE} != vlass* ]]
then
QA2_DIR=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.qaDirectory)
FILENAME=${SUBDIRECTORY}
else
# For VLASS, they don't want the data moved between qa2/ and spool/
QA2_DIR=${SPOOL_DIR}
# They also provide an extra layer of directory within the filename.
IFS='/' # redefine the character on which to split
read -ra COMPONENTS <<< "${SUBDIRECTORY}"
IFS=' ' # reset to default after
# We get: calibration/VLASS1.1_stuff --> FILENAME becomes VLASS1.1_stuff (in line with CIPL)
FILENAME=${COMPONENTS[1]}
fi
# Get the weblog caching directory from CAPO
WEBLOG_CACHE=$(capo -P ${PROFILE} -q edu.nrao.archive.workflow.config.CiplWorkflowSettings.cacheWeblogDirectory)
#
# For the ability to reproduce results, we'll want the PPR.xml file. Ensure it is
# included in the products/ directory:
#
# TODO: Check for base_dir/products/*.pprequest.xml. If it exists, do nothing. If not, use base_dir/PPR.xml
if [ ! -e ${QA2_DIR}/${SUBDIRECTORY}/products/PPR.xml ]
then
cp ${QA2_DIR}/${SUBDIRECTORY}/working/PPR.xml ${QA2_DIR}/${SUBDIRECTORY}/products/PPR.xml
fi
#
# The VLASS project wants the flux.csv file. Check if it's here, if not, check for it in
# the working directory parallel to this one. Don't fail if we can't find it, however (so
# we minimize the disruption to the CIPL system).
#
if [ ! -e ${QA2_DIR}/${SUBDIRECTORY}/products/flux.csv ]
then
if [ -e ${QA2_DIR}/${SUBDIRECTORY}/working/flux.csv ]
then
cp ${QA2_DIR}/${SUBDIRECTORY}/working/flux.csv ${QA2_DIR}/${SUBDIRECTORY}/products/flux.csv
else
echo "No flux.csv file found here or in parallel working directory. Continuing."
fi
fi
#
# Both the pipeline-YYYYMMDDTHHMMSS directory and weblog.tgz should exist. We prefer the
# directory (in case of updates/edits), but fall back on the tgz file.
#
# Check that they're both home, as we expect
WEBLOG_DIR=$(ls -t ${QA2_DIR}/${SUBDIRECTORY}/products | grep pipeline- | head -1)
if [ -n "$WEBLOG_DIR" ]
then
# if weblog.tgz exists, we want to remove it
if [ -e ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ]
then
rm -f ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz
fi
# Tar & Zip the weblog
tar -C${QA2_DIR}/${SUBDIRECTORY}/products -czf ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ${WEBLOG_DIR}
if [ $? -ne 0 ]
then
echo "Creation of weblog.tgz failed, exiting"
exit -1
fi
else
# No weblog directory. If there's no weblog.tgz file either, warn but continue.
if [ ! -e ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ]
then
echo "Neither weblog.tgz or the weblog directory exist, continuing"
fi
fi
#
# Sanity checks: create a staging subdirectory for this cal, and if the file already exists, remove it.
#
mkdir -p ${STAGE_DIR}/${SUBDIRECTORY}
if [ -e ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar ]
then
echo "Calibration Tar File Already Exists! Removing the file for recreation"
#We could rename them with a version ...
#FILENAME=${SUBDIRECTORY}.$(ls -1 ${STAGE_DIR}/${SUBDIRECTORY} | wc -l)
# if we rename it... how do we tell the workflow?
/bin/rm -f ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar
fi
#
# tar all non-fits and non-weblog-related files into a tar archive in the storage path
# SSA-6115: Don't exclude the weblog.tgz: Users and DAs prefer it bundled in.
#
tar --exclude=\*.fits --exclude=pipeline-\* -C${QA2_DIR}/${SUBDIRECTORY} -cvf ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar products
if [ $? -ne 0 ]
then
echo "Creation of main tar file failed, exiting"
exit -1
fi
#
# Copy the weblog over, for ingestion as an ancillary file
#
/bin/cp -f ${QA2_DIR}/${SUBDIRECTORY}/products/weblog.tgz ${STAGE_DIR}/${SUBDIRECTORY}
if [ $? -ne 0 ]
then
echo "Copy of the weblog to staging location failed. Exiting."
exit -1
fi
#
# To stay consistent with current working methods: Copy from STAGE_DIR to STORE_DIR
#
cp ${STAGE_DIR}/${SUBDIRECTORY}/${FILENAME}.tar ${STORE_DIR}
if [ $? -ne 0 ]
then
# If something goes wrong, make some noise, but continue on.
echo "Failed to copy the calibration to ${STORE_DIR}, continuing."
true # reset status; continue despite the failed copy
fi
# Move subdirectories to the /spool/ copy of this directory
# if it exists, otherwise, just move what we have to /spool/
#
# If this is an ingestion for VLASS, don't move anything.
#
if [[ ${PROFILE} != vlass* ]]
then
if [ -e ${SPOOL_DIR}/${SUBDIRECTORY} ]
then
# Our base directory with the outputlogs is there, move our subdirectories back
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY}/products ${SPOOL_DIR}/${SUBDIRECTORY}
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY}/rawdata ${SPOOL_DIR}/${SUBDIRECTORY}
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY}/working ${SPOOL_DIR}/${SUBDIRECTORY}
# Cleanup the QA2 area
/bin/rm -rf ${QA2_DIR}/${SUBDIRECTORY}
else
#if no old directory exists, just move the whole thing back
/bin/mv -f ${QA2_DIR}/${SUBDIRECTORY} ${SPOOL_DIR}
fi
fi
......@@ -7,6 +7,7 @@
"projectMetadata": {
"projectCode": "16B-069",
"title": "JVLA monitoring of an in-progress extreme scattering event",
"telescope": "EVLA",
"startTime": "57685.661952546296",
"observer": "Keith Bannister"
},
......
......@@ -45,16 +45,6 @@ class TestIngest:
check2 = ingest.check_ingest_type(arg_type2, expected_settings)
assert check2 is False
def test_send_ingestion_complete_message(self):
fake_workflow_request_id = "2"
expected_msg = {
"type": "ingestion-complete",
"workflow_request_id": fake_workflow_request_id,
}
with patch("ingest_envoy.ingest.Router") as patched_router:
ingest.send_ingestion_complete_message(fake_workflow_request_id)
patched_router.return_value.send_message.assert_called_with(**expected_msg)
def test_parser(self):
parser = ingest.arg_parser()
assert isinstance(parser, argparse.ArgumentParser)
......@@ -66,11 +56,9 @@ class TestIngest:
with patch(
"ingest_envoy.launchers.IngestCalibrationLauncher.launch_ingestion", return_value=0
) as cal_ingest:
with patch("ingest_envoy.ingest.send_ingestion_complete_message") as patched_send:
ingest.main()
assert cal_ingest.call_count == 1
assert cal_ingest.return_value == 0
patched_send.assert_called_with("2")
ingest.main()
assert cal_ingest.call_count == 1
assert cal_ingest.return_value == 0
# reset for other testing
args.calibration = None
......@@ -13,6 +13,7 @@ expected_metadata = {
"projectMetadata": {
"projectCode": "16B-069",
"title": "JVLA monitoring of an in-progress extreme scattering event",
"telescope": "EVLA",
"startTime": "57685.661952546296",
"observer": "Keith Bannister",
},
......@@ -40,6 +41,7 @@ class TestSolicitor:
def test_solicit_parameters(self):
expected_parameters = {
"sdmId": "16B-069_sb32814386_1_001.57685.66193635417",
"telescope": "EVLA",
"workflowName": "std_calibration",
"workflowRequestId": "2",
"spl": "uid://evla/execblock/48ba4c9d-d7c7-4a8f-9803-1115cd52459b",
......
......@@ -32,9 +32,9 @@ SDM_ID=$1;shift
CAL_PROCESSING_DATETIME=$1;shift
# Get the spool, staging and storage paths from CAPO
SPOOL_DIR=$(pycapo -P "${CAPO_PROFILE}" -q edu.nrao.archive.workspaces.ProcessingSettings.rootDirectory)
STAGING_DIR=$(pycapo -P "${CAPO_PROFILE}" -q edu.nrao.archive.workspaces.IngestionSettings.stagingDirectory)
STORAGE_DIR=$(pycapo -P "${CAPO_PROFILE}" -q edu.nrao.archive.workspaces.IngestionSettings.storageDirectory)
SPOOL_DIR=$(./pycapo -P "${CAPO_PROFILE}" -q edu.nrao.archive.workspaces.ProcessingSettings.rootDirectory)
STAGING_DIR=$(./pycapo -P "${CAPO_PROFILE}" -q edu.nrao.archive.workspaces.IngestionSettings.stagingDirectory)
STORAGE_DIR=$(./pycapo -P "${CAPO_PROFILE}" -q edu.nrao.archive.workspaces.IngestionSettings.storageDirectory)
# File name for output tar of calibration products
#
......
"""fixing ingest_cal script
Revision ID: 34d7311d8290
Revises: c857908b6acb
Create Date: 2021-07-21 14:06:23.987647
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "34d7311d8290"
down_revision = "c857908b6acb"
branch_labels = None
depends_on = None
script_content = """#!/bin/sh
set -o errexit
./conveyor --retrieve $1
./ingest_envoy --calibration $1
"""
condor = """executable = ingest_cal.sh
arguments = metadata.json
output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/pycapo, $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/ingest, $(SBIN_PATH)/calibration-table-collector.sh, ./metadata.json
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
old_script = """#!/bin/sh
set -o errexit
./conveyor --retrieve $1
./ingest_envoy --ingest-cal $1
"""
restore_metadata = """{
"fileSetIds": ["{{sdmId}}", "{{calSdmId}}"],
"workflowName": "restore_cms",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"calProductLocator": "{{cal_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"telescope": "{{telescope}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
}
"""
cal_metadata = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_calibration",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"telescope": "{{telescope}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
}
"""
cms_metadata = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_cms_imaging",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"telescope": "{{telescope}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}",
"calibrationSourceDirectory":"{{cms_path}}",
"cmsName":"{{sdmId}}.ms"
}
"""
old_condor = """executable = ingest_cal.sh
arguments = metadata.json
output = ingest.out
error = ingest.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/conveyor, $(SBIN_PATH)/ingest_envoy, $(SBIN_PATH)/ingest, $(SBIN_PATH)/calibration-table-collector.sh, ./metadata.json
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
old_restore_metadata = """{
"fileSetIds": ["{{sdmId}}", "{{calSdmId}}"],
"workflowName": "restore_cms",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"calProductLocator": "{{cal_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
}
"""
old_cal_metadata = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_calibration",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
}
"""
old_cms_metadata = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_cms_imaging",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}",
"calibrationSourceDirectory":"{{cms_path}}",
"cmsName":"{{sdmId}}.ms"
}
"""
def upgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{script_content}' WHERE filename='ingest_cal.sh'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{condor}' WHERE filename='ingest_cal.condor'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{restore_metadata}' WHERE filename='metadata.json' AND workflow_name='restore_cms'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{cal_metadata}' WHERE filename='metadata.json' AND workflow_name='std_calibration'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{cms_metadata}' WHERE filename='metadata.json' AND workflow_name='std_cms_imaging'
"""
)
def downgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_script}' WHERE filename='ingest_cal.sh'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_restore_metadata}' WHERE filename='metadata.json' AND workflow_name='restore_cms'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_cal_metadata}' WHERE filename='metadata.json' AND workflow_name='std_calibration'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_cms_metadata}' WHERE filename='metadata.json' AND workflow_name='std_cms_imaging'
"""
)
"""correcting restore and image
Revision ID: f2e524e1e04d
Revises: 34d7311d8290
Create Date: 2021-07-22 12:56:45.067121
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "f2e524e1e04d"
down_revision = "34d7311d8290"
branch_labels = None
depends_on = None
ppr = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ns2:SciPipeRequest xmlns:ns2="Common/pipelinescience/SciPipeRequest">
<ProjectSummary>
<ProposalCode>VLA/{{projectCode}}</ProposalCode>
<Observatory>NRAO</Observatory>
<Telescope>VLA</Telescope>
<ProcessingSite>Socorro</ProcessingSite>
<Operator>vlapipe</Operator>
<Mode>SCIENCE</Mode>
<Version>NGRH-ALMA-10_8</Version>
<CreationTime>{{created_at}}</CreationTime>
</ProjectSummary>
<ProjectStructure>TBD</ProjectStructure>
<ProcessingRequests>
<RootDirectory>{{root_directory}}</RootDirectory>
<ProcessingRequest>
<ProcessingIntents/>
<ProcessingProcedure>
<ProcedureTitle>Workspaces Restore Imaging</ProcedureTitle>
<ProcessingCommand>
<Command>hifv_restoredata</Command>
<ParameterSet/>
</ProcessingCommand>
<ProcessingCommand>
<Command>hifv_statwt</Command>
<ParameterSet/>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_mstransform</Command>
<ParameterSet>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_checkproductsize</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">maximsize</Keyword>
<Value xmlns="">16384</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_makeimlist</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">specmode</Keyword>
<Value xmlns="">cont</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_makeimages</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">hm_masking</Keyword>
<Value xmlns="">none</Value>
</Parameter>
<Parameter>
<Keyword xmlns="">hm_cyclefactor</Keyword>
<Value xmlns="">3.0</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hifv_exportdata</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">imaging_products_only</Keyword>
<Value xmlns="">True</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
</ProcessingProcedure>
<DataSet>
<RelativePath>{{relative_path}}</RelativePath>
<SdmIdentifier>{{sdmId}}</SdmIdentifier>
<DataType>asdm</DataType>
</DataSet>
</ProcessingRequest>
</ProcessingRequests>
</ns2:SciPipeRequest>
"""
old_ppr = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ns2:SciPipeRequest xmlns:ns2="Common/pipelinescience/SciPipeRequest">
<ProjectSummary>
<ProposalCode>VLA/{{projectCode}}</ProposalCode>
<Observatory>NRAO</Observatory>
<Telescope>VLA</Telescope>
<ProcessingSite>Socorro</ProcessingSite>
<Operator>vlapipe</Operator>
<Mode>SCIENCE</Mode>
<Version>NGRH-ALMA-10_8</Version>
<CreationTime>{{created_at}}</CreationTime>
</ProjectSummary>
<ProjectStructure>TBD</ProjectStructure>
<ProcessingRequests>
<RootDirectory>{{root_directory}}</RootDirectory>
<ProcessingRequest>
<ProcessingIntents/>
<ProcessingProcedure>
<ProcedureTitle>Workspaces Restore Imaging</ProcedureTitle>
<ProcessingCommand>
<Command>hifv_restoredata</Command>
<ParameterSet/>
</ProcessingCommand>
<ProcessingCommand>
<Command>hifv_statwt</Command>
<ParameterSet/>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">breakpoint</Command>
<ParameterSet>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_mstransform</Command>
<ParameterSet>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_checkproductsize</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">maximsize</Keyword>
<Value xmlns="">16384</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_makeimlist</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">specmode</Keyword>
<Value xmlns="">cont</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hif_makeimages</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">hm_masking</Keyword>
<Value xmlns="">none</Value>
</Parameter>
<Parameter>
<Keyword xmlns="">hm_cyclefactor</Keyword>
<Value xmlns="">3.0</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
<ProcessingCommand>
<Command xmlns="">hifv_exportdata</Command>
<ParameterSet>
<Parameter>
<Keyword xmlns="">imaging_products_only</Keyword>
<Value xmlns="">True</Value>
</Parameter>
</ParameterSet>
</ProcessingCommand>
</ProcessingProcedure>
<DataSet>
<RelativePath>{{relative_path}}</RelativePath>
<SdmIdentifier>{{sdmId}}</SdmIdentifier>
<DataType>asdm</DataType>
</DataSet>
</ProcessingRequest>
</ProcessingRequests>
</ns2:SciPipeRequest>
"""
def upgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{ppr}' WHERE filename='PPR.xml' AND workflow_name='std_restore_imaging'
"""
)
def downgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_ppr}' WHERE filename='PPR.xml' AND workflow_name='std_restore_imaging'
"""
)
......@@ -4,7 +4,7 @@
File containing definitions for the other half of the capability side of the Workspaces REST API,
concerning capability requests
"""
import json
from pyramid.httpexceptions import HTTPBadRequest, HTTPNotFound, HTTPPreconditionFailed
from pyramid.request import Request
from pyramid.response import Response
......@@ -126,19 +126,25 @@ def create_follow_on_capability_request(request: Request) -> Response:
followon_type = request.matchdict["followon_type"]
capability_request = request.capability_info.lookup_capability_request(request_id)
parameters = capability_request.current_execution.parameters
parameters = capability_request.current_version.parameters
user_email = parameters["user_email"]
previous_workflow_id = capability_request.current_execution.current_workflow_request_id
metadata = request.workflow_service.retrieve_file_content(
metadata_content = request.workflow_service.retrieve_file_content(
followon_type, previous_workflow_id, "metadata.json"
)
cms_path = metadata["cms_path"]
sdm_id = metadata["fileSetIds"]
metadata = json.loads(metadata_content)
cms_path = metadata["destinationDirectory"] + "/working"
sdm_id = (
metadata["fileSetIds"][0]
if isinstance(metadata["fileSetIds"], list)
else metadata["fileSetIds"]
)
new_capability_request = request.capability_service.create_request(
followon_type, parameters={"cmsPath": cms_path, "sdmId": sdm_id, "user_email": user_email}
followon_type, parameters={"cms_path": cms_path, "sdmId": sdm_id, "user_email": user_email}
)
return Response(json_body=new_capability_request.__json__())
......
......@@ -24,7 +24,7 @@ QA_DIR=/lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/qa2
WEBLOG_DIR=/lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/cache
IMAGE_QA_DIR=/lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/image-qa
STAGING_DIR=lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/staging
STORAGE_DIR=lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/staging
STORAGE_DIR=lustre/aoc/cluster/pipeline/"$CAPO_PROFILE"/workspaces/storage
mkdir -p "$SPOOL_DIR"
mkdir -p "$QA_DIR"
mkdir -p "$WEBLOG_DIR"
......
......@@ -252,7 +252,10 @@ class WorkflowRequestRestService:
# 2. create ingestion workflow request
ingest_type = "ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest"
ingest_request = self.request.info.create_workflow_request(ingest_type)
ingest_request = self.request.info.create_workflow_request(
workflow=ingest_type,
argument={"parent_wf_request_id": self.request.matchdict["request_id"]},
)
# 3. attach metadata.json to ingestion wf request
self.request.workflows.attach_file_to_request(
request=ingest_request, filename=file.filename, content=file.content
......
......@@ -17,9 +17,17 @@ class TestCapabilityService:
mock_capability_info: CapabilityInfo,
mock_capability_execution: CapabilityExecution,
):
fake_subject = {
"type": "WorkflowRequest",
"workflow_request_id": -1,
"execution_wf_id": "-1",
}
fake_ingestion_complete_msg = {
"subject": fake_subject,
"service": "workflow",
"routing_key": "workflow",
"type": "ingestion-complete",
"workflow_request_id": "-1",
}
save_entity_old_call_count = mock_capability_info.save_entity.call_count
mock_capability_service.on_ingestion_complete(**fake_ingestion_complete_msg)
......
......@@ -272,7 +272,7 @@ class CapabilityInfo(CapabilityInfoIF):
return (
self.session.query(CapabilityExecution)
.filter_by(current_workflow_request_id=workflow_request_id)
.one()
.first()
)
def requests_for_capability(self, capability_name: str) -> List[CapabilityRequest]:
......
......@@ -188,7 +188,7 @@ class CapabilityService(CapabilityServiceIF):
}
self.message_router.send_message(**step_complete_msg)
@on_message(type="ingestion-complete")
@on_message(service="workflow", type="ingestion-complete")
def on_ingestion_complete(self, **message: Dict[str, str]):
"""
Catch an ingestion-complete message sent from ingest_envoy and update the ingested flag
......@@ -196,9 +196,11 @@ class CapabilityService(CapabilityServiceIF):
:param message: Ingestion-complete message
"""
logger.info(f"RECEIVED INGESTION COMPLETE MESSAGE: {message}")
execution = self.capability_info.lookup_execution_by_workflow_request_id(
int(message["workflow_request_id"])
int(message["subject"]["execution_wf_id"])
)
request = self.capability_info.lookup_capability_request(execution.capability_request_id)
request = self.capability_info.lookup_capability_request(execution.capability_request.id)
request.ingested = True
self.capability_info.save_entity(request)
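For reference, the message shape this handler now expects mirrors the fake message built in the capability-service test above. A minimal sketch, with placeholder IDs rather than real identifiers:

# Sketch only: same shape as the test's fake ingestion-complete message.
ingestion_complete_msg = {
    "service": "workflow",
    "routing_key": "workflow",
    "type": "ingestion-complete",
    "workflow_request_id": "-1",
    "subject": {
        "type": "WorkflowRequest",
        "workflow_request_id": -1,
        "execution_wf_id": "-1",
    },
}
# on_ingestion_complete(**ingestion_complete_msg) looks up the execution via
# subject["execution_wf_id"], then flags the owning capability request as ingested.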
......@@ -81,13 +81,18 @@ class ExecutionManager:
execution = self.capability_info.lookup_execution_by_workflow_request_id(
subject["workflow_request_id"]
)
step_complete_msg = {
"service": "capability",
"routing_key": "capability",
"subject": execution.__json__(),
"type": "step-complete",
}
self.message_router.send_message(**step_complete_msg)
if execution:
step_complete_msg = {
"service": "capability",
"routing_key": "capability",
"subject": execution.__json__(),
"type": "step-complete",
}
self.message_router.send_message(**step_complete_msg)
else:
logger.info(
f'No capability execution found for workflow id {subject["workflow_request_id"]}'
)
@on_message(service="workflow", type="workflow-failed")
def on_workflow_failed(self, **message: Dict):
......@@ -102,16 +107,21 @@ class ExecutionManager:
subject["workflow_request_id"]
)
execution.state = ExecutionState.Failed.name
execution_failed_msg = {
"service": "capability",
"routing_key": "capability",
"subject": execution.__json__(),
"type": "execution-failed",
}
self.message_router.send_message(**execution_failed_msg)
self.capability_info.save_execution(execution)
if execution:
execution.state = ExecutionState.Failed.name
execution_failed_msg = {
"service": "capability",
"routing_key": "capability",
"subject": execution.__json__(),
"type": "execution-failed",
}
self.message_router.send_message(**execution_failed_msg)
self.capability_info.save_execution(execution)
else:
logger.info(
f'No capability execution found for workflow id {subject["workflow_request_id"]}'
)
@on_message(message="Ingestion complete")
def on_ingestion_complete(self, **message: Dict):
......
......@@ -6,21 +6,35 @@ reacts to certain events by triggering actions and going into another state.
"""
import abc
import json
from typing import Optional
class State(abc.ABC):
"""
A state that a machine could reside in.
A state has a suite of transitions to other states. When an event comes in, we match against it; if we find a
matching pattern, we perform that transition to another state.
"""
@abc.abstractmethod
def matches(self, other: "State") -> bool:
"""
This is most likely implemented by doing a string-equality test.
:param other: the other state to compare to
:return: true if we and the other state match
"""
pass
def __init__(self, transitions: list["TransitionIF"]):
# We have a bit of a chicken-and-egg problem here, in that the State needs Transitions to be initialized but
# the Transition needs States to be initialized. Going from prototype to production here will mean breaking
# this cycle, possibly by introducing a builder of some kind, but for now we can just pretend that they are
# built successfully somehow.
self.transitions = transitions
def on_event(self, event: dict) -> Optional["State"]:
# Locate the first matching transition
matching_transition = None
for transition in self.transitions:
if transition.matches(event):
matching_transition = transition
break
# take this transition
if matching_transition is not None:
return matching_transition.take()
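To make the match-then-transition flow described above concrete, here is a rough usage sketch. NamedState and ToyTransition are hypothetical stand-ins written for illustration only; ToyTransition duck-types TransitionIF instead of subclassing it, and neither is part of the prototype.

# Hypothetical illustration of State.on_event walking its transition list.
class NamedState(State):
    """Hypothetical concrete State whose identity is just a name."""

    def __init__(self, name: str, transitions: list = None):
        super().__init__(transitions or [])
        self.name = name

    def matches(self, other: "State") -> bool:
        return isinstance(other, NamedState) and self.name == other.name


class ToyTransition:
    """Hypothetical transition that fires when the event type equals a keyword."""

    def __init__(self, keyword: str, to_state: NamedState):
        self.keyword, self.to_state = keyword, to_state

    def matches(self, event: dict) -> bool:
        # stand-in for Pattern.matches()
        return event.get("type") == self.keyword

    def take(self) -> NamedState:
        return self.to_state


# queued --(ingestion-complete)--> complete
complete = NamedState("complete")
queued = NamedState("queued", [ToyTransition("ingestion-complete", complete)])
assert queued.on_event({"type": "ingestion-complete"}) is complete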
class Action(abc.ABC):
......@@ -34,6 +48,7 @@ class Action(abc.ABC):
- StartWorkflow(workflow_name, additional_args) that starts a workflow with the
provided name, the event and additional arguments
"""
@abc.abstractmethod
def execute(self):
pass
......@@ -56,20 +71,21 @@ class TransitionIF(abc.ABC):
"""
A transition between states
"""
def __init__(self, from_state: State, to_state: State, pattern: Pattern, action: Action):
self.from_state, self.to_state = from_state, to_state
self.pattern = pattern
self.action = action
@abc.abstractmethod
def matches(self, state: State, event: dict) -> bool:
def matches(self, event: dict) -> bool:
"""
True if this transition is applicable in the supplied state and matches the supplied event.
:param state: state to check against
:param event: event to match against
:return: true if everything matches
"""
return self.from_state.matches(state) and self.pattern.matches(event)
return self.pattern.matches(event)
@abc.abstractmethod
def take(self) -> State:
......@@ -88,6 +104,7 @@ class MealyMachine:
I am a state machine for a given capability. I am responsible for handling events
and transitioning to other states.
"""
def __init__(self):
self.transitions = []
self.current_state: State = None
......@@ -122,6 +139,7 @@ class CapabilityInfoForMachines:
This is a demonstration of the sort of query I expect we'll use to locate executions
that are active and need to be acted on in response to an event of some kind.
"""
def find_requests_matching_transition(self, event: dict) -> list["CapabilityExecution"]:
"""
The concept here is to let the database do the heavy lifting and actually tell us
......@@ -131,7 +149,8 @@ class CapabilityInfoForMachines:
:param event: the event to check
:return: a list of matching capability executions
"""
return self.session.query("""
return self.session.query(
"""
SELECT *
FROM transitions t
JOIN machines m ON t.machine_id = m.id
......@@ -139,30 +158,36 @@ class CapabilityInfoForMachines:
JOIN capability_requests cr on cr.capability_name = c.name
JOIN capability_executions ce on cr.capability_request_id = ce.capability_request_id
WHERE %(event)s @? t.pattern AND ce.state = t.from_state
""", {"event": json.dumps(event)})
""",
{"event": json.dumps(event)},
)
def build_tables(self):
"""
This is just a demonstration method to hold some SQL to demo the tables I have
in mind for this system.
"""
self.session.execute("""
self.session.execute(
"""
CREATE TABLE machines(id serial primary key);
CREATE TABLE actions(id serial primary key, action_type varchar, action_arguments json);
CREATE TABLE transitions (
id serial primary key,
machine_id integer references(machines),
machine_id integer references machines(id),
from_state varchar,
to_state varchar,
pattern jsonpath,
action_id integer references(actions)
action_id integer references actions(id)
);
""")
"""
)
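A hypothetical walk-through of what the pattern column is meant to hold: each transition stores a Postgres jsonpath, and the @? operator in the query above asks whether an incoming event (as JSON) satisfies it. The method name, pattern, and event below are illustrative assumptions, reusing the same session call style as the demo methods above.

    def demo_pattern_match(self):
        """
        Hypothetical illustration (not part of the prototype) of the jsonpath
        matching that find_requests_matching_transition relies on.
        """
        sample_pattern = '$ ? (@.service == "workflow" && @.type == "ingestion-complete")'
        sample_event = {"service": "workflow", "type": "ingestion-complete"}
        # jsonb @? jsonpath is true when the path yields any item for the document,
        # so this event would select transitions (and thus executions) with that pattern.
        return self.session.query(
            "SELECT %(event)s::jsonb @? %(pattern)s::jsonpath",
            {"event": json.dumps(sample_event), "pattern": sample_pattern},
        )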
class CapabilityExecution:
machine: MealyMachine = None
def process(self, event):
self.machine.on_event(event)
......@@ -173,6 +198,7 @@ class CapabilityServiceMachineMananger:
machines alive during the execution of the program. The idea here is to be more
efficient and more event-driven.
"""
def __init__(self):
self.info = CapabilityInfoForMachines()
......