From 8873f2489944676b1a8816f820bd8b5b301922d0 Mon Sep 17 00:00:00 2001 From: nhertz <nhertz@nrao.edu> Date: Wed, 15 Sep 2021 10:35:25 -0600 Subject: [PATCH] Formatting changes (changed line length to 120) --- .../workflow/services/workflow_service.py | 90 ++++++------------- 1 file changed, 28 insertions(+), 62 deletions(-) diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 61becb843..58e3496a8 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -47,8 +47,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): :return: the response as JSON """ response = requests.post( - f"{self.url}/workflows/{request.workflow_name}" - f"/requests/{request.workflow_request_id}/submit" + f"{self.url}/workflows/{request.workflow_name}" f"/requests/{request.workflow_request_id}/submit" ) logger.info( "Got result %s with type %s and body %s", @@ -58,9 +57,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): ) return response.json() - def attach_file_to_request( - self, request: WorkflowRequestIF, filename: str, content: bytes - ) -> Response: + def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes) -> Response: """ Add a file to this workflow request. @@ -86,14 +83,10 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): :return: dict containing file content """ - response = requests.get( - f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}" - ) + response = requests.get(f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}") return response.content.decode() - def create_workflow_request( - self, workflow: Union[str, WorkflowIF], argument: Dict - ) -> WorkflowRequestIF: + def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF: """ Create a workflow request using the supplied arguments. @@ -129,9 +122,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF): :param request: completed workflow request to ingest :return: """ - requests.post( - f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest" - ) + requests.post(f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest") class WorkflowService(WorkflowServiceIF): @@ -163,10 +154,7 @@ class WorkflowService(WorkflowServiceIF): """ forbidden = self._get_forbidden_templates_list(request.workflow_name) if filename not in forbidden: - if ( - ArchiveWorkflows.is_archive_wf(request.workflow_name) - and filename == "metadata.json" - ): + if ArchiveWorkflows.is_archive_wf(request.workflow_name) and filename == "metadata.json": content_dict = json.loads(content.decode()) content_dict["workflowName"] = request.workflow_name content_dict["data_location"] = request.argument["data_location"] @@ -199,9 +187,7 @@ class WorkflowService(WorkflowServiceIF): :param workflow_request_id: ID of carta workflow that wants to send this message :param carta_url: JSON blob with CARTA URL """ - logger.info( - f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!" - ) + logger.info(f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!") wf_request = self.info.lookup_workflow_request(workflow_request_id) routing_key = f"ws-workflow.carta-instance-ready.{workflow_request_id}" carta_url_msg = ArchiveMessageArchitect( @@ -227,11 +213,7 @@ class WorkflowService(WorkflowServiceIF): # create a temporary directory if processing directory is not supplied, # needs to exist before template rendering - temp_folder = ( - self._make_temp_directory(request) - if not request.results_dir - else Path(request.results_dir) - ) + temp_folder = self._make_temp_directory(request) if not request.results_dir else Path(request.results_dir) # if remote is true, create a capo subspace file in the request's directory if remote: @@ -305,9 +287,7 @@ class WorkflowService(WorkflowServiceIF): ) request.argument["ramInGb"] = self.processing_settings.ramInGb - def _render_with_metadata( - self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF - ): + def _render_with_metadata(self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF): name = wf_request.workflow_name if "calibration" in name: wrest_type = "-sc" @@ -327,9 +307,7 @@ class WorkflowService(WorkflowServiceIF): argument2 = [] elif "ingest" in name: wrest_type = "-aux" - parent_req = self.info.lookup_workflow_request( - int(wf_request.argument["parent_wf_request_id"]) - ) + parent_req = self.info.lookup_workflow_request(int(wf_request.argument["parent_wf_request_id"])) eb = ( parent_req.argument["product_locator"] if "product_locator" in parent_req.argument @@ -360,15 +338,11 @@ class WorkflowService(WorkflowServiceIF): else: logger.error(wf_json.decode()) logger.info("SENDING WORKFLOW FAIL MESSAGE!") - failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message( - "workflow_failed" - ) + failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message("workflow_failed") self.messenger.send_message(**failed_msg) return wf_request - def _determine_usable_files( - self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile] - ): + def _determine_usable_files(self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]): # Override templates if user supplied file has same name and is a valid input file usable_templates = [] usable_files = [] @@ -591,9 +565,9 @@ class WorkflowMessageHandler: logger.info("SENDING INGESTION COMPLETE MESSAGE!") subject["execution_wf_id"] = wf_request.argument["parent_wf_request_id"] - ingestion_complete_msg = WorkflowMessageArchitect( - previous_info=subject - ).compose_message("ingestion_complete") + ingestion_complete_msg = WorkflowMessageArchitect(previous_info=subject).compose_message( + "ingestion_complete" + ) self.messenger.send_message(**ingestion_complete_msg) @on_message(service="workflow") @@ -616,9 +590,7 @@ class WorkflowMessageHandler: if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]): # Workflow has corresponding condor job ID - logger.info( - f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!" - ) + logger.info(f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!") request.htcondor_job_id = htcondor_job_id elif message["type"] == "workflow-complete": status = WorkflowRequestState.Complete.name @@ -653,8 +625,7 @@ class WorkflowMessageHandler: except Exception as exc: transaction.abort() logger.error( - f"Failed to update status on workflow request " - f"{request.workflow_request_id} to {status}: {exc}" + f"Failed to update status on workflow request " f"{request.workflow_request_id} to {status}: {exc}" ) else: logger.warning(f"Message {message} does not concern a workflow request. Ignoring.") @@ -664,24 +635,19 @@ class WorkflowMessageHandler: wf_id = subject["workflow_request_id"] wf_request = self.info.lookup_workflow_request(wf_id) - if ( - wf_request.workflow_name == ArchiveWorkflows.CARTA.value - and wf_request.argument["notify_ready"] is False - ): - logger.info( - f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!" - ) + if wf_request.workflow_name == ArchiveWorkflows.CARTA.value and wf_request.argument["notify_ready"] is False: + logger.info(f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!") routing_key = f"ws-workflow.carta-instance-ready.{wf_id}" - carta_url_msg = ArchiveMessageArchitect( - routing_key=routing_key, request=wf_request - ).compose_message("carta_failed") + carta_url_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "carta_failed" + ) self.archive_messenger.send_message(**carta_url_msg) if wf_request.workflow_name == ArchiveWorkflows.SECI.value: logger.info(f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!") routing_key = f"ws-workflow.seci.{wf_id}" - seci_msg = ArchiveMessageArchitect( - routing_key=routing_key, request=wf_request - ).compose_message("seci_failed") + seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "seci_failed" + ) self.archive_messenger.send_message(**seci_msg) def send_archive_complete_event(self, **message: Dict): @@ -692,9 +658,9 @@ class WorkflowMessageHandler: if wf_request.workflow_name == ArchiveWorkflows.SECI.value: logger.info(f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!") routing_key = f"ws-workflow.seci.{wf_id}" - seci_msg = ArchiveMessageArchitect( - routing_key=routing_key, request=wf_request - ).compose_message("seci_complete") + seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message( + "seci_complete" + ) self.archive_messenger.send_message(**seci_msg) def clean_remote_workflow(self, request: WorkflowRequestIF): -- GitLab