From 50d2c408ca5e9fb8e2af709a88719d1f820f38b7 Mon Sep 17 00:00:00 2001 From: "Janet L. Goldstein" <jgoldste@nrao.edu> Date: Mon, 1 Nov 2021 13:02:30 -0600 Subject: [PATCH] WS-765: * removed archive workflow enum value added in error * implemented call to "do not calibrate" REST endpoint --- .../pexable/conveyor/conveyor/retrieve.py | 12 ++++++----- .../pexable/conveyor/test/test_conveyor.py | 20 +++++++++++-------- .../test/test_workflow_service_rest_api.py | 4 +++- services/workflow/workflow/server.py | 12 ++++++++++- shared/workspaces/workspaces/workflow/enum.py | 1 - .../workflow/services/workflow_service.py | 17 ++++++++-------- 6 files changed, 41 insertions(+), 25 deletions(-) diff --git a/apps/cli/executables/pexable/conveyor/conveyor/retrieve.py b/apps/cli/executables/pexable/conveyor/conveyor/retrieve.py index 00e583f4b..1007bc9d7 100644 --- a/apps/cli/executables/pexable/conveyor/conveyor/retrieve.py +++ b/apps/cli/executables/pexable/conveyor/conveyor/retrieve.py @@ -22,6 +22,8 @@ import shutil from pathlib import Path from typing import Dict, List +import requests + from .interfaces import ConveyorIF """ @@ -53,16 +55,16 @@ class RetrievalConveyor(ConveyorIF): self.logger.info(f"Directories from {qa_path} have already been retrieved! Continuing...") if self.action == "QA Fail Cleanup": - self.send_qa_fail_msg() + self.send_qa_fail_msg(self.settings["sdm_ids"]) - def send_qa_fail_msg(self): + def send_qa_fail_msg(self, sdm_id: str): """ - TODO - Calls REST endpoint that shoots off QA Fail message + Call REST endpoint that shoots off QA Fail message :return: """ - print("Ready to send QA Fail message!") + url = f"/workflows/carta/requests/{self.settings['request_id']}/send-do-not-calibrate/{sdm_id}" + requests.post(url, json={"fileSetIds": sdm_id}) def break_symlinks(self, spool_path: Path, dir_list: List[str]): self.logger.info(f"Breaking symlinks between qa2 and spool for directory {spool_path.stem}...") diff --git a/apps/cli/executables/pexable/conveyor/test/test_conveyor.py b/apps/cli/executables/pexable/conveyor/test/test_conveyor.py index f3513f4c1..f79ccd288 100644 --- a/apps/cli/executables/pexable/conveyor/test/test_conveyor.py +++ b/apps/cli/executables/pexable/conveyor/test/test_conveyor.py @@ -90,11 +90,15 @@ class TestConveyor: :return: """ args.qa_fail = [TEST_JSON] - with patch("os.chdir"): - with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)): - with patch("conveyor.retrieve.RetrievalConveyor.convey") as mock_convey: - con.main() - assert mock_convey.call_count == 1 - - # reset for other testing - args.qa_fail = None + args.fileSetIds = "brain_000.58099.67095825232" + + try: + with patch("os.chdir"): + with patch("argparse.ArgumentParser.parse_args", MagicMock(return_value=args)): + with patch("conveyor.retrieve.RetrievalConveyor.convey") as mock_convey: + con.main() + assert mock_convey.call_count == 1 + finally: + # reset for other testing + args.qa_fail = None + args.fileSetIds = None diff --git a/services/workflow/test/test_workflow_service_rest_api.py b/services/workflow/test/test_workflow_service_rest_api.py index 6e7d5df75..c1d03d932 100644 --- a/services/workflow/test/test_workflow_service_rest_api.py +++ b/services/workflow/test/test_workflow_service_rest_api.py @@ -197,15 +197,17 @@ def test_send_do_not_calibate_msg_to_aat(workflow_request_request: DummyRequest) # Init dummy request values matchdict_savepoint = workflow_request_request.matchdict body_savepoint = workflow_request_request.json_body + request_id = -1 workflow_request_request.matchdict["request_id"] = request_id sdm_id = "brain_000.58099.67095825232" + workflow_request_request.json_body["sdm_id"] = sdm_id # Mock out WorkflowService method call side effect workflow_request_request.workflows.send_do_not_calibrate = MagicMock() expected_response = f"SUCCESS for request #{request_id}: Sent 'Do Not Calibrate' for {sdm_id} to AAT" try: - response = WorkflowRequestRestService(workflow_request_request).send_do_not_calibrate(request_id, sdm_id) + response = WorkflowRequestRestService(workflow_request_request).send_do_not_calibrate() workflow_request_request.workflows.send_do_not_calibrate.assert_called_with(request_id, sdm_id) assert response.status_code == http.HTTPStatus.OK assert response.text == expected_response diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index c9fdaea87..15fe64487 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -326,12 +326,14 @@ class WorkflowRequestRestService: ) @view_config(request_method="POST", route_name="send_do_not_calibrate") - def send_do_not_calibrate(self, request_id: str, sdm_id: str): + def send_do_not_calibrate(self): """ Pyramid view that sends a "Do Not Calibrate" message to the Workspaces system return: 200 (OK) HTTP Response """ + request_id = self.request.matchdict["request_id"] + sdm_id = self.request.json_body["sdm_id"] self.request.workflows.send_do_not_calibrate(request_id, sdm_id) return Response( status_code=http.HTTPStatus.OK, @@ -570,6 +572,14 @@ def main(global_config, **settings): "/workflows/{name}/requests/{request_id}/ingest", factory=lookup_request, ) + + # Use this route to send a Do Not Calibrate message to the AAT system + config.add_route( + "send_do_not_calibrate", + "/workflows/carta/requests/{request_id}/send-do-not-calibrate/{sdm_id}", + factory=lookup_request, + ) + # Use this route to send a CARTA envoy URL to the AAT system config.add_route( "send_carta_url_to_aat", diff --git a/shared/workspaces/workspaces/workflow/enum.py b/shared/workspaces/workspaces/workflow/enum.py index ce7962c79..978b0d951 100644 --- a/shared/workspaces/workspaces/workflow/enum.py +++ b/shared/workspaces/workspaces/workflow/enum.py @@ -40,7 +40,6 @@ class ArchiveWorkflows(Enum): CARTA = "carta" SECI = "vlass_seci" - DO_NOT_CALIBRATE = "do_not_calibrate" @classmethod def values(cls): diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index d0deb2a7e..ae94bde18 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -675,11 +675,13 @@ class WorkflowMessageHandler: self.clean_workflow(request) elif message["type"] == "delivery" or message["type"] == "ingestion-complete": status = WorkflowRequestState.Complete.name + + elif message["type"] == "qa-fail": + self.send_do_not_calibrate(request.workflow_request_id, message["sdm_id"]) + else: status = "Unknown" - logger.info(f">>> message type: {message['type']}") - raise ValueError() logger.info( "Updating state on workflow request %s to %s...", request.workflow_request_id, @@ -730,22 +732,19 @@ class WorkflowMessageHandler: ) self.archive_messenger.send_message(**seci_msg) - def send_do_not_calibrate_message(self, **message: Dict): + def send_do_not_calibrate(self, request_id: int, sdm_id: str): """ Tell the archive to set specified SDM to "Do Not Calibrate". :param message: message containing key-value pairs :return: """ - subject = message["subject"] - wf_id = subject["workflow_request_id"] - sdm_id = subject["fileset_id"] - wf_request = self.info.lookup_workflow_request(wf_id) + wf_request = self.info.lookup_workflow_request(request_id) if wf_request.workflow_name == ArchiveWorkflows.DO_NOT_CALIBRATE.value: - logger.info(f"SENDING 'DO NOT CALIBRATE' MESSAGE to AAT for request #{wf_id}!") + logger.info(f"SENDING 'DO NOT CALIBRATE' MESSAGE to AAT for request #{request_id}!") dnc_msg = ArchiveMessageArchitect( - routing_key="qa-script.fail", request=wf_id, sdm_id=sdm_id + routing_key="qa-script.fail", request=request_id, sdm_id=sdm_id ).compose_message("qa_fail") self.archive_messenger.send_message(**dnc_msg) -- GitLab