From 68001a3cf269b88070372a2796f643e72d087542 Mon Sep 17 00:00:00 2001 From: "Janet L. Goldstein" <jgoldste@nrao.edu> Date: Tue, 14 Sep 2021 11:11:18 -0600 Subject: [PATCH] WS-649: infrastructure for sending CARTA event info to Workspaces system --- .../carta_envoy/carta_envoy/connect.py | 76 +++++++- .../carta_envoy/carta_envoy/launchers.py | 8 +- services/workflow/workflow/server.py | 78 ++++++-- .../workspaces/workflow/message_architect.py | 20 ++ .../workflow/services/workflow_service.py | 175 +++++++++++++----- 5 files changed, 284 insertions(+), 73 deletions(-) diff --git a/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py b/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py index 21c3d15ab..fecbba8b0 100644 --- a/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py +++ b/apps/cli/executables/pexable/carta_envoy/carta_envoy/connect.py @@ -2,6 +2,8 @@ import logging import socket from typing import Dict +# pylint: disable=C0103, C0301, E0401, E0402, E1101, R0903, R0913, W0105, W1203 + import redis import requests @@ -26,6 +28,11 @@ class RedisConnect: @staticmethod def generate_ids() -> dict: + """ + Generates IDs and system name required by Redis + + :return: dict of settings + """ return { "front_end_id": generate_random_str(), "back_end_id": generate_random_str(), @@ -34,11 +41,18 @@ class RedisConnect: } def generate_carta_url(self) -> str: + """ + Generates CARTA URL for Redis + + :return: URL of current CARTA session + """ self.logger.info("Generating CARTA url...") front_end_id = self.generated_ids["front_end_id"] back_end_id = self.generated_ids["back_end_id"] proxy = self.settings["reverse_proxy"] - carta_url = f"https://{proxy}/{front_end_id}/?socketUrl=wss://{proxy}/{back_end_id}/" + carta_url = ( + f"https://{proxy}/{front_end_id}/?socketUrl=wss://{proxy}/{back_end_id}/" + ) if self.settings["single_image"]: carta_url = carta_url + "&file=" + self.settings["image_name"] @@ -72,6 +86,11 @@ class RedisConnect: return s.getsockname()[1] def prepare_redis(self) -> dict: + """ + Gathers Redis server settings + + :return: settings as key-value pairs + """ self.logger.info("Preparing Redis server...") front_end_port = self.get_available_port() back_end_port = self.get_available_port() @@ -162,15 +181,21 @@ class RedisConnect: f"traefik/http/routers/{carta_wrapper}/middlewares/0": "stripPrefixFE@file", } - unique_values = self.check_for_duplicate_values(values, front_end_port, back_end_port, wrapper_port) + unique_values = self.check_for_duplicate_values( + values, front_end_port, back_end_port, wrapper_port + ) self.redis_values = unique_values return unique_values - def check_for_duplicate_values(self, redis_values: dict, front_port: int, back_port: int, wrapper_port: int): + def check_for_duplicate_values( + self, redis_values: dict, front_port: int, back_port: int, wrapper_port: int + ): self.logger.info("Checking for duplicate values on server...") for key in redis_values: if self.conn.get(key): - self.logger.warning("WARNING: Redis value collision found. Generating new random IDs.") + self.logger.warning( + "WARNING: Redis value collision found. Generating new random IDs." + ) self.generated_ids = self.generate_ids() new_values = self.get_redis_values( self.settings["reverse_proxy"], @@ -192,7 +217,9 @@ class RedisConnect: :param redis_values: Dictionary of Redis entries :param timeout_minutes: Timeout of the valet in minutes """ - self.logger.info(f"Setting Redis values with a timeout of {timeout_minutes} minutes...") + self.logger.info( + f"Setting Redis values with a timeout of {timeout_minutes} minutes..." + ) for key, val in redis_values.items(): self.conn.setex(key, 60 * timeout_minutes + 60, val) @@ -218,9 +245,7 @@ class ArchiveConnect: :param url: URL generated to allow user access to this running CARTA instance """ - send_archive_msg_url = ( - f"{self.settings['workflow_url']}/workflows/carta/requests/{self.settings['wf_request_id']}/send-url-to-aat" - ) + send_archive_msg_url = f"{self.settings['workflow_url']}/workflows/carta/requests/{self.settings['wf_request_id']}/send-url-to-aat" payload = {"carta_url": url} self.logger.info("Sending REST call to workflow service for AAT messaging.") requests.post(send_archive_msg_url, json=payload) @@ -237,11 +262,19 @@ class NotificationConnect: self.url = settings["notification_url"] def send_session_ready(self, wrapper_url: str): + """ + Notifies of CARTA session that's been created + + :param wrapper_url: CARTA session URL + :return: + """ if "user_email" not in self.settings: self.logger.info("Not sending notification because no user email supplied") return - self.logger.info(f"Sending session ready notification with CARTA wrapper URL {wrapper_url}") + self.logger.info( + f"Sending session ready notification with CARTA wrapper URL {wrapper_url}" + ) requests.post( f"{self.url}/notify/carta_ready/send", json={ @@ -251,6 +284,10 @@ class NotificationConnect: ) def send_session_expired(self): + """ + Notifies of expired CARTA session + :return: + """ if "user_email" not in self.settings: self.logger.info("Not sending notification because no user email supplied") return @@ -262,3 +299,24 @@ class NotificationConnect: "destination_email": self.settings["user_email"], }, ) + + +class WorkflowConnect: + """For sending CARTA URL to workspaces system""" + + def __init__(self, settings): + self.logger = logging.getLogger("carta_envoy") + self.settings = settings + self.url = settings["workflow_url"] + + def send_url_message(self): + """ + Makes REST call to send message containing CARTA URL to workspaces system + :return: + """ + send_url_message = f"{self.settings['workflow_url']}/workflows/carta/requests/{self.settings['wf_request_id']}/send-url-to-ws" + payload = {"carta_url": self.url} + self.logger.info( + "Sending REST call to workflow service for WS CARTA messaging." + ) + requests.post(send_url_message, json=payload) diff --git a/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py b/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py index 674b6df1a..8538b3ef7 100644 --- a/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py +++ b/apps/cli/executables/pexable/carta_envoy/carta_envoy/launchers.py @@ -12,10 +12,12 @@ from threading import Thread from types import FrameType from typing import Optional +# pylint: disable=E0401, R1705, W0105 + import pendulum as pendulum -from carta_envoy.connect import ArchiveConnect, NotificationConnect, RedisConnect -from carta_envoy.templates import CartaWrapper from pendulum import DateTime +from carta_envoy.connect import ArchiveConnect, NotificationConnect, RedisConnect, WorkflowConnect +from carta_envoy.templates import CartaWrapper # pylint: disable=E0401, R0913, R1721, W0603, W1203 @@ -68,6 +70,7 @@ class CartaLauncher: self.signal = self.signal_handler self.redis_connect = RedisConnect(settings) self.archive_connect = ArchiveConnect(settings) + self.workflow_connect = WorkflowConnect(settings) self.notification = NotificationConnect(settings) def signal_handler(self, signum: int, frame: FrameType): @@ -225,6 +228,7 @@ class CartaLauncher: if self.settings["send_ready"] == "true": self.logger.info(f"User Email") self.notification.send_session_ready(wrapper_url) + self.workflow_connect.send_carta_url_to_rh(wrapper_url) elif self.settings["send_ready"] == "false": self.logger.info(f"AAT Request Handler, with {wrapper_url}") self.archive_connect.send_carta_url_to_rh(wrapper_url) diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index 948668e98..9ff9367fa 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -5,6 +5,8 @@ import logging import os from pathlib import Path +# pylint: disable=E0401 + import transaction import zope.sqlalchemy from pyramid.config import Configurator @@ -22,8 +24,6 @@ from workspaces.workflow.services.workflow_service import ( WorkflowService, ) -# pylint: disable=E0401 - logger = logging.getLogger(__name__) @@ -73,7 +73,6 @@ class WorkflowRestService: """ Top-level service for workflow requests. - TODO: rename this to requests and add new service about workflow definitions with this name """ def __init__(self, request: Request): @@ -153,7 +152,9 @@ class WorkflowWorkingDirRestService: redirect_url = self.generate_url_from_path(paths[0], results_path) return Response(status_int=http.HTTPStatus.FOUND, location=redirect_url) - @view_config(request_method="GET", route_name="serve_carta_wrapper", renderer="json") + @view_config( + request_method="GET", route_name="serve_carta_wrapper", renderer="json" + ) def serve_carta_wrapper(self): """ Dish up some HTML containing the CARTA URL in a frame. @@ -161,9 +162,13 @@ class WorkflowWorkingDirRestService: :return: """ - path = Path(f"/lustre/aoc/cluster/pipeline/docker/workspaces/html/{self.request.matchdict['request_id']}") + path = Path( + f"/lustre/aoc/cluster/pipeline/docker/workspaces/html/{self.request.matchdict['request_id']}" + ) carta_html_file = list(path.iterdir())[0] - return FileResponse(carta_html_file, request=self.request, content_type='text/html') + return FileResponse( + carta_html_file, request=self.request, content_type="text/html" + ) def generate_working_directory_dict(self, results_path, parent_paths) -> dict: workdir_dict = {} @@ -190,7 +195,11 @@ class WorkflowWorkingDirRestService: # check if url needs a slash to divide paths divider = ("/", "")[self.request.current_route_url().endswith("/")] content_key.update( - {key.name: {"url": self.request.current_route_url() + divider + key.name}} + { + key.name: { + "url": self.request.current_route_url() + divider + key.name + } + } ) # add full path for content @@ -207,7 +216,9 @@ class WorkflowWorkingDirRestService: # if it is a directory, create a json object workdir_json = json.dumps(workdir_dict, indent=2) # create response with the json object as the body - response = Response(body=workdir_json, request=self.request, content_type="text/json") + response = Response( + body=workdir_json, request=self.request, content_type="text/json" + ) else: # if it is not a directory, serve the static file response = FileResponse( @@ -275,7 +286,9 @@ class WorkflowRequestRestService: """ print(f"Announcing QA ready for workflow {self.request.context}") - return self.request.workflows.announce_qa_ready(self.request.matchdict["request_id"]) + return self.request.workflows.announce_qa_ready( + self.request.matchdict["request_id"] + ) @view_config(request_method="POST", route_name="ingest_workflow_result") def ingest(self): @@ -291,7 +304,9 @@ class WorkflowRequestRestService: # 2. create ingestion workflow request ingest_type = ( - "ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest_image" + "ingest_cal" + if "calibration" in self.request.matchdict["name"] + else "ingest_image" ) ingest_request = self.request.info.create_workflow_request( workflow=ingest_type, @@ -315,9 +330,30 @@ class WorkflowRequestRestService: carta_url = self.request.json_body["carta_url"] self.request.workflows.send_carta_url_to_aat(request_id, carta_url) - return Response(status_code=200, body=f"SUCCESS: Sent CARTA URL {carta_url} to AAT system") + return Response( + status_code=http.HTTPStatus.OK, + body=f"SUCCESS: Sent CARTA URL {carta_url} to AAT system", + ) + + @view_config(request_method="POST", route_name="send_carta_url_to_ws") + def send_carta_url_to_ws(self): + """ + Pyramid view that sends a given CARTA URL to the Workspaces system + + return: 200 (OK) HTTP Response + """ + request_id = self.request.matchdict["request_id"] + carta_url = self.request.json_body["carta_url"] + self.request.workflows.send_carta_url_to_ws(request_id, carta_url) - @view_config(request_method="GET", route_name="workflow_request_htcondor_id", renderer="json") + return Response( + status_code=http.HTTPStatus.OK, + body=f"SUCCESS: Sent CARTA URL {carta_url} to Workspaces System", + ) + + @view_config( + request_method="GET", route_name="workflow_request_htcondor_id", renderer="json" + ) def get_request_htcondor_id(self): """ Pyramid view that gives back the HTCondor job ID for a given workflow request @@ -328,7 +364,9 @@ class WorkflowRequestRestService: self.request.matchdict["request_id"] ) - return Response(json_body={"htcondor_job_id": str(requested_workflow.htcondor_job_id)}) + return Response( + json_body={"htcondor_job_id": str(requested_workflow.htcondor_job_id)} + ) @view_defaults(route_name="workflow_request_files", renderer="json") @@ -445,7 +483,9 @@ def main(global_config, **settings): # we need to build a workflow_info here for the message handler, but # we won't use it anywhere else, we will make new ones per-request - workflow_info = WorkflowInfo(get_tm_session(session_factory, transaction.manager)) + workflow_info = WorkflowInfo( + get_tm_session(session_factory, transaction.manager) + ) message_handler = WorkflowMessageHandler(workflow_info) def create_workflow_info(request): @@ -533,10 +573,18 @@ def main(global_config, **settings): "/workflows/carta/requests/{request_id}/send-url-to-aat", factory=lookup_request, ) + # Use this route to send a CARTA envoy URL to the Workspaces system + config.add_route( + "send_carta_url_to_ws", + "/workflows/carta/requests/{request_id}/send-url-to-ws", + factory=lookup_request, + ) # yes I know this doesn't match pattern, it's a stopgap. config.add_route( - "announce_qa_ready", "/workflows/requests/{request_id}/qa", factory=lookup_request + "announce_qa_ready", + "/workflows/requests/{request_id}/qa", + factory=lookup_request, ) config.include("pyramid_beaker") config.scan(".") diff --git a/shared/workspaces/workspaces/workflow/message_architect.py b/shared/workspaces/workspaces/workflow/message_architect.py index 7b4745ff5..0385dfab3 100644 --- a/shared/workspaces/workspaces/workflow/message_architect.py +++ b/shared/workspaces/workspaces/workflow/message_architect.py @@ -1,5 +1,9 @@ +""" Creates AMQP message templates """ + import logging +# pylint: disable=E0401 + from immutable_views import DictView from workspaces.shared_interfaces import MessageArchitectIF from workspaces.workflow.schema_interfaces import WorkflowRequestIF @@ -36,6 +40,13 @@ workflow_msg_templates = DictView( "subject": None, "type": "ingestion-complete", }, + "carta_ready": { + "service": "capability", + "routing_key": "capability", + "carta_url": None, + "subject": None, + "type": "carta-ready", + }, } ) @@ -78,14 +89,17 @@ archive_msg_templates = DictView( class WorkflowMessageArchitect(MessageArchitectIF): + """Builds workflow AMQP messages""" def __init__( self, request: WorkflowRequestIF = None, previous_info=None, delivery_info=None, + carta_url=None, ): self.subject = request or previous_info self.delivery = delivery_info + self.carta_url = carta_url @staticmethod def get_message_template(msg_type: str) -> DictView: @@ -98,10 +112,16 @@ class WorkflowMessageArchitect(MessageArchitectIF): if self.delivery is not None: template["delivery"] = self.delivery + + if self.carta_url is not None: + template["carta_url"] = self.carta_url + return DictView(template) + class ArchiveMessageArchitect(MessageArchitectIF): + """Builds archive system AMQP messages""" def __init__(self, routing_key: str, request: WorkflowRequestIF, carta_url: str = None): self.key = routing_key self.subject = request diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 478184e6b..0fd02f0b2 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -10,6 +10,8 @@ from pathlib import Path from tempfile import mkdtemp from typing import Dict, List, Union +# pylint: disable=E0401, W1203 + import requests import transaction from messaging.messenger import MessageSender @@ -18,14 +20,15 @@ from pycapo import CapoConfig from requests import Response from workspaces.workflow.enum import ArchiveWorkflows, WorkflowRequestState -from workspaces.workflow.message_architect import ArchiveMessageArchitect, WorkflowMessageArchitect +from workspaces.workflow.message_architect import ( + ArchiveMessageArchitect, + WorkflowMessageArchitect, +) from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF from workspaces.system.services.remote_processing_service import CapoInjector -# pylint: disable=E0401, W1203 - logger = logging.getLogger(__name__) @@ -34,7 +37,9 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): """This is the workflow service REST client.""" def __init__(self): - self.url = CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl + self.url = ( + CapoConfig().settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl + ) def execute(self, request: WorkflowRequestIF): """ @@ -44,7 +49,8 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): :return: the response as JSON """ response = requests.post( - f"{self.url}/workflows/{request.workflow_name}" f"/requests/{request.workflow_request_id}/submit" + f"{self.url}/workflows/{request.workflow_name}" + f"/requests/{request.workflow_request_id}/submit" ) logger.info( "Got result %s with type %s and body %s", @@ -54,7 +60,9 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): ) return response.json() - def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes) -> Response: + def attach_file_to_request( + self, request: WorkflowRequestIF, filename: str, content: bytes + ) -> Response: """ Add a file to this workflow request. @@ -80,10 +88,14 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): :return: dict containing file content """ - response = requests.get(f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}") + response = requests.get( + f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}" + ) return response.content.decode() - def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF: + def create_workflow_request( + self, workflow: Union[str, WorkflowIF], argument: Dict + ) -> WorkflowRequestIF: """ Create a workflow request using the supplied arguments. @@ -93,7 +105,9 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): """ # 1. Handle the type ambiguity with the workflow argument - workflow_name = workflow if isinstance(workflow, str) else workflow.workflow_name + workflow_name = ( + workflow if isinstance(workflow, str) else workflow.workflow_name + ) # 2. Make the request result = requests.post( @@ -119,7 +133,9 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): :param request: completed workflow request to ingest :return: """ - requests.post(f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest") + requests.post( + f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest" + ) class WorkflowService(WorkflowServiceIF): @@ -137,9 +153,13 @@ class WorkflowService(WorkflowServiceIF): self.archive_router = message_handler.archive_router self.messenger = message_handler.messenger self.archive_messenger = message_handler.archive_messenger - self.processing_settings = CapoConfig().settings("edu.nrao.workspaces.ProcessingSettings") + self.processing_settings = CapoConfig().settings( + "edu.nrao.workspaces.ProcessingSettings" + ) - def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes): + def attach_file_to_request( + self, request: WorkflowRequestIF, filename: str, content: bytes + ): """ Add a file to a request; complain if the filename is bad. @@ -151,7 +171,10 @@ class WorkflowService(WorkflowServiceIF): """ forbidden = self._get_forbidden_templates_list(request.workflow_name) if filename not in forbidden: - if ArchiveWorkflows.is_archive_wf(request.workflow_name) and filename == "metadata.json": + if ( + ArchiveWorkflows.is_archive_wf(request.workflow_name) + and filename == "metadata.json" + ): content_dict = json.loads(content.decode()) content_dict["workflowName"] = request.workflow_name content_dict["data_location"] = request.argument["data_location"] @@ -192,6 +215,23 @@ class WorkflowService(WorkflowServiceIF): ).compose_message("carta_ready") self.archive_messenger.send_message(**carta_url_msg) + def send_carta_url_to_ws(self, workflow_request_id: int, carta_url: str): + """ + Hit REST endpoint with CARTA URL from CARTA envoy. + + :param workflow_request_id: ID of CARTA workflow that wants to send this message + :param carta_url: JSON blob containing CARTA URL + :return: + """ + logger.info( + f"SENDING CARTA MESSAGE to Workspaces for request #{workflow_request_id}!" + ) + wf_request = self.info.lookup_workflow_request(workflow_request_id) + carta_url_msg = WorkflowMessageArchitect( + request=wf_request, carta_url=carta_url + ).compose_message("carta_ready") + self.messenger.send_message(**carta_url_msg) + def execute(self, request: WorkflowRequest): """ Execute a workflow per the supplied parameters. @@ -210,7 +250,11 @@ class WorkflowService(WorkflowServiceIF): # create a temporary directory if processing directory is not supplied, # needs to exist before template rendering - temp_folder = self._make_temp_directory(request) if not request.results_dir else Path(request.results_dir) + temp_folder = ( + self._make_temp_directory(request) + if not request.results_dir + else Path(request.results_dir) + ) # if remote is true, create a capo subspace file in the request's directory if remote: @@ -226,7 +270,9 @@ class WorkflowService(WorkflowServiceIF): # render all the templates if request.argument["need_project_metadata"] is True: self.determine_multiple_productfetch(request) - templated_files = self._render_with_metadata(request, temp_folder, definition) + templated_files = self._render_with_metadata( + request, temp_folder, definition + ) # catch aat_wrest failure and abort workflow if isinstance(templated_files, WorkflowRequest): return request @@ -279,12 +325,14 @@ class WorkflowService(WorkflowServiceIF): if request.argument["need_data"] is True: spl_list = request.argument["product_locator"].split(",") fetcher_string = " --product-locator" - request.argument["product_locator"] = fetcher_string + fetcher_string.join( - " " + spl for spl in spl_list - ) + request.argument[ + "product_locator" + ] = fetcher_string + fetcher_string.join(" " + spl for spl in spl_list) request.argument["ramInGb"] = self.processing_settings.ramInGb - def _render_with_metadata(self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF): + def _render_with_metadata( + self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF + ): name = wf_request.workflow_name if "calibration" in name: wrest_type = "-sc" @@ -304,7 +352,9 @@ class WorkflowService(WorkflowServiceIF): argument2 = [] elif "ingest" in name: wrest_type = "-aux" - parent_req = self.info.lookup_workflow_request(int(wf_request.argument["parent_wf_request_id"])) + parent_req = self.info.lookup_workflow_request( + int(wf_request.argument["parent_wf_request_id"]) + ) eb = ( parent_req.argument["product_locator"] if "product_locator" in parent_req.argument @@ -313,7 +363,9 @@ class WorkflowService(WorkflowServiceIF): argument = eb argument2 = [] else: - logger.info(f"No wrester found for workflow {name}. Does it actually require metadata?") + logger.info( + f"No wrester found for workflow {name}. Does it actually require metadata?" + ) return wf_request logger.info(f" workflow {name} has wrest option: {wrest_type}") @@ -335,11 +387,15 @@ class WorkflowService(WorkflowServiceIF): else: logger.error(wf_json.decode()) logger.info("SENDING WORKFLOW FAIL MESSAGE!") - failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message("workflow_failed") + failed_msg = WorkflowMessageArchitect( + request=wf_request + ).compose_message("workflow_failed") self.messenger.send_message(**failed_msg) return wf_request - def _determine_usable_files(self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]): + def _determine_usable_files( + self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile] + ): # Override templates if user supplied file has same name and is a valid input file usable_templates = [] usable_files = [] @@ -350,7 +406,10 @@ class WorkflowService(WorkflowServiceIF): usable_templates.append(template) # check if a newer version was supplied and is not in the list of restricted # access templates - if template.filename in request.files and template.filename in forbidden_templates: + if ( + template.filename in request.files + and template.filename in forbidden_templates + ): usable_templates.append(template) logger.info( f"Cannot append user file {request.files[template.filename]} to " @@ -364,7 +423,9 @@ class WorkflowService(WorkflowServiceIF): usable_files.append(file) for file in usable_templates: - self.info.save_file(request=request, filename=file.filename, content=file.content) + self.info.save_file( + request=request, filename=file.filename, content=file.content + ) return usable_templates + usable_files @@ -458,7 +519,9 @@ class WorkflowService(WorkflowServiceIF): def announce_qa_ready(self, wf_request_id): wf_request = self.info.lookup_workflow_request(wf_request_id) logger.info("ANNOUNCING QA READY!") - qa_ready_msg = WorkflowMessageArchitect(request=wf_request).compose_message("qa_ready") + qa_ready_msg = WorkflowMessageArchitect(request=wf_request).compose_message( + "qa_ready" + ) self.messenger.send_message(**qa_ready_msg) @@ -481,7 +544,9 @@ class WorkflowMessageHandler: """ logger.info("RECEIVED WORKFLOW COMPLETE MESSAGE. SENDING DELIVERY MESSAGE.") # look up the workflow request to get the path - wf_request = self.info.lookup_workflow_request(message["subject"]["workflow_request_id"]) + wf_request = self.info.lookup_workflow_request( + message["subject"]["workflow_request_id"] + ) # find the path to the delivery.json file delivery_file = pathlib.Path(wf_request.results_dir) / "delivery.json" @@ -517,9 +582,9 @@ class WorkflowMessageHandler: logger.info("SENDING INGESTION COMPLETE MESSAGE!") subject["execution_wf_id"] = wf_request.argument["parent_wf_request_id"] - ingestion_complete_msg = WorkflowMessageArchitect(previous_info=subject).compose_message( - "ingestion_complete" - ) + ingestion_complete_msg = WorkflowMessageArchitect( + previous_info=subject + ).compose_message("ingestion_complete") self.messenger.send_message(**ingestion_complete_msg) @on_message(service="workflow") @@ -542,7 +607,9 @@ class WorkflowMessageHandler: if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]): # Workflow has corresponding condor job ID - logger.info(f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!") + logger.info( + f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!" + ) request.htcondor_job_id = htcondor_job_id elif message["type"] == "workflow-complete": status = WorkflowRequestState.Complete.name @@ -560,7 +627,9 @@ class WorkflowMessageHandler: self.clean_remote_workflow(request) self.clean_workflow(request) - elif message["type"] == "delivery" or message["type"] == "ingestion-complete": + elif ( + message["type"] == "delivery" or message["type"] == "ingestion-complete" + ): status = WorkflowRequestState.Complete.name else: status = "Unknown" @@ -577,29 +646,39 @@ class WorkflowMessageHandler: except Exception as exc: transaction.abort() logger.error( - f"Failed to update status on workflow request " f"{request.workflow_request_id} to {status}: {exc}" + f"Failed to update status on workflow request " + f"{request.workflow_request_id} to {status}: {exc}" ) else: - logger.warning(f"Message {message} does not concern a workflow request. Ignoring.") + logger.warning( + f"Message {message} does not concern a workflow request. Ignoring." + ) def send_archive_failed_event(self, **message: Dict): subject = message["subject"] wf_id = subject["workflow_request_id"] wf_request = self.info.lookup_workflow_request(wf_id) - if wf_request.workflow_name == ArchiveWorkflows.CARTA.value and wf_request.argument["notify_ready"] is False: - logger.info(f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!") - routing_key = f"ws-workflow.carta-instance-ready.{wf_id}" - carta_url_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( - "carta_failed" + if ( + wf_request.workflow_name == ArchiveWorkflows.CARTA.value + and wf_request.argument["notify_ready"] is False + ): + logger.info( + f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!" ) + routing_key = f"ws-workflow.carta-instance-ready.{wf_id}" + carta_url_msg = ArchiveMessageArchitect( + routing_key=routing_key, request=wf_request + ).compose_message("carta_failed") self.archive_messenger.send_message(**carta_url_msg) if wf_request.workflow_name == ArchiveWorkflows.SECI.value: - logger.info(f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!") - routing_key = f"ws-workflow.seci.{wf_id}" - seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( - "seci_failed" + logger.info( + f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!" ) + routing_key = f"ws-workflow.seci.{wf_id}" + seci_msg = ArchiveMessageArchitect( + routing_key=routing_key, request=wf_request + ).compose_message("seci_failed") self.archive_messenger.send_message(**seci_msg) def send_archive_complete_event(self, **message: Dict): @@ -608,11 +687,13 @@ class WorkflowMessageHandler: wf_request = self.info.lookup_workflow_request(wf_id) if wf_request.workflow_name == ArchiveWorkflows.SECI.value: - logger.info(f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!") - routing_key = f"ws-workflow.seci.{wf_id}" - seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( - "seci_complete" + logger.info( + f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!" ) + routing_key = f"ws-workflow.seci.{wf_id}" + seci_msg = ArchiveMessageArchitect( + routing_key=routing_key, request=wf_request + ).compose_message("seci_complete") self.archive_messenger.send_message(**seci_msg) def clean_remote_workflow(self, request: WorkflowRequestIF): -- GitLab