diff --git a/schema/versions/9507363ddab7_.py b/schema/versions/9507363ddab7_.py new file mode 100644 index 0000000000000000000000000000000000000000..24f0d615d216bf599134ab468dd4f7f815fc7102 --- /dev/null +++ b/schema/versions/9507363ddab7_.py @@ -0,0 +1,24 @@ +"""empty message + +Revision ID: 9507363ddab7 +Revises: ced8e001d262, f73dff1b2e6f +Create Date: 2021-11-12 11:25:07.715543 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '9507363ddab7' +down_revision = ('ced8e001d262', 'f73dff1b2e6f') +branch_labels = None +depends_on = None + + +def upgrade(): + pass + + +def downgrade(): + pass diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index c80f0ac4e891a4a8446c8dc72e4a94610281a15e..175c3955f836f037b5da02f74e1dcf679faaa792 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -293,6 +293,30 @@ class WorkflowRequestRestService: # 4. submit ingestion workflow request self.request.workflows.execute(ingest_request) + @view_config(request_method="POST", route_name="qa_fail") + def qa_fail(self): + """ + Perform qa_fail on specified workflow + :return: + """ + print(f"QA Failing workflow request #{self.request.context}") + + # 1. retrieve metadata.json for workflow request + self.request.matchdict["filename"] = "metadata.json" + file = lookup_file(request=self.request) + + # 2. create qa fail workflow request + fail_request = self.request.info.create_workflow_request( + workflow="qa_fail", + argument={"parent_wf_request_id": self.request.matchdict["request_id"]}, + ) + # 3. attach metadata.json to fail wf request + self.request.workflows.attach_file_to_request( + request=fail_request, filename=file.filename, content=file.content + ) + # 4. submit ingestion workflow request + self.request.workflows.execute(fail_request) + @view_config(request_method="POST", route_name="send_carta_url_to_aat") def send_carta_url_to_aat(self): """ diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 6a8d091244fd4702b356d23e2815453866c9658f..d44f092390ad96027b55838628cfc68580818f23 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -598,6 +598,7 @@ class WorkflowMessageHandler: self.message_router.register(self) self.messenger = MessageSender("workflow") self.archive_messenger = MessageSender("archive") + self.url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl @on_message(service="workflow", type="workflow-complete") def propagate_delivery(self, **message: Dict): @@ -650,6 +651,50 @@ class WorkflowMessageHandler: ) self.messenger.send_message(**ingestion_complete_msg) + @on_message(service="workflow", type="qa_pass") + def perform_qa_pass(self, **message: Dict): + """ + Start qa_fail workflow based on QA Fail message + + :param message: + :return: + """ + + subject = message["subject"] + wf_req_id = subject["workflow_request_id"] + wf_name = subject["workflow_name"] + + response = requests.post(f"{self.url}/workflows/{wf_name}/requests/{wf_req_id}/ingest") + logger.info( + "Got result %s with type %s and body %s", + response, + type(response), + response.content, + ) + return response.json() + + @on_message(service="workflow", type="qa_fail") + def perform_qa_fail(self, **message: Dict): + """ + Start qa_fail workflow based on QA Fail message + + :param message: + :return: + """ + + subject = message["subject"] + wf_req_id = subject["workflow_request_id"] + wf_name = subject["workflow_name"] + + response = requests.post(f"{self.url}/workflows/{wf_name}/requests/{wf_req_id}/qa_fail") + logger.info( + "Got result %s with type %s and body %s", + response, + type(response), + response.content, + ) + return response.json() + @on_message(service="workflow") def on_workflow_event(self, **message: Dict): """