diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index e8fdd4280dd4749d207361d7426716f05d86bfcd..eaae705a54238057a34cb25d641ec63cfcb61762 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -368,7 +368,11 @@ class WorkflowService(WorkflowServiceIF): if isinstance(templated_files, WorkflowRequest): return request else: - self.determine_multiple_productfetch(request) + fetch_args = self.generate_productfetcher_arguments(request) + if isinstance(fetch_args, WorkflowRequest): + # Encountered an error generating fetch args, abort workflow + return request + request.argument["fetch_args"] = fetch_args # will only be used if key is in the templates request.argument["ramInGb"] = self.processing_settings.ramInGb @@ -484,27 +488,37 @@ class WorkflowService(WorkflowServiceIF): request.results_dir = temp_folder.__str__() return temp_folder - def determine_multiple_productfetch(self, wf_request: WorkflowRequest): + def generate_productfetcher_arguments(self, wf_request: WorkflowRequest) -> str | WorkflowRequest: """ - Are we fetching more than one product? + If productfetcher will be called by the given WFRequest, generate the arguments it should be called with - :param wf_request: the workflow request - :return: + :param wf_request: the WorkflowRequest to generate productfetcher arguments for + :return: String representing everything that should follow 'productfetcher' when it's called in wf_request + :return: WorkflowRequest to indicate an error in generating the arguments """ + argument = "" if wf_request.workflow_name in ["download", "restore_cms"] or ( "need_data" in wf_request.argument and wf_request.argument["need_data"] is True ): spl_list = wf_request.argument["product_locator"].split(",") fetcher_string = " --product-locator" + special_flag = "" if "products" in wf_request.argument and any( product["product_type"] == "Image" for product in wf_request.argument["products"] ): - image_flag = "--image-ancillaries" - wf_request.argument["fetch_args"] = ( - image_flag + fetcher_string + fetcher_string.join(" " + spl for spl in spl_list) - ) - else: - wf_request.argument["fetch_args"] = fetcher_string + fetcher_string.join(" " + spl for spl in spl_list) + special_flag = "--image-ancillaries" + elif "products" in wf_request.argument and any( + product["product_type"].lower() == "vlass coarse cube" for product in wf_request.argument["products"] + ): + if "requested_full_cube" not in wf_request.argument: + logger.info("SENDING WORKFLOW FAIL MESSAGE!") + failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message("workflow_failed") + self.messenger.send_message(**failed_msg) + return wf_request + if wf_request.argument["requested_full_cube"] is True: + special_flag = " --also-ancillaries" + argument = special_flag + fetcher_string + fetcher_string.join(" " + spl for spl in spl_list) + return argument def get_casa_home(self, wf_request: WorkflowRequest): """ @@ -608,7 +622,11 @@ class WorkflowService(WorkflowServiceIF): } wf_request.argument = merged_args if arg_list[0] in ["--product", "--carta", "-r"]: - self.determine_multiple_productfetch(wf_request) + fetch_args = self.generate_productfetcher_arguments(wf_request) + if isinstance(fetch_args, WorkflowRequest): + # Encountered an error generating fetch args, abort execution + return wf_request + wf_request.argument["fetch_args"] = fetch_args if wf_request.workflow.uses_casa: self.get_casa_home(wf_request)