Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Commits on Source (3)
"""empty message
Revision ID: 9507363ddab7
Revises: ced8e001d262, f73dff1b2e6f
Create Date: 2021-11-12 11:25:07.715543
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9507363ddab7'
down_revision = ('ced8e001d262', 'f73dff1b2e6f')
branch_labels = None
depends_on = None
def upgrade():
pass
def downgrade():
pass
"""add qa_fail workflow templates
Revision ID: f73dff1b2e6f
Revises: acfdeb6777cb
Create Date: 2021-11-12 09:28:24.875315
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "f73dff1b2e6f"
down_revision = "acfdeb6777cb"
branch_labels = None
depends_on = None
script_content = """#!/bin/sh
set -o errexit
export HOME=$TMPDIR
./conveyor --qa-fail $1
"""
condor = """executable = qa_fail.sh
arguments = metadata.json
output = qa_fail.out
error = qa_fail.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/conveyor, nraorsync://$(SPOOL_DIR)/metadata.json
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
def upgrade():
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('qa_fail.sh', E'{script_content}', 'qa_fail');
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('qa_fail.condor', E'{condor}', 'qa_fail');
"""
)
def downgrade():
op.execute("DELETE FROM workflow_templates WHERE filename IN ('qa_fail.sh', 'qa_fail.condor')")
......@@ -293,6 +293,30 @@ class WorkflowRequestRestService:
# 4. submit ingestion workflow request
self.request.workflows.execute(ingest_request)
@view_config(request_method="POST", route_name="qa_fail")
def qa_fail(self):
"""
Perform qa_fail on specified workflow
:return:
"""
print(f"QA Failing workflow request #{self.request.context}")
# 1. retrieve metadata.json for workflow request
self.request.matchdict["filename"] = "metadata.json"
file = lookup_file(request=self.request)
# 2. create qa fail workflow request
fail_request = self.request.info.create_workflow_request(
workflow="qa_fail",
argument={"parent_wf_request_id": self.request.matchdict["request_id"]},
)
# 3. attach metadata.json to fail wf request
self.request.workflows.attach_file_to_request(
request=fail_request, filename=file.filename, content=file.content
)
# 4. submit ingestion workflow request
self.request.workflows.execute(fail_request)
@view_config(request_method="POST", route_name="send_carta_url_to_aat")
def send_carta_url_to_aat(self):
"""
......
......@@ -32,6 +32,7 @@ import requests
import sqlalchemy as sa
from pycapo import CapoConfig
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.orderinglist import ordering_list
from sqlalchemy.orm import relationship
from workspaces.capability.enums import (
......@@ -419,7 +420,12 @@ class CapabilityRequest(Base, CapabilityRequestIF):
nullable=False,
)
versions = relationship("CapabilityVersion", back_populates="request")
versions = relationship(
"CapabilityVersion",
back_populates="request",
order_by="CapabilityVersion.version_number",
collection_class=ordering_list("version_number"),
)
capability = relationship(Capability, back_populates="requests")
def update_state(self, state: CapabilityRequestState):
......
......@@ -232,12 +232,11 @@ class WorkflowService(WorkflowServiceIF):
"""
wf_request = self.info.lookup_workflow_request(request_id)
if wf_request.workflow_name == ArchiveWorkflows.DO_NOT_CALIBRATE.value:
logger.info(f"SENDING 'DO NOT CALIBRATE' MESSAGE to AAT for request #{request_id}!")
dnc_msg = ArchiveMessageArchitect(
routing_key="qa-script.fail", request=request_id, sdm_id=sdm_id
).compose_message("qa_fail")
self.archive_messenger.send_message(**dnc_msg)
logger.info(f"SENDING 'DO NOT CALIBRATE' MESSAGE to AAT for request #{request_id}!")
dnc_msg = ArchiveMessageArchitect(
routing_key="qa-script.fail", request=wf_request, sdm_id=sdm_id
).compose_message("qa_fail")
self.archive_messenger.send_message(**dnc_msg)
def execute(self, request: WorkflowRequest):
"""
......@@ -599,6 +598,7 @@ class WorkflowMessageHandler:
self.message_router.register(self)
self.messenger = MessageSender("workflow")
self.archive_messenger = MessageSender("archive")
self.url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl
@on_message(service="workflow", type="workflow-complete")
def propagate_delivery(self, **message: Dict):
......@@ -651,6 +651,50 @@ class WorkflowMessageHandler:
)
self.messenger.send_message(**ingestion_complete_msg)
@on_message(service="workflow", type="qa_pass")
def perform_qa_pass(self, **message: Dict):
"""
Start qa_fail workflow based on QA Fail message
:param message:
:return:
"""
subject = message["subject"]
wf_req_id = subject["workflow_request_id"]
wf_name = subject["workflow_name"]
response = requests.post(f"{self.url}/workflows/{wf_name}/requests/{wf_req_id}/ingest")
logger.info(
"Got result %s with type %s and body %s",
response,
type(response),
response.content,
)
return response.json()
@on_message(service="workflow", type="qa_fail")
def perform_qa_fail(self, **message: Dict):
"""
Start qa_fail workflow based on QA Fail message
:param message:
:return:
"""
subject = message["subject"]
wf_req_id = subject["workflow_request_id"]
wf_name = subject["workflow_name"]
response = requests.post(f"{self.url}/workflows/{wf_name}/requests/{wf_req_id}/qa_fail")
logger.info(
"Got result %s with type %s and body %s",
response,
type(response),
response.content,
)
return response.json()
@on_message(service="workflow")
def on_workflow_event(self, **message: Dict):
"""
......