diff --git a/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py b/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py index d5e1c77c785c58aff17936a57c529275c0ef7b65..867405bf0bb64473afdc31b1de2a81aa7ca359b9 100644 --- a/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py +++ b/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py @@ -274,6 +274,7 @@ class NotificationConnect: def send_session_expired(self): """ Notifies of expired CARTA session + :return: """ if "user_email" not in self.settings: @@ -297,14 +298,15 @@ class WorkflowConnect: self.settings = settings self.url = settings["workflow_url"] - def send_carta_url_to_ws(self, wrapper_url: str): + def send_carta_url(self, wrapper_url: str): """ Makes REST call to send message containing CARTA URL to workspaces system + :return: """ - send_url_message = f"{self.settings['workflow_url']}/workflows/carta/requests/{self.settings['wf_request_id']}/send-url-to-ws" - payload = {"carta_url": wrapper_url} - self.logger.info( - "Sending REST call to workflow service for WS CARTA messaging." + send_url_message = ( + f"{self.settings['workflow_url']}/workflows/carta/requests/{self.settings['wf_request_id']}/url" ) + payload = {"carta_url": wrapper_url} + self.logger.info("Sending REST call to workflow service for WS CARTA messaging.") requests.post(send_url_message, json=payload) diff --git a/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py b/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py index 20c19afea39c8285f5a5d306c49aa5aa549dd674..599303e54e9779bc11353eff93a5fe9cb4affee0 100644 --- a/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py +++ b/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py @@ -228,7 +228,7 @@ class CartaLauncher: if self.settings["send_ready"] == "true": self.logger.info(f"User Email") self.notification.send_session_ready(wrapper_url) - self.workflow_connect.send_carta_url_to_ws(wrapper_url) + self.workflow_connect.send_carta_url(wrapper_url) elif self.settings["send_ready"] == "false": self.logger.info(f"AAT Request Handler, with {wrapper_url}") self.archive_connect.send_carta_url_to_rh(wrapper_url) diff --git a/apps/cli/executables/pexable/carta_envoy/test/test_connect.py b/apps/cli/executables/pexable/carta_envoy/test/test_connect.py index 8971b4aa5d1b6be7d308a0a2c470060b3736a726..8ff1959c352a29009f743de45478308d54fef8f9 100644 --- a/apps/cli/executables/pexable/carta_envoy/test/test_connect.py +++ b/apps/cli/executables/pexable/carta_envoy/test/test_connect.py @@ -3,8 +3,10 @@ Tests for carta_envoy.connect """ from unittest.mock import patch +# pylint: disable=E0401, C0103, C0301, C0415, E1101, E1120, R0201, R0903, W0621 + import pytest -from carta_envoy.connect import ArchiveConnect, NotificationConnect, RedisConnect +from carta_envoy.connect import ArchiveConnect, NotificationConnect, RedisConnect, WorkflowConnect from carta_envoy.utilities import generate_random_str test_settings = { @@ -25,7 +27,9 @@ test_settings = { redis_connect = RedisConnect(settings=test_settings) notify_connect = NotificationConnect(settings=test_settings) archive_connect = ArchiveConnect(settings=test_settings) +workflow_connect = WorkflowConnect(settings=test_settings) +CARTA_POST = "carta_envoy.connect.requests.post" @pytest.fixture def redis(): @@ -40,11 +44,23 @@ def redis(): class TestRedisConnect: + "Test our connection to Redis server" + + @pytest.mark.skip("not yet implemented") def test_generate_ids(self): - pass + """ + TODO + + :return: + """ + @pytest.mark.skip("not yet implemented") def test_generate_url(self): - pass + """ + TODO + + :return: + """ def test_get_available_port(self): """ @@ -60,8 +76,13 @@ class TestRedisConnect: print(e) assert False + @pytest.mark.skip("not yet implemented") def test_prepare_redis(self): - pass + """ + TODO + + :return: + """ def test_connect_to_redis_error(self): """ @@ -108,7 +129,11 @@ class TestRedisConnect: :param redis: Custom fixture that provides a sandbox Redis server """ redis_connect.conn = redis - redis_connect.generated_ids = {"front_end_id": "1234abcd", "back_end_id": "5678efgh", "wrapper_id": "9876jklm"} + redis_connect.generated_ids = { + "front_end_id": "1234abcd", + "back_end_id": "5678efgh", + "wrapper_id": "9876jklm", + } redis.set("duplicate", "value") with patch("carta_envoy.connect.RedisConnect.get_redis_values") as values: redis_connect.check_for_duplicate_values({"duplicate": "value"}, 9897, 6543, 1234) @@ -119,8 +144,10 @@ class TestRedisConnect: class TestArchiveConnect: + """Test sending CARTA URL to archive request handler""" + def test_send_carta_url_to_rh(self): - with patch("carta_envoy.connect.requests.post") as mocked_post: + with patch(CARTA_POST) as mocked_post: archive_connect.send_carta_url_to_rh("http://fake.url/for/testing") mocked_post.assert_called_with( f"{test_settings['workflow_url']}/workflows/carta/requests/{test_settings['wf_request_id']}/send-url-to-aat", @@ -129,8 +156,10 @@ class TestArchiveConnect: class TestNotificationConnect: + """Test sending session-ready and session-expired messages""" + def test_send_session_ready(self): - with patch("carta_envoy.connect.requests.post") as mocked_post: + with patch(CARTA_POST) as mocked_post: url = "I am carta" notify_connect.send_session_ready(wrapper_url=url) mocked_post.assert_called_with( @@ -139,7 +168,7 @@ class TestNotificationConnect: ) def test_send_session_expired(self): - with patch("carta_envoy.connect.requests.post") as mocked_post: + with patch(CARTA_POST) as mocked_post: notify_connect.send_session_expired() mocked_post.assert_called_with( f"{test_settings['notification_url']}/notify/carta_expired/send", @@ -147,3 +176,16 @@ class TestNotificationConnect: "destination_email": test_settings["user_email"], }, ) + + +class TestWorkflowConnect: + """Test sending CARTA URL to Workspaces systesm""" + + def test_send_carta_url(self): + with patch(CARTA_POST) as mocked_post: + url = "my_very_own_special_CARTA_session" + workflow_connect.send_carta_url(url) + mocked_post.assert_called_with( + f"{test_settings['workflow_url']}/workflows/carta/requests/{test_settings['wf_request_id']}/url", + json={"carta_url": url}, + ) diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index 9ff9367fab5a0641c19ab4137ffe4624c389606d..91e40ced1bae38904c91ca58080908c1ccad4425 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -335,8 +335,8 @@ class WorkflowRequestRestService: body=f"SUCCESS: Sent CARTA URL {carta_url} to AAT system", ) - @view_config(request_method="POST", route_name="send_carta_url_to_ws") - def send_carta_url_to_ws(self): + @view_config(request_method="POST", route_name="send_carta_url") + def send_carta_url(self): """ Pyramid view that sends a given CARTA URL to the Workspaces system @@ -344,7 +344,7 @@ class WorkflowRequestRestService: """ request_id = self.request.matchdict["request_id"] carta_url = self.request.json_body["carta_url"] - self.request.workflows.send_carta_url_to_ws(request_id, carta_url) + self.request.workflows.send_carta_url(request_id, carta_url) return Response( status_code=http.HTTPStatus.OK, @@ -575,8 +575,8 @@ def main(global_config, **settings): ) # Use this route to send a CARTA envoy URL to the Workspaces system config.add_route( - "send_carta_url_to_ws", - "/workflows/carta/requests/{request_id}/send-url-to-ws", + "send_carta_url", + "/workflows/carta/requests/{request_id}/url", factory=lookup_request, ) diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 0fd02f0b214fdd84972237421432d7f901862344..803170b4567e0e0183701509127b09c8ed9b09aa 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -37,9 +37,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): """This is the workflow service REST client.""" def __init__(self): - self.url = ( - CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl - ) + self.url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl def execute(self, request: WorkflowRequestIF): """ @@ -105,9 +103,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): """ # 1. Handle the type ambiguity with the workflow argument - workflow_name = ( - workflow if isinstance(workflow, str) else workflow.workflow_name - ) + workflow_name = workflow if isinstance(workflow, str) else workflow.workflow_name # 2. Make the request result = requests.post( @@ -153,13 +149,9 @@ class WorkflowService(WorkflowServiceIF): self.archive_router = message_handler.archive_router self.messenger = message_handler.messenger self.archive_messenger = message_handler.archive_messenger - self.processing_settings = CapoConfig().settings( - "edu.nrao.workspaces.ProcessingSettings" - ) + self.processing_settings = CapoConfig().settings("edu.nrao.workspaces.ProcessingSettings") - def attach_file_to_request( - self, request: WorkflowRequestIF, filename: str, content: bytes - ): + def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes): """ Add a file to a request; complain if the filename is bad. @@ -207,7 +199,9 @@ class WorkflowService(WorkflowServiceIF): :param workflow_request_id: ID of carta workflow that wants to send this message :param carta_url: JSON blob with CARTA URL """ - logger.info(f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!") + logger.info( + f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!" + ) wf_request = self.info.lookup_workflow_request(workflow_request_id) routing_key = f"ws-workflow.carta-instance-ready.{workflow_request_id}" carta_url_msg = ArchiveMessageArchitect( @@ -215,17 +209,15 @@ class WorkflowService(WorkflowServiceIF): ).compose_message("carta_ready") self.archive_messenger.send_message(**carta_url_msg) - def send_carta_url_to_ws(self, workflow_request_id: int, carta_url: str): + def send_carta_url(self, workflow_request_id: int, carta_url: str): """ - Hit REST endpoint with CARTA URL from CARTA envoy. + Send AMQP message with CARTA URL to Workspaces. :param workflow_request_id: ID of CARTA workflow that wants to send this message :param carta_url: JSON blob containing CARTA URL :return: """ - logger.info( - f"SENDING CARTA MESSAGE to Workspaces for request #{workflow_request_id}!" - ) + logger.info(f"SENDING CARTA MESSAGE to Workspaces for request #{workflow_request_id}!") wf_request = self.info.lookup_workflow_request(workflow_request_id) carta_url_msg = WorkflowMessageArchitect( request=wf_request, carta_url=carta_url @@ -270,9 +262,7 @@ class WorkflowService(WorkflowServiceIF): # render all the templates if request.argument["need_project_metadata"] is True: self.determine_multiple_productfetch(request) - templated_files = self._render_with_metadata( - request, temp_folder, definition - ) + templated_files = self._render_with_metadata(request, temp_folder, definition) # catch aat_wrest failure and abort workflow if isinstance(templated_files, WorkflowRequest): return request @@ -321,13 +311,19 @@ class WorkflowService(WorkflowServiceIF): return temp_folder def determine_multiple_productfetch(self, request: WorkflowRequestIF): + """ + Are we fetching more than one product? + + :param request: the workflow request + :return: + """ if "need_data" in request.argument: if request.argument["need_data"] is True: spl_list = request.argument["product_locator"].split(",") fetcher_string = " --product-locator" - request.argument[ - "product_locator" - ] = fetcher_string + fetcher_string.join(" " + spl for spl in spl_list) + request.argument["product_locator"] = fetcher_string + fetcher_string.join( + " " + spl for spl in spl_list + ) request.argument["ramInGb"] = self.processing_settings.ramInGb def _render_with_metadata( @@ -363,9 +359,7 @@ class WorkflowService(WorkflowServiceIF): argument = eb argument2 = [] else: - logger.info( - f"No wrester found for workflow {name}. Does it actually require metadata?" - ) + logger.info(f"No wrester found for workflow {name}. Does it actually require metadata?") return wf_request logger.info(f" workflow {name} has wrest option: {wrest_type}") @@ -387,9 +381,9 @@ class WorkflowService(WorkflowServiceIF): else: logger.error(wf_json.decode()) logger.info("SENDING WORKFLOW FAIL MESSAGE!") - failed_msg = WorkflowMessageArchitect( - request=wf_request - ).compose_message("workflow_failed") + failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message( + "workflow_failed" + ) self.messenger.send_message(**failed_msg) return wf_request @@ -406,10 +400,7 @@ class WorkflowService(WorkflowServiceIF): usable_templates.append(template) # check if a newer version was supplied and is not in the list of restricted # access templates - if ( - template.filename in request.files - and template.filename in forbidden_templates - ): + if template.filename in request.files and template.filename in forbidden_templates: usable_templates.append(template) logger.info( f"Cannot append user file {request.files[template.filename]} to " @@ -423,9 +414,7 @@ class WorkflowService(WorkflowServiceIF): usable_files.append(file) for file in usable_templates: - self.info.save_file( - request=request, filename=file.filename, content=file.content - ) + self.info.save_file(request=request, filename=file.filename, content=file.content) return usable_templates + usable_files @@ -519,13 +508,13 @@ class WorkflowService(WorkflowServiceIF): def announce_qa_ready(self, wf_request_id): wf_request = self.info.lookup_workflow_request(wf_request_id) logger.info("ANNOUNCING QA READY!") - qa_ready_msg = WorkflowMessageArchitect(request=wf_request).compose_message( - "qa_ready" - ) + qa_ready_msg = WorkflowMessageArchitect(request=wf_request).compose_message("qa_ready") self.messenger.send_message(**qa_ready_msg) class WorkflowMessageHandler: + """Handles workflow messages. No, really!""" + def __init__(self, info: WorkflowInfoIF): self.info = info self.archive_router = Router("archive") @@ -544,9 +533,7 @@ class WorkflowMessageHandler: """ logger.info("RECEIVED WORKFLOW COMPLETE MESSAGE. SENDING DELIVERY MESSAGE.") # look up the workflow request to get the path - wf_request = self.info.lookup_workflow_request( - message["subject"]["workflow_request_id"] - ) + wf_request = self.info.lookup_workflow_request(message["subject"]["workflow_request_id"]) # find the path to the delivery.json file delivery_file = pathlib.Path(wf_request.results_dir) / "delivery.json" @@ -627,9 +614,7 @@ class WorkflowMessageHandler: self.clean_remote_workflow(request) self.clean_workflow(request) - elif ( - message["type"] == "delivery" or message["type"] == "ingestion-complete" - ): + elif message["type"] == "delivery" or message["type"] == "ingestion-complete": status = WorkflowRequestState.Complete.name else: status = "Unknown" @@ -650,9 +635,7 @@ class WorkflowMessageHandler: f"{request.workflow_request_id} to {status}: {exc}" ) else: - logger.warning( - f"Message {message} does not concern a workflow request. Ignoring." - ) + logger.warning(f"Message {message} does not concern a workflow request. Ignoring.") def send_archive_failed_event(self, **message: Dict): subject = message["subject"] @@ -672,9 +655,7 @@ class WorkflowMessageHandler: ).compose_message("carta_failed") self.archive_messenger.send_message(**carta_url_msg) if wf_request.workflow_name == ArchiveWorkflows.SECI.value: - logger.info( - f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!" - ) + logger.info(f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!") routing_key = f"ws-workflow.seci.{wf_id}" seci_msg = ArchiveMessageArchitect( routing_key=routing_key, request=wf_request @@ -687,9 +668,7 @@ class WorkflowMessageHandler: wf_request = self.info.lookup_workflow_request(wf_id) if wf_request.workflow_name == ArchiveWorkflows.SECI.value: - logger.info( - f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!" - ) + logger.info(f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!") routing_key = f"ws-workflow.seci.{wf_id}" seci_msg = ArchiveMessageArchitect( routing_key=routing_key, request=wf_request @@ -706,5 +685,5 @@ class WorkflowMessageHandler: def clean_workflow(self, request: WorkflowRequestIF): if request.htcondor_job_id is not None: # Request has a set condor_job_id; unset it - logger.info(f"Workflow finished. Unsetting HTCondor ID.") + logger.info("Workflow finished. Unsetting HTCondor ID.") request.htcondor_job_id = None