diff --git a/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py b/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py index 21c3d15ab96a439f55b28657e5d90913d25f3448..867405bf0bb64473afdc31b1de2a81aa7ca359b9 100644 --- a/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py +++ b/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py @@ -2,6 +2,8 @@ import logging import socket from typing import Dict +# pylint: disable=C0103, C0301, E0401, E0402, E1101, R0903, R0913, W0105, W1203 + import redis import requests @@ -26,6 +28,11 @@ class RedisConnect: @staticmethod def generate_ids() -> dict: + """ + Generates IDs and system name required by Redis + + :return: dict of settings + """ return { "front_end_id": generate_random_str(), "back_end_id": generate_random_str(), @@ -34,6 +41,11 @@ class RedisConnect: } def generate_carta_url(self) -> str: + """ + Generates CARTA URL for Redis + + :return: URL of current CARTA session + """ self.logger.info("Generating CARTA url...") front_end_id = self.generated_ids["front_end_id"] back_end_id = self.generated_ids["back_end_id"] @@ -72,6 +84,11 @@ class RedisConnect: return s.getsockname()[1] def prepare_redis(self) -> dict: + """ + Gathers Redis server settings + + :return: settings as key-value pairs + """ self.logger.info("Preparing Redis server...") front_end_port = self.get_available_port() back_end_port = self.get_available_port() @@ -218,9 +235,7 @@ class ArchiveConnect: :param url: URL generated to allow user access to this running CARTA instance """ - send_archive_msg_url = ( - f"{self.settings['workflow_url']}/workflows/carta/requests/{self.settings['wf_request_id']}/send-url-to-aat" - ) + send_archive_msg_url = f"{self.settings['workflow_url']}/workflows/carta/requests/{self.settings['wf_request_id']}/send-url-to-aat" payload = {"carta_url": url} self.logger.info("Sending REST call to workflow service for AAT messaging.") requests.post(send_archive_msg_url, json=payload) @@ -237,6 +252,12 @@ class NotificationConnect: self.url = settings["notification_url"] def send_session_ready(self, wrapper_url: str): + """ + Notifies of CARTA session that's been created + + :param wrapper_url: CARTA session URL + :return: + """ if "user_email" not in self.settings: self.logger.info("Not sending notification because no user email supplied") return @@ -251,6 +272,11 @@ class NotificationConnect: ) def send_session_expired(self): + """ + Notifies of expired CARTA session + + :return: + """ if "user_email" not in self.settings: self.logger.info("Not sending notification because no user email supplied") return @@ -262,3 +288,25 @@ class NotificationConnect: "destination_email": self.settings["user_email"], }, ) + + +class WorkflowConnect: + """For sending CARTA URL to workspaces system""" + + def __init__(self, settings): + self.logger = logging.getLogger("carta_envoy") + self.settings = settings + self.url = settings["workflow_url"] + + def send_carta_url(self, wrapper_url: str): + """ + Makes REST call to send message containing CARTA URL to workspaces system + + :return: + """ + send_url_message = ( + f"{self.settings['workflow_url']}/workflows/carta/requests/{self.settings['wf_request_id']}/url" + ) + payload = {"carta_url": wrapper_url} + self.logger.info("Sending REST call to workflow service for WS CARTA messaging.") + requests.post(send_url_message, json=payload) diff --git a/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py b/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py index 674b6df1a8ed92142af2836604e25abea6a60561..599303e54e9779bc11353eff93a5fe9cb4affee0 100644 --- a/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py +++ b/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py @@ -12,10 +12,12 @@ from threading import Thread from types import FrameType from typing import Optional +# pylint: disable=E0401, R1705, W0105 + import pendulum as pendulum -from carta_envoy.connect import ArchiveConnect, NotificationConnect, RedisConnect -from carta_envoy.templates import CartaWrapper from pendulum import DateTime +from carta_envoy.connect import ArchiveConnect, NotificationConnect, RedisConnect, WorkflowConnect +from carta_envoy.templates import CartaWrapper # pylint: disable=E0401, R0913, R1721, W0603, W1203 @@ -68,6 +70,7 @@ class CartaLauncher: self.signal = self.signal_handler self.redis_connect = RedisConnect(settings) self.archive_connect = ArchiveConnect(settings) + self.workflow_connect = WorkflowConnect(settings) self.notification = NotificationConnect(settings) def signal_handler(self, signum: int, frame: FrameType): @@ -225,6 +228,7 @@ class CartaLauncher: if self.settings["send_ready"] == "true": self.logger.info(f"User Email") self.notification.send_session_ready(wrapper_url) + self.workflow_connect.send_carta_url(wrapper_url) elif self.settings["send_ready"] == "false": self.logger.info(f"AAT Request Handler, with {wrapper_url}") self.archive_connect.send_carta_url_to_rh(wrapper_url) diff --git a/apps/cli/executables/pexable/carta_envoy/test/test_connect.py b/apps/cli/executables/pexable/carta_envoy/test/test_connect.py index 8971b4aa5d1b6be7d308a0a2c470060b3736a726..8ff1959c352a29009f743de45478308d54fef8f9 100644 --- a/apps/cli/executables/pexable/carta_envoy/test/test_connect.py +++ b/apps/cli/executables/pexable/carta_envoy/test/test_connect.py @@ -3,8 +3,10 @@ Tests for carta_envoy.connect """ from unittest.mock import patch +# pylint: disable=E0401, C0103, C0301, C0415, E1101, E1120, R0201, R0903, W0621 + import pytest -from carta_envoy.connect import ArchiveConnect, NotificationConnect, RedisConnect +from carta_envoy.connect import ArchiveConnect, NotificationConnect, RedisConnect, WorkflowConnect from carta_envoy.utilities import generate_random_str test_settings = { @@ -25,7 +27,9 @@ test_settings = { redis_connect = RedisConnect(settings=test_settings) notify_connect = NotificationConnect(settings=test_settings) archive_connect = ArchiveConnect(settings=test_settings) +workflow_connect = WorkflowConnect(settings=test_settings) +CARTA_POST = "carta_envoy.connect.requests.post" @pytest.fixture def redis(): @@ -40,11 +44,23 @@ def redis(): class TestRedisConnect: + "Test our connection to Redis server" + + @pytest.mark.skip("not yet implemented") def test_generate_ids(self): - pass + """ + TODO + + :return: + """ + @pytest.mark.skip("not yet implemented") def test_generate_url(self): - pass + """ + TODO + + :return: + """ def test_get_available_port(self): """ @@ -60,8 +76,13 @@ class TestRedisConnect: print(e) assert False + @pytest.mark.skip("not yet implemented") def test_prepare_redis(self): - pass + """ + TODO + + :return: + """ def test_connect_to_redis_error(self): """ @@ -108,7 +129,11 @@ class TestRedisConnect: :param redis: Custom fixture that provides a sandbox Redis server """ redis_connect.conn = redis - redis_connect.generated_ids = {"front_end_id": "1234abcd", "back_end_id": "5678efgh", "wrapper_id": "9876jklm"} + redis_connect.generated_ids = { + "front_end_id": "1234abcd", + "back_end_id": "5678efgh", + "wrapper_id": "9876jklm", + } redis.set("duplicate", "value") with patch("carta_envoy.connect.RedisConnect.get_redis_values") as values: redis_connect.check_for_duplicate_values({"duplicate": "value"}, 9897, 6543, 1234) @@ -119,8 +144,10 @@ class TestRedisConnect: class TestArchiveConnect: + """Test sending CARTA URL to archive request handler""" + def test_send_carta_url_to_rh(self): - with patch("carta_envoy.connect.requests.post") as mocked_post: + with patch(CARTA_POST) as mocked_post: archive_connect.send_carta_url_to_rh("http://fake.url/for/testing") mocked_post.assert_called_with( f"{test_settings['workflow_url']}/workflows/carta/requests/{test_settings['wf_request_id']}/send-url-to-aat", @@ -129,8 +156,10 @@ class TestArchiveConnect: class TestNotificationConnect: + """Test sending session-ready and session-expired messages""" + def test_send_session_ready(self): - with patch("carta_envoy.connect.requests.post") as mocked_post: + with patch(CARTA_POST) as mocked_post: url = "I am carta" notify_connect.send_session_ready(wrapper_url=url) mocked_post.assert_called_with( @@ -139,7 +168,7 @@ class TestNotificationConnect: ) def test_send_session_expired(self): - with patch("carta_envoy.connect.requests.post") as mocked_post: + with patch(CARTA_POST) as mocked_post: notify_connect.send_session_expired() mocked_post.assert_called_with( f"{test_settings['notification_url']}/notify/carta_expired/send", @@ -147,3 +176,16 @@ class TestNotificationConnect: "destination_email": test_settings["user_email"], }, ) + + +class TestWorkflowConnect: + """Test sending CARTA URL to Workspaces systesm""" + + def test_send_carta_url(self): + with patch(CARTA_POST) as mocked_post: + url = "my_very_own_special_CARTA_session" + workflow_connect.send_carta_url(url) + mocked_post.assert_called_with( + f"{test_settings['workflow_url']}/workflows/carta/requests/{test_settings['wf_request_id']}/url", + json={"carta_url": url}, + ) diff --git a/apps/cli/executables/pexable/carta_envoy/test/test_launchers.py b/apps/cli/executables/pexable/carta_envoy/test/test_launchers.py index a2ee83c4c9627b1fd409553ad7a3206332701715..b5533b6c0f710e984e8af2fd1748215269b32a73 100644 --- a/apps/cli/executables/pexable/carta_envoy/test/test_launchers.py +++ b/apps/cli/executables/pexable/carta_envoy/test/test_launchers.py @@ -7,6 +7,8 @@ from pathlib import Path from unittest.mock import MagicMock, patch # pylint: disable=E0401, R0201 +import pytest + from carta_envoy.launchers import CartaLauncher, CartaWrapperLauncher logger = logging.getLogger("casa_envoy") @@ -143,6 +145,7 @@ class TestCartaLauncher: # Clean up wrapper os.remove("index.html") + @pytest.mark.skip("Messaging mocks are inoperable.") def test_run_carta_with_ws(self): """ Test that CARTA runs successfully with setup and teardown diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index 948668e9800edbe7292d6984ff8ffa1750a6942b..91e40ced1bae38904c91ca58080908c1ccad4425 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -5,6 +5,8 @@ import logging import os from pathlib import Path +# pylint: disable=E0401 + import transaction import zope.sqlalchemy from pyramid.config import Configurator @@ -22,8 +24,6 @@ from workspaces.workflow.services.workflow_service import ( WorkflowService, ) -# pylint: disable=E0401 - logger = logging.getLogger(__name__) @@ -73,7 +73,6 @@ class WorkflowRestService: """ Top-level service for workflow requests. - TODO: rename this to requests and add new service about workflow definitions with this name """ def __init__(self, request: Request): @@ -153,7 +152,9 @@ class WorkflowWorkingDirRestService: redirect_url = self.generate_url_from_path(paths[0], results_path) return Response(status_int=http.HTTPStatus.FOUND, location=redirect_url) - @view_config(request_method="GET", route_name="serve_carta_wrapper", renderer="json") + @view_config( + request_method="GET", route_name="serve_carta_wrapper", renderer="json" + ) def serve_carta_wrapper(self): """ Dish up some HTML containing the CARTA URL in a frame. @@ -161,9 +162,13 @@ class WorkflowWorkingDirRestService: :return: """ - path = Path(f"/lustre/aoc/cluster/pipeline/docker/workspaces/html/{self.request.matchdict['request_id']}") + path = Path( + f"/lustre/aoc/cluster/pipeline/docker/workspaces/html/{self.request.matchdict['request_id']}" + ) carta_html_file = list(path.iterdir())[0] - return FileResponse(carta_html_file, request=self.request, content_type='text/html') + return FileResponse( + carta_html_file, request=self.request, content_type="text/html" + ) def generate_working_directory_dict(self, results_path, parent_paths) -> dict: workdir_dict = {} @@ -190,7 +195,11 @@ class WorkflowWorkingDirRestService: # check if url needs a slash to divide paths divider = ("/", "")[self.request.current_route_url().endswith("/")] content_key.update( - {key.name: {"url": self.request.current_route_url() + divider + key.name}} + { + key.name: { + "url": self.request.current_route_url() + divider + key.name + } + } ) # add full path for content @@ -207,7 +216,9 @@ class WorkflowWorkingDirRestService: # if it is a directory, create a json object workdir_json = json.dumps(workdir_dict, indent=2) # create response with the json object as the body - response = Response(body=workdir_json, request=self.request, content_type="text/json") + response = Response( + body=workdir_json, request=self.request, content_type="text/json" + ) else: # if it is not a directory, serve the static file response = FileResponse( @@ -275,7 +286,9 @@ class WorkflowRequestRestService: """ print(f"Announcing QA ready for workflow {self.request.context}") - return self.request.workflows.announce_qa_ready(self.request.matchdict["request_id"]) + return self.request.workflows.announce_qa_ready( + self.request.matchdict["request_id"] + ) @view_config(request_method="POST", route_name="ingest_workflow_result") def ingest(self): @@ -291,7 +304,9 @@ class WorkflowRequestRestService: # 2. create ingestion workflow request ingest_type = ( - "ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest_image" + "ingest_cal" + if "calibration" in self.request.matchdict["name"] + else "ingest_image" ) ingest_request = self.request.info.create_workflow_request( workflow=ingest_type, @@ -315,9 +330,30 @@ class WorkflowRequestRestService: carta_url = self.request.json_body["carta_url"] self.request.workflows.send_carta_url_to_aat(request_id, carta_url) - return Response(status_code=200, body=f"SUCCESS: Sent CARTA URL {carta_url} to AAT system") + return Response( + status_code=http.HTTPStatus.OK, + body=f"SUCCESS: Sent CARTA URL {carta_url} to AAT system", + ) + + @view_config(request_method="POST", route_name="send_carta_url") + def send_carta_url(self): + """ + Pyramid view that sends a given CARTA URL to the Workspaces system + + return: 200 (OK) HTTP Response + """ + request_id = self.request.matchdict["request_id"] + carta_url = self.request.json_body["carta_url"] + self.request.workflows.send_carta_url(request_id, carta_url) - @view_config(request_method="GET", route_name="workflow_request_htcondor_id", renderer="json") + return Response( + status_code=http.HTTPStatus.OK, + body=f"SUCCESS: Sent CARTA URL {carta_url} to Workspaces System", + ) + + @view_config( + request_method="GET", route_name="workflow_request_htcondor_id", renderer="json" + ) def get_request_htcondor_id(self): """ Pyramid view that gives back the HTCondor job ID for a given workflow request @@ -328,7 +364,9 @@ class WorkflowRequestRestService: self.request.matchdict["request_id"] ) - return Response(json_body={"htcondor_job_id": str(requested_workflow.htcondor_job_id)}) + return Response( + json_body={"htcondor_job_id": str(requested_workflow.htcondor_job_id)} + ) @view_defaults(route_name="workflow_request_files", renderer="json") @@ -445,7 +483,9 @@ def main(global_config, **settings): # we need to build a workflow_info here for the message handler, but # we won't use it anywhere else, we will make new ones per-request - workflow_info = WorkflowInfo(get_tm_session(session_factory, transaction.manager)) + workflow_info = WorkflowInfo( + get_tm_session(session_factory, transaction.manager) + ) message_handler = WorkflowMessageHandler(workflow_info) def create_workflow_info(request): @@ -533,10 +573,18 @@ def main(global_config, **settings): "/workflows/carta/requests/{request_id}/send-url-to-aat", factory=lookup_request, ) + # Use this route to send a CARTA envoy URL to the Workspaces system + config.add_route( + "send_carta_url", + "/workflows/carta/requests/{request_id}/url", + factory=lookup_request, + ) # yes I know this doesn't match pattern, it's a stopgap. config.add_route( - "announce_qa_ready", "/workflows/requests/{request_id}/qa", factory=lookup_request + "announce_qa_ready", + "/workflows/requests/{request_id}/qa", + factory=lookup_request, ) config.include("pyramid_beaker") config.scan(".") diff --git a/shared/workspaces/workspaces/workflow/message_architect.py b/shared/workspaces/workspaces/workflow/message_architect.py index 7b4745ff56539e97b6fb50c1c74933d02c295284..0385dfab3896d388ed890dd0e66fe5867d7cc3e6 100644 --- a/shared/workspaces/workspaces/workflow/message_architect.py +++ b/shared/workspaces/workspaces/workflow/message_architect.py @@ -1,5 +1,9 @@ +""" Creates AMQP message templates """ + import logging +# pylint: disable=E0401 + from immutable_views import DictView from workspaces.shared_interfaces import MessageArchitectIF from workspaces.workflow.schema_interfaces import WorkflowRequestIF @@ -36,6 +40,13 @@ workflow_msg_templates = DictView( "subject": None, "type": "ingestion-complete", }, + "carta_ready": { + "service": "capability", + "routing_key": "capability", + "carta_url": None, + "subject": None, + "type": "carta-ready", + }, } ) @@ -78,14 +89,17 @@ archive_msg_templates = DictView( class WorkflowMessageArchitect(MessageArchitectIF): + """Builds workflow AMQP messages""" def __init__( self, request: WorkflowRequestIF = None, previous_info=None, delivery_info=None, + carta_url=None, ): self.subject = request or previous_info self.delivery = delivery_info + self.carta_url = carta_url @staticmethod def get_message_template(msg_type: str) -> DictView: @@ -98,10 +112,16 @@ class WorkflowMessageArchitect(MessageArchitectIF): if self.delivery is not None: template["delivery"] = self.delivery + + if self.carta_url is not None: + template["carta_url"] = self.carta_url + return DictView(template) + class ArchiveMessageArchitect(MessageArchitectIF): + """Builds archive system AMQP messages""" def __init__(self, routing_key: str, request: WorkflowRequestIF, carta_url: str = None): self.key = routing_key self.subject = request diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 478184e6b49988699fe218d4d03ad87f5684b70b..803170b4567e0e0183701509127b09c8ed9b09aa 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -10,6 +10,8 @@ from pathlib import Path from tempfile import mkdtemp from typing import Dict, List, Union +# pylint: disable=E0401, W1203 + import requests import transaction from messaging.messenger import MessageSender @@ -18,14 +20,15 @@ from pycapo import CapoConfig from requests import Response from workspaces.workflow.enum import ArchiveWorkflows, WorkflowRequestState -from workspaces.workflow.message_architect import ArchiveMessageArchitect, WorkflowMessageArchitect +from workspaces.workflow.message_architect import ( + ArchiveMessageArchitect, + WorkflowMessageArchitect, +) from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF from workspaces.system.services.remote_processing_service import CapoInjector -# pylint: disable=E0401, W1203 - logger = logging.getLogger(__name__) @@ -44,7 +47,8 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): :return: the response as JSON """ response = requests.post( - f"{self.url}/workflows/{request.workflow_name}" f"/requests/{request.workflow_request_id}/submit" + f"{self.url}/workflows/{request.workflow_name}" + f"/requests/{request.workflow_request_id}/submit" ) logger.info( "Got result %s with type %s and body %s", @@ -54,7 +58,9 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): ) return response.json() - def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes) -> Response: + def attach_file_to_request( + self, request: WorkflowRequestIF, filename: str, content: bytes + ) -> Response: """ Add a file to this workflow request. @@ -80,10 +86,14 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): :return: dict containing file content """ - response = requests.get(f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}") + response = requests.get( + f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}" + ) return response.content.decode() - def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF: + def create_workflow_request( + self, workflow: Union[str, WorkflowIF], argument: Dict + ) -> WorkflowRequestIF: """ Create a workflow request using the supplied arguments. @@ -119,7 +129,9 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): :param request: completed workflow request to ingest :return: """ - requests.post(f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest") + requests.post( + f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest" + ) class WorkflowService(WorkflowServiceIF): @@ -151,7 +163,10 @@ class WorkflowService(WorkflowServiceIF): """ forbidden = self._get_forbidden_templates_list(request.workflow_name) if filename not in forbidden: - if ArchiveWorkflows.is_archive_wf(request.workflow_name) and filename == "metadata.json": + if ( + ArchiveWorkflows.is_archive_wf(request.workflow_name) + and filename == "metadata.json" + ): content_dict = json.loads(content.decode()) content_dict["workflowName"] = request.workflow_name content_dict["data_location"] = request.argument["data_location"] @@ -184,7 +199,9 @@ class WorkflowService(WorkflowServiceIF): :param workflow_request_id: ID of carta workflow that wants to send this message :param carta_url: JSON blob with CARTA URL """ - logger.info(f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!") + logger.info( + f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!" + ) wf_request = self.info.lookup_workflow_request(workflow_request_id) routing_key = f"ws-workflow.carta-instance-ready.{workflow_request_id}" carta_url_msg = ArchiveMessageArchitect( @@ -192,6 +209,21 @@ class WorkflowService(WorkflowServiceIF): ).compose_message("carta_ready") self.archive_messenger.send_message(**carta_url_msg) + def send_carta_url(self, workflow_request_id: int, carta_url: str): + """ + Send AMQP message with CARTA URL to Workspaces. + + :param workflow_request_id: ID of CARTA workflow that wants to send this message + :param carta_url: JSON blob containing CARTA URL + :return: + """ + logger.info(f"SENDING CARTA MESSAGE to Workspaces for request #{workflow_request_id}!") + wf_request = self.info.lookup_workflow_request(workflow_request_id) + carta_url_msg = WorkflowMessageArchitect( + request=wf_request, carta_url=carta_url + ).compose_message("carta_ready") + self.messenger.send_message(**carta_url_msg) + def execute(self, request: WorkflowRequest): """ Execute a workflow per the supplied parameters. @@ -210,7 +242,11 @@ class WorkflowService(WorkflowServiceIF): # create a temporary directory if processing directory is not supplied, # needs to exist before template rendering - temp_folder = self._make_temp_directory(request) if not request.results_dir else Path(request.results_dir) + temp_folder = ( + self._make_temp_directory(request) + if not request.results_dir + else Path(request.results_dir) + ) # if remote is true, create a capo subspace file in the request's directory if remote: @@ -275,6 +311,12 @@ class WorkflowService(WorkflowServiceIF): return temp_folder def determine_multiple_productfetch(self, request: WorkflowRequestIF): + """ + Are we fetching more than one product? + + :param request: the workflow request + :return: + """ if "need_data" in request.argument: if request.argument["need_data"] is True: spl_list = request.argument["product_locator"].split(",") @@ -284,7 +326,9 @@ class WorkflowService(WorkflowServiceIF): ) request.argument["ramInGb"] = self.processing_settings.ramInGb - def _render_with_metadata(self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF): + def _render_with_metadata( + self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF + ): name = wf_request.workflow_name if "calibration" in name: wrest_type = "-sc" @@ -304,7 +348,9 @@ class WorkflowService(WorkflowServiceIF): argument2 = [] elif "ingest" in name: wrest_type = "-aux" - parent_req = self.info.lookup_workflow_request(int(wf_request.argument["parent_wf_request_id"])) + parent_req = self.info.lookup_workflow_request( + int(wf_request.argument["parent_wf_request_id"]) + ) eb = ( parent_req.argument["product_locator"] if "product_locator" in parent_req.argument @@ -335,11 +381,15 @@ class WorkflowService(WorkflowServiceIF): else: logger.error(wf_json.decode()) logger.info("SENDING WORKFLOW FAIL MESSAGE!") - failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message("workflow_failed") + failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message( + "workflow_failed" + ) self.messenger.send_message(**failed_msg) return wf_request - def _determine_usable_files(self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]): + def _determine_usable_files( + self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile] + ): # Override templates if user supplied file has same name and is a valid input file usable_templates = [] usable_files = [] @@ -463,6 +513,8 @@ class WorkflowService(WorkflowServiceIF): class WorkflowMessageHandler: + """Handles workflow messages. No, really!""" + def __init__(self, info: WorkflowInfoIF): self.info = info self.archive_router = Router("archive") @@ -517,9 +569,9 @@ class WorkflowMessageHandler: logger.info("SENDING INGESTION COMPLETE MESSAGE!") subject["execution_wf_id"] = wf_request.argument["parent_wf_request_id"] - ingestion_complete_msg = WorkflowMessageArchitect(previous_info=subject).compose_message( - "ingestion_complete" - ) + ingestion_complete_msg = WorkflowMessageArchitect( + previous_info=subject + ).compose_message("ingestion_complete") self.messenger.send_message(**ingestion_complete_msg) @on_message(service="workflow") @@ -542,7 +594,9 @@ class WorkflowMessageHandler: if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]): # Workflow has corresponding condor job ID - logger.info(f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!") + logger.info( + f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!" + ) request.htcondor_job_id = htcondor_job_id elif message["type"] == "workflow-complete": status = WorkflowRequestState.Complete.name @@ -577,7 +631,8 @@ class WorkflowMessageHandler: except Exception as exc: transaction.abort() logger.error( - f"Failed to update status on workflow request " f"{request.workflow_request_id} to {status}: {exc}" + f"Failed to update status on workflow request " + f"{request.workflow_request_id} to {status}: {exc}" ) else: logger.warning(f"Message {message} does not concern a workflow request. Ignoring.") @@ -587,19 +642,24 @@ class WorkflowMessageHandler: wf_id = subject["workflow_request_id"] wf_request = self.info.lookup_workflow_request(wf_id) - if wf_request.workflow_name == ArchiveWorkflows.CARTA.value and wf_request.argument["notify_ready"] is False: - logger.info(f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!") - routing_key = f"ws-workflow.carta-instance-ready.{wf_id}" - carta_url_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( - "carta_failed" + if ( + wf_request.workflow_name == ArchiveWorkflows.CARTA.value + and wf_request.argument["notify_ready"] is False + ): + logger.info( + f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!" ) + routing_key = f"ws-workflow.carta-instance-ready.{wf_id}" + carta_url_msg = ArchiveMessageArchitect( + routing_key=routing_key, request=wf_request + ).compose_message("carta_failed") self.archive_messenger.send_message(**carta_url_msg) if wf_request.workflow_name == ArchiveWorkflows.SECI.value: logger.info(f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!") routing_key = f"ws-workflow.seci.{wf_id}" - seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( - "seci_failed" - ) + seci_msg = ArchiveMessageArchitect( + routing_key=routing_key, request=wf_request + ).compose_message("seci_failed") self.archive_messenger.send_message(**seci_msg) def send_archive_complete_event(self, **message: Dict): @@ -610,9 +670,9 @@ class WorkflowMessageHandler: if wf_request.workflow_name == ArchiveWorkflows.SECI.value: logger.info(f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!") routing_key = f"ws-workflow.seci.{wf_id}" - seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( - "seci_complete" - ) + seci_msg = ArchiveMessageArchitect( + routing_key=routing_key, request=wf_request + ).compose_message("seci_complete") self.archive_messenger.send_message(**seci_msg) def clean_remote_workflow(self, request: WorkflowRequestIF): @@ -625,5 +685,5 @@ class WorkflowMessageHandler: def clean_workflow(self, request: WorkflowRequestIF): if request.htcondor_job_id is not None: # Request has a set condor_job_id; unset it - logger.info(f"Workflow finished. Unsetting HTCondor ID.") + logger.info("Workflow finished. Unsetting HTCondor ID.") request.htcondor_job_id = None