Skip to content
Snippets Groups Projects
Commit f90bfa13 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

WS-551: adding Restored CMS delivery functionality

parent 924f662e
No related branches found
No related tags found
1 merge request!358WS-551: fix delivery to support restores
Pipeline #2256 failed
......@@ -8,7 +8,7 @@ import os
import pathlib
from typing import Iterator
from .products import ExecutionBlock, SpooledProduct
from .products import ExecutionBlock, SpooledProduct, CalibratedMS
class ProductFinder(abc.ABC):
......@@ -46,5 +46,7 @@ class HeuristicProductFinder(ProductFinder):
:return: List of products
"""
for subdir in self.path.glob("*"):
if subdir.is_dir() and not subdir.name.startswith("tmp"):
if subdir.is_dir() and subdir.name.endswith(".ms"):
yield CalibratedMS(subdir)
elif subdir.is_dir() and not subdir.name.startswith("tmp"):
yield ExecutionBlock(subdir)
import abc
import glob
import pathlib
import re
from typing import Dict, Tuple
......@@ -59,6 +60,23 @@ class ExecutionBlock(SpooledProduct):
return f"execution block {self.eb_name}"
class CalibratedMS(SpooledProduct):
"""
A Calibrated Measurement Set is the end result of a restore capability request and is used as an input to imaging.
Deliver all files in the request's products directory.
"""
@property
def cms_name(self):
return self.path.absolute().name
def deliver_to(self, destination: Destination):
destination.add_directory(self.path, self.cms_name[:-3])
def __str__(self):
return f"calibrated MS {self.cms_name}"
# Future types of product that might be needed:
#
class Calibration(SpooledProduct):
......
......@@ -79,7 +79,7 @@ class VelaProduct:
sdm_id = (
parameters["fileSetIds"][0]
if isinstance(parameters["workflowName"], list)
if isinstance(parameters["fileSetIds"], list)
else parameters["fileSetIds"]
)
filename = sdm_id + ".ms.calapply.txt"
......@@ -120,6 +120,9 @@ class VelaProduct:
for i in range(0, 5):
dirname = f"FORGED_MS_DIR_{i}"
os.mkdir(dirname)
dir_file = open(f"./{dirname}/table.f{i}", "x")
dir_file.write("I am a file.")
dir_file.close()
file = open(f"table.f{i}", "x")
file.write("I am a table.")
file.close()
......
......@@ -8,7 +8,7 @@
></app-create-new-version-form>
</div>
<div class="col-auto d-flex" *ngIf="capabilityRequest.state === 'Complete' &&
capabilityRequest.capability_name === 'std_calibration' || capabilityRequest.capability_name === 'restore_cms'">
(capabilityRequest.capability_name === 'std_calibration' || capabilityRequest.capability_name === 'restore_cms')">
<button
id="create-image-request"
type="button"
......
"""restore corrections
Revision ID: fd68d858d354
Revises: 87dae5acfd34
Create Date: 2021-07-20 10:45:50.997748
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "fd68d858d354"
down_revision = "87dae5acfd34"
branch_labels = None
depends_on = None
restore_condor_content = """executable = restore_cms.sh
arguments = {{product_locator}} {{cal_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products, delivery.json
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
restore_img_content = """executable = std_restore_imaging.sh
arguments = {{product_locator}} {{cal_product_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products, delivery.json
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
old_restore_img = """executable = std_restore_imaging.sh
arguments = {{product_locator}} {{cal_product_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
old_restore_condor = """executable = restore_cms.sh
arguments = {{product_locator}} {{cal_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
def upgrade():
op.execute(
"""
UPDATE capabilities
SET capability_steps='prepare-and-run-workflow restore_cms\nawait-workflow'
WHERE capability_name='restore_cms'
"""
)
op.execute(
"""
UPDATE capabilities
SET capability_steps='prepare-and-run-workflow std_restore_imaging\nawait-workflow'
WHERE capability_name='std_restore_imaging'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{restore_condor_content}' WHERE filename='restore_cms.condor'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{restore_img_content}' WHERE filename='std_restore_imaging.condor'
"""
)
def downgrade():
op.execute(
"""
UPDATE capabilities
SET capability_steps='prepare-and-run-workflow restore_cms\nawait-workflow\nawait-qa'
WHERE capability_name='restore_cms'
"""
)
op.execute(
"""
UPDATE capabilities
SET capability_steps='prepare-and-run-workflow std_restore_imaging\nawait-workflow\nawait-qa'
WHERE capability_name='std_restore_imaging'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_restore_condor}' WHERE filename='restore_cms.condor'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_restore_img}' WHERE filename='std_restore_imaging.condor'
"""
)
......@@ -483,6 +483,8 @@ class WorkflowService(WorkflowServiceIF):
status = WorkflowRequestState.Complete.name
elif message["type"] == "workflow-failed":
status = WorkflowRequestState.Failed.name
elif message["type"] == "delivery":
status = WorkflowRequestState.Complete.name
else:
status = "Unknown"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment