Skip to content
Snippets Groups Projects
Commit 8cc607e2 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

QA Fail/Pass workflow fixes

parent b181b62d
No related branches found
No related tags found
1 merge request!651QA Fail/Pass workflow fixes
Pipeline #3651 passed
"""empty message
Revision ID: 9507363ddab7
Revises: ced8e001d262, f73dff1b2e6f
Create Date: 2021-11-12 11:25:07.715543
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '9507363ddab7'
down_revision = ('ced8e001d262', 'f73dff1b2e6f')
branch_labels = None
depends_on = None
def upgrade():
pass
def downgrade():
pass
......@@ -293,6 +293,30 @@ class WorkflowRequestRestService:
# 4. submit ingestion workflow request
self.request.workflows.execute(ingest_request)
@view_config(request_method="POST", route_name="qa_fail")
def qa_fail(self):
"""
Perform qa_fail on specified workflow
:return:
"""
print(f"QA Failing workflow request #{self.request.context}")
# 1. retrieve metadata.json for workflow request
self.request.matchdict["filename"] = "metadata.json"
file = lookup_file(request=self.request)
# 2. create qa fail workflow request
fail_request = self.request.info.create_workflow_request(
workflow="qa_fail",
argument={"parent_wf_request_id": self.request.matchdict["request_id"]},
)
# 3. attach metadata.json to fail wf request
self.request.workflows.attach_file_to_request(
request=fail_request, filename=file.filename, content=file.content
)
# 4. submit ingestion workflow request
self.request.workflows.execute(fail_request)
@view_config(request_method="POST", route_name="send_carta_url_to_aat")
def send_carta_url_to_aat(self):
"""
......
......@@ -598,6 +598,7 @@ class WorkflowMessageHandler:
self.message_router.register(self)
self.messenger = MessageSender("workflow")
self.archive_messenger = MessageSender("archive")
self.url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl
@on_message(service="workflow", type="workflow-complete")
def propagate_delivery(self, **message: Dict):
......@@ -650,6 +651,50 @@ class WorkflowMessageHandler:
)
self.messenger.send_message(**ingestion_complete_msg)
@on_message(service="workflow", type="qa_pass")
def perform_qa_pass(self, **message: Dict):
"""
Start qa_fail workflow based on QA Fail message
:param message:
:return:
"""
subject = message["subject"]
wf_req_id = subject["workflow_request_id"]
wf_name = subject["workflow_name"]
response = requests.post(f"{self.url}/workflows/{wf_name}/requests/{wf_req_id}/ingest")
logger.info(
"Got result %s with type %s and body %s",
response,
type(response),
response.content,
)
return response.json()
@on_message(service="workflow", type="qa_fail")
def perform_qa_fail(self, **message: Dict):
"""
Start qa_fail workflow based on QA Fail message
:param message:
:return:
"""
subject = message["subject"]
wf_req_id = subject["workflow_request_id"]
wf_name = subject["workflow_name"]
response = requests.post(f"{self.url}/workflows/{wf_name}/requests/{wf_req_id}/qa_fail")
logger.info(
"Got result %s with type %s and body %s",
response,
type(response),
response.content,
)
return response.json()
@on_message(service="workflow")
def on_workflow_event(self, **message: Dict):
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment