diff --git a/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py
index 8bd65fd6d56c31e9225d0fee1abf438787ef72c6..a546340b961aed13a11299db371bce46518c8947 100644
--- a/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py
+++ b/apps/cli/utilities/wf_monitor/wf_monitor/monitor.py
@@ -24,7 +24,6 @@ RabbitMQ exchange. Also input is a workflow request ID that is sent alongside th
 import argparse
 import functools
 import logging
-import os
 import re
 import signal
 import time
@@ -245,7 +244,7 @@ class WorkflowMonitor:
         time_remaining = timeout
         time_between_attempts = 3  # seconds
         signal.alarm(timeout)
-        while not os.path.exists(log_path):
+        while not log_path.is_file():
             logger.info(f"Attempting to read log file {log_path}... {time_remaining} seconds left...")
             time.sleep(time_between_attempts)
             time_remaining -= time_between_attempts
@@ -550,7 +549,10 @@ class WorkflowMonitor:
         return WorkflowStatusMessages.COMPLETE.value if return_value == 0 else WorkflowStatusMessages.FAILED.value
 
     def close(self):
+        logger.info("Waiting 3 seconds for final messages to clear....")
+        time.sleep(3)
         self.message_router.close()
+        logger.info(f"Monitor closed for {self.workflow_request_id}")
 
     def __str__(self):
         return f"WorkflowMonitor, monitoring {self.logfile_path} that has events {self.events}"
@@ -619,6 +621,6 @@ def main():
     """
     args = make_arg_parser().parse_args()
 
-    wf_monitor = WorkflowMonitor(args.log_path, int(args.workflow_request_id), eval(args.is_dag))
+    wf_monitor = WorkflowMonitor(args.log_path.rstrip(), int(args.workflow_request_id), eval(args.is_dag))
     logger.info(f"Closing wf_monitor for workflow request #{args.workflow_request_id}")
     wf_monitor.close()
diff --git a/apps/web/src/app/workspaces/services/workflow.service.ts b/apps/web/src/app/workspaces/services/workflow.service.ts
index b67db62e81680129fdbc2defbe144a29f633a6fe..f5b2048058200c9a49eb0fe65de2776ec99bfce6 100644
--- a/apps/web/src/app/workspaces/services/workflow.service.ts
+++ b/apps/web/src/app/workspaces/services/workflow.service.ts
@@ -41,6 +41,6 @@ export class WorkflowService {
 
   abortWorkflow(requestID: string): Observable<string> {
     const url = this.endpoint + "requests/" + requestID + "/abort";
-    return this.httpClient.post(url, JSON.stringify({"workflow_req_id": requestID}), {responseType: 'text'});
+    return this.httpClient.post(url, JSON.stringify({"workflow_req_id": requestID, "clean_processing_dir": true}), {responseType: 'text'});
   }
 }
diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py
index 2f4d32f0b068a3467167db18f4514734b94ebec6..42f13670ef679b865911fccdaeff27a766201566 100644
--- a/services/workflow/workflow/server.py
+++ b/services/workflow/workflow/server.py
@@ -437,7 +437,8 @@ class WorkflowRequestRestService:
         Given the ID of a workflow request, abort the execution of the request if it is running
         """
         request_id = self.request.matchdict["request_id"]
-        self.request.workflows.abort_running_workflow(request_id)
+        cleanup = self.request.json_body["clean_processing_dir"]
+        self.request.workflows.abort_running_workflow(request_id, clean_processing_dir=cleanup)
 
         return Response(
             status_code=http.HTTPStatus.OK,
diff --git a/shared/workspaces/alembic/versions/13b9b6c84547_revert_vlass_imaging_syntax_changes.py b/shared/workspaces/alembic/versions/13b9b6c84547_revert_vlass_imaging_syntax_changes.py
new file mode 100644
index 0000000000000000000000000000000000000000..faa59c0716792c97c028cb5880038b9423ffc774
--- /dev/null
+++ b/shared/workspaces/alembic/versions/13b9b6c84547_revert_vlass_imaging_syntax_changes.py
@@ -0,0 +1,69 @@
+"""revert vlass imaging syntax changes
+
+Revision ID: 13b9b6c84547
+Revises: b50243eb304b
+Create Date: 2024-06-13 13:18:59.883782
+
+"""
+from pathlib import Path
+
+from alembic import op
+
+
+# revision identifiers, used by Alembic.
+revision = "13b9b6c84547"
+down_revision = "b50243eb304b"
+branch_labels = None
+depends_on = None
+
+
+def set_content(wf_name: str, filename: str) -> str:
+    return (Path.cwd() / "versions" / "templates" / wf_name / filename).read_text()
+
+
+def upgrade():
+    op.execute(
+        f"""
+        UPDATE workflow_templates
+        SET content=E'{set_content('vlass_coarse','envoy_condor_2.8.3.1.txt')}'
+        WHERE filename='vlass_coarse_envoy.condor' AND workflow_name='vlass_coarse'
+        """
+    )
+    op.execute(
+        f"""
+        UPDATE workflow_templates
+        SET content=E'{set_content('vlass_quicklook', 'envoy_condor_2.8.3.1.txt')}'
+        WHERE filename='vlass_ql_envoy.condor' AND workflow_name='vlass_quicklook'
+        """
+    )
+    op.execute(
+        f"""
+        UPDATE workflow_templates
+        SET content=E'{set_content('vlass_seci', 'envoy_condor_2.8.3.1.txt')}'
+        WHERE filename='vlass_seci_envoy.condor' AND workflow_name='vlass_seci'
+        """
+    )
+
+
+def downgrade():
+    op.execute(
+        f"""
+        UPDATE workflow_templates
+        SET content=E'{set_content('vlass_coarse','envoy_condor_2.8.3.txt')}'
+        WHERE filename='vlass_coarse_envoy.condor' AND workflow_name='vlass_coarse'
+        """
+    )
+    op.execute(
+        f"""
+        UPDATE workflow_templates
+        SET content=E'{set_content('vlass_quicklook', 'envoy_condor_2.8.3.txt')}'
+        WHERE filename='vlass_quicklook_envoy.condor' AND workflow_name='vlass_quicklook'
+        """
+    )
+    op.execute(
+        f"""
+        UPDATE workflow_templates
+        SET content=E'{set_content('vlass_seci', 'envoy_condor_2.8.3.txt')}'
+        WHERE filename='vlass_seci_envoy.condor' AND workflow_name='vlass_seci'
+        """
+    )
diff --git a/shared/workspaces/alembic/versions/templates/vlass_coarse/envoy_condor_2.8.3.1.txt b/shared/workspaces/alembic/versions/templates/vlass_coarse/envoy_condor_2.8.3.1.txt
new file mode 100644
index 0000000000000000000000000000000000000000..c05580373d6cbe2054209d4afa60bf1c5c24d8bd
--- /dev/null
+++ b/shared/workspaces/alembic/versions/templates/vlass_coarse/envoy_condor_2.8.3.1.txt
@@ -0,0 +1,35 @@
+executable = vlass_coarse_envoy.sh
+arguments = metadata.json PPR.xml {{requested_parallel}}
+
+output = envoy.out
+error = envoy.err
+log = condor.log
+
+VLASS_DIR = {{data_location}}
+SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
+VLASS_BIN = /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin
+should_transfer_files = yes
+transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/update_stage, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(VLASS_BIN)/planescraper, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products{{#remote}}, nraorsync://$(VLASS_DIR)/{{profile}}.properties{{/remote}}, nraorsync://$(VLASS_DIR)/PPR.xml, {{#sdm_id}}nraorsync://$(VLASS_DIR)/{{sdm_id}}_split.flagtargetstemplate.txt, {{/sdm_id}}nraorsync://$(VLASS_DIR)/metadata.json
+when_to_transfer_output = ON_EXIT
+transfer_output_files = .job.ad
++nrao_output_files = "working products planes.json"
+output_destination = nraorsync://$(VLASS_DIR)
++WantIOProxy = True
+
+request_cpus = {{requested_parallel}}
+request_memory = 150G
+request_disk = 950G
+getenv = True
+
+{{^remote}}
+environment = "CAPO_PATH=/home/casa/capo"
+requirements = (VLASS == True) && (HasLustre == True)
++partition = "VLASS"
+{{/remote}}
+{{#remote}}
+requirements = (VLASS == True)
++partition = "VLASS"
+Rank = (TARGET.VLASS == True) + (TARGET.VLASSTEST =!= True) + (HasLustre =!= True)
+{{/remote}}
+
+queue
diff --git a/shared/workspaces/alembic/versions/templates/vlass_quicklook/envoy_condor_2.8.3.1.txt b/shared/workspaces/alembic/versions/templates/vlass_quicklook/envoy_condor_2.8.3.1.txt
new file mode 100644
index 0000000000000000000000000000000000000000..2919091811279f1b314290cf465e1363e4b6a681
--- /dev/null
+++ b/shared/workspaces/alembic/versions/templates/vlass_quicklook/envoy_condor_2.8.3.1.txt
@@ -0,0 +1,41 @@
+executable = vlass_ql_envoy.sh
+arguments = metadata.json PPR.xml {{request_id}}
+
+output = envoy.out
+error = envoy.err
+log = condor.log
+
+SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
+VLASS_DIR = {{data_location}}
+should_transfer_files = yes
+transfer_input_files = {{#radial}}$ENV(HOME)/.ssh/condor_ssh_config, {{/radial}}$ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/update_stage, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products{{#remote}}, nraorsync://$(VLASS_DIR)/{{profile}}.properties{{/remote}}, nraorsync://$(VLASS_DIR)/PPR.xml, {{#sdm_id}}nraorsync://$(VLASS_DIR)/{{sdm_id}}_split.flagtargetstemplate.txt, {{/sdm_id}}nraorsync://$(VLASS_DIR)/metadata.json{{files_to_transfer}}
+transfer_output_files = .job.ad
++nrao_output_files = "working products"
+when_to_transfer_output = ON_EXIT
+output_destination = nraorsync://$(VLASS_DIR)
++WantIOProxy = True
+{{#radial}}
+universe = grid
+grid_resource = condor radialhead.nrao.radial.local radialhead.nrao.radial.local
++remote_jobuniverse = 5
++remote_requirements = True
++remote_ShouldTransferFiles = "YES"
++remote_WhenToTransferOutput = "ON_EXIT"
+{{/radial}}
+
+request_memory = 31G
+request_disk = 100G
+getenv = True
+
+{{^remote}}
+environment = "CAPO_PATH=/home/casa/capo"
+requirements = (VLASS == True) && (HasLustre == True)
++partition = "VLASS"
+{{/remote}}
+{{#remote}}
+requirements = (VLASS == True)
++partition = "VLASS"
+Rank = (TARGET.VLASS == True) + (TARGET.VLASSTEST =!= True) + (HasLustre =!= True)
+{{/remote}}
+
+queue
diff --git a/shared/workspaces/alembic/versions/templates/vlass_seci/envoy_condor_2.8.3.1.txt b/shared/workspaces/alembic/versions/templates/vlass_seci/envoy_condor_2.8.3.1.txt
new file mode 100644
index 0000000000000000000000000000000000000000..a8cf020bb14c4a0905f7ffe5a9d727b4aef94554
--- /dev/null
+++ b/shared/workspaces/alembic/versions/templates/vlass_seci/envoy_condor_2.8.3.1.txt
@@ -0,0 +1,33 @@
+executable = vlass_seci_envoy.sh
+arguments = metadata.json PPR.xml
+
+output = envoy.out
+error = envoy.err
+log = condor.log
+
+VLASS_DIR = {{data_location}}
+SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
+should_transfer_files = yes
+transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/update_stage, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products{{#remote}}, nraorsync://$(VLASS_DIR)/{{profile}}.properties{{/remote}}, nraorsync://$(VLASS_DIR)/PPR.xml, {{#sdm_id}}nraorsync://$(VLASS_DIR)/{{sdm_id}}_split.flagtargetstemplate.txt, {{/sdm_id}}nraorsync://$(VLASS_DIR)/metadata.json
+when_to_transfer_output = ON_EXIT
+transfer_output_files = .job.ad
++nrao_output_files = "working products"
+output_destination = nraorsync://$(VLASS_DIR)
++WantIOProxy = True
+
+request_memory = 50G
+request_disk = 200G
+getenv = True
+
+{{^remote}}
+environment = "CAPO_PATH=/home/casa/capo"
+requirements = (VLASS == True) && (HasLustre == True)
++partition = "VLASS"
+{{/remote}}
+{{#remote}}
+requirements = (VLASS == True)
++partition = "VLASS"
+Rank = (TARGET.VLASS == True) + (TARGET.VLASSTEST =!= True) + (HasLustre =!= True)
+{{/remote}}
+
+queue
diff --git a/shared/workspaces/workspaces/capability/schema.py b/shared/workspaces/workspaces/capability/schema.py
index c857e5303f3d9fb757dd60c138cb8d8875af04f6..127e6b856c052143536ff1fc38d180455fa06a03 100644
--- a/shared/workspaces/workspaces/capability/schema.py
+++ b/shared/workspaces/workspaces/capability/schema.py
@@ -693,7 +693,7 @@ class QaUtilities:
         """
         workflow_service_url = CapoConfig().settings(WORKFLOW_SETTINGS_KEY).serviceUrl
         url = f"{workflow_service_url}/workflows/requests/{workflow_request_id}/abort"
-        requests.post(url, json={"capability_version": version_number})
+        requests.post(url, json={"capability_version": version_number, "clean_processing_dir": True})
 
     @staticmethod
     def run_qa_fail_workflow(version: CapabilityVersion):
diff --git a/shared/workspaces/workspaces/workflow/services/recovery.py b/shared/workspaces/workspaces/workflow/services/recovery.py
index 165dc1cd9e90a330a618ce9577905167af32cd41..674af671f8d2827a1bf728b3ab281e6be6e0db17 100644
--- a/shared/workspaces/workspaces/workflow/services/recovery.py
+++ b/shared/workspaces/workspaces/workflow/services/recovery.py
@@ -21,7 +21,6 @@ Workspaces Workflow Service - Workflow Recovery Facility
 """
 import logging
 import os
-import pathlib
 from pathlib import Path
 from typing import List
 
@@ -46,7 +45,7 @@ def _ensure_initial_startup() -> bool:
 
     ensure_initial_file = "reacquire.txt"
 
-    if pathlib.Path(ensure_initial_file).exists():
+    if Path(ensure_initial_file).exists():
         return False
 
     content = f"The Workflow Reacquire system was started within this container at {pendulum.now().utcnow()}\n"
@@ -89,16 +88,20 @@ class MonitorRecover:
         inflights = []
         profile = get_environment()
 
-        for row in self.info.session.get_bind().execute(
-            text(
-                f"""
+        for row in (
+            self.info.session.get_bind()
+            .execute(
+                text(
+                    f"""
             SELECT workflow_request_id as wf_id
             FROM workflow_requests
             WHERE state not in ('Created', 'Complete', 'Failed', 'Error')
            AND results_dir like ('%{profile}%')
            AND cleaned = 'False'
            """
+                )
             )
+            .fetchall()
         ):
            inflights.append(self.info.lookup_workflow_request(request_id=row[0]))
 
@@ -114,31 +117,35 @@ class MonitorRecover:
         results_dir = Path(wf_request.results_dir)
 
         if results_dir.is_dir():
-            # because for some reason these are not the same....
-            if results_dir == pathlib.Path(".") or results_dir == pathlib.Path.cwd() or results_dir == pathlib.Path(""):
+            if results_dir.resolve() == Path.cwd():
                 logger.info(f"Results directory for Request #{wf_request.workflow_request_id} not found. Skipping.")
-                return Path(".")
+                return Path.cwd()
             else:
                 logger.info(f"Searching for log file for Request #{wf_request.workflow_request_id}...")
-                dag_files = list(results_dir.glob("*.dag"))
-                if dag_files:
+                # Start by looking for a dag log.
+                # Most workflows are DAGs and monitoring will terminate prematurely if condor.log is used
+                dag_log = list(results_dir.glob("*.dag.dagman.log"))
+                if dag_log:
                     logger.info("Found dagman logfile!")
-                    return Path(str(dag_files[0]) + ".dagman.log")
-
-                for root, dirs, files in os.walk(results_dir):
-                    for file in files:
-                        if file.endswith(".log") and (
-                            file.startswith(wf_request.workflow_name) or file.startswith("condor")
-                        ):
-                            logger.info("Found condor logfile!")
-                            return file
+                    return dag_log[0]
+
+                # No dag log was found, fall back to condor.log
+                condor_log = list(results_dir.glob("condor.log"))
+                if condor_log:
+                    logger.info("Found condor logfile!")
+                    return condor_log[0]
+                else:
+                    # No usable log was found
+                    logger.info(f"No log file found for Request #{wf_request.workflow_request_id}. Skipping.")
+                    return Path.cwd()
+
         else:
             logger.info(
                 f"Results directory {results_dir} for request #{wf_request.workflow_request_id} "
                 f"is not a directory! Skipping."
             )
             # signal not a directory with current location
-            return Path(".")
+            return Path.cwd()
 
     @staticmethod
     def _restart_monitor(logfile: Path, wf_request: WorkflowRequest):
@@ -149,8 +156,7 @@ class MonitorRecover:
         :param wf_request: The WorkflowRequest to update
         :return:
         """
-        # because for some reason these are not the same....
-        if logfile == Path.cwd() or logfile == Path(".") or logfile is None:
+        if logfile is None or logfile.resolve() == Path.cwd():
             return
 
         logger.info("Running wf_monitor.")
@@ -167,10 +173,14 @@ class MonitorRecover:
         """
 
         if _ensure_initial_startup():
-            inflight = self._on_restart()
+            inflights = self._on_restart()
+            if len(inflights) == 0:
+                logger.info("No in-flight workflows found, skipping workflow reacquisition.")
 
-            for request in inflight:
+            for request in inflights:
                 logger.info(f"Reacquiring monitor for Request #{request.workflow_request_id}...")
                 self._restart_monitor(self._reacquire_logfile(request), request)
+
+            logger.info("Workflow reacquisition complete.")
         else:
             logger.info("Not initial container startup, skipping workflow reacquisition.")
diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py
index 151c04ae7636268ed1920a060a339a835d17ee6a..34284290084dca0c4c8d2f5b3b9dd441869ca1c1 100644
--- a/shared/workspaces/workspaces/workflow/services/workflow_service.py
+++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py
@@ -29,6 +29,7 @@ import stat
 import subprocess
 from pathlib import Path
 from tempfile import mkdtemp
+from time import sleep
 from typing import Dict, List, Union
 
 import requests
@@ -366,12 +367,14 @@ class WorkflowService(WorkflowServiceIF):
             request.argument["spool_dir"] = request.results_dir
 
         # Send results_dir to capability layer to store in the workflow metadata
-        dir_info = WorkflowMessageArchitect(
-            request=request, json_payload={"processing_dir": request.argument["spool_dir"]}
-        ).compose_message("update_wf_metadata")
+        if request.controller == "WS":
+            # Don't send to external controllers
+            dir_info = WorkflowMessageArchitect(
+                request=request, json_payload={"processing_dir": request.argument["spool_dir"]}
+            ).compose_message("update_wf_metadata")
 
-        # send the message
-        self.messenger.send_message(**dir_info)
+            # send the message
+            self.messenger.send_message(**dir_info)
 
         # render all the templates
         if request.argument["need_project_metadata"] is True:
@@ -404,25 +407,63 @@ class WorkflowService(WorkflowServiceIF):
 
         return request
 
-    def abort_running_workflow(self, request_id: int):
+    def abort_running_workflow(self, request_id: int, clean_processing_dir: bool = False):
         """
         Given the ID for a running workflow request, attempt to abort the request's execution (using condor_rm)
 
         :param request_id: ID of running request
+        :param clean_processing_dir: bool
         """
         workflow_request = self.info.lookup_workflow_request(request_id)
         htcondor_job_id = workflow_request.htcondor_job_id
 
         if htcondor_job_id:
-            subprocess.run(["condor_rm", f"{htcondor_job_id!s}"])
+            response = subprocess.run(["condor_rm", f"{htcondor_job_id!s}"], capture_output=True)
+            outerr = response.stderr.decode()
+            if "condor_rm:0:There are no jobs in the queue" in outerr:
+                logger.info(f"No condor job for request #{request_id} was found. Forcing fail transition.")
+                self.send_forced_fail(request_id)
         else:
             logger.warning(
                 f"Workflow request #{request_id} doesn't have an associated HTCondor job ID. Forcing fail transition..."
             )
             self.send_forced_fail(request_id)
 
-        # Remove the results
-        logger.info(f"Removing contents at {workflow_request.results_dir}")
-        shutil.rmtree(workflow_request.results_dir)
+        if clean_processing_dir:
+            # Remove the processing directory
+            if Path(workflow_request.results_dir).exists():
+                logger.info(f"Removing contents at {workflow_request.results_dir}")
+
+                wait_on_final_state = True
+                iteration = 1
+                while wait_on_final_state:
+                    workflow_request = self.info.lookup_workflow_request(request_id)
+                    logger.debug(f"Current wf state: {workflow_request.state}")
+                    if (
+                        workflow_request.state == WorkflowRequestState.Failed.name
+                        or workflow_request.state == WorkflowRequestState.Error.name
+                    ):
+                        try:
+                            shutil.rmtree(workflow_request.results_dir)
+                        except OSError:
+                            logger.error(
+                                f"ERROR: unable to remove directory {workflow_request.results_dir}. Directory will wait for regular cleanup."
+                            )
+                        wait_on_final_state = False
+                    elif iteration >= 10:
+                        # break loop if unable to detect final state update
+                        logger.warning(
+                            f"WARNING: workflow state updates may be stuck for request #{request_id}. "
+                            f"Unable to remove directory {workflow_request.results_dir}. Directory will wait for regular cleanup."
+                        )
+                        wait_on_final_state = False
+                    else:
+                        # exit current transaction to allow for object updates
+                        transaction.commit()
+                        iteration += 1
+                        # wait slightly for htcondor job to exit fully after abort and WS messaging to react for state update
+                        sleep(1)
+            else:
+                logger.info(f"Processing directory {workflow_request.results_dir} has already been cleaned.")
 
     def _make_temp_directory(self, request: WorkflowRequest) -> Path:
         """
@@ -664,8 +705,7 @@ class WorkflowService(WorkflowServiceIF):
         """
         # ensure the log file exists
         logfile = self._get_job_logfile_name(job_file)
-        logger.info("log file %s exists.", logfile)
-        logfile.touch()
+        self._create_base_logfile(logfile)
 
         # submit
         logger.info("submitting job to condor...")
@@ -686,10 +726,12 @@ class WorkflowService(WorkflowServiceIF):
         :param dag_file: Path to DAG submit file
         :return: Path to workflow log file
         """
-        # ensure the log file exists
+        # ensure the dagman.log file exists
         logfile = self._get_dag_logfile_name(dag_file)
-        logger.info("log file %s exists.", logfile)
-        logfile.touch()
+        self._create_base_logfile(logfile)
+        # ensure the condor.log file exists, don't rely on workflow creation, DAG monitoring requires both
+        condorlog_path = folder.absolute() / "condor.log"
+        self._create_base_logfile(condorlog_path)
 
         # submit
         logger.info("submitting DAG to condor...")
@@ -741,6 +783,32 @@ class WorkflowService(WorkflowServiceIF):
         """
         return Path(str(dag_file) + ".dagman.log")
 
+    def _create_base_logfile(self, logfile: Path, iteration: int = 1) -> bool:
+        """
+        Create the expected log file for submission
+
+        :param logfile: the log to create
+        :param iteration: the number of current creation attempts
+        :return: boolean representing if file was created
+        """
+        if iteration == 3:
+            # try to create file several times, but don't be stupid about it
+            logger.warning(f"Warning: Failed to create {logfile} multiple times. Workflow may fail.")
+            return False
+
+        # create needed logfile if it doesn't exist
+        logfile.touch()
+        # verify that the file was created
+        if logfile.is_file():
+            logger.info("log file %s exists.", logfile)
+            return True
+        else:
+            logger.warning(f"Failed to create {logfile}. Trying again")
+            # sleep slightly, but not too long, and try again
+            sleep(0.5)
+            iteration += 1
+            self._create_base_logfile(logfile, iteration)
+
     @staticmethod
     def _get_forbidden_templates_list(wf_request: WorkflowRequest) -> List[str]:
         """
@@ -937,10 +1005,12 @@ class WorkflowMessageHandler:
             request.htcondor_job_id = htcondor_job_id
             request.htcondor_iterations = 1
             # update UI
-            iterations_msg = WorkflowMessageArchitect(
-                request=request, json_payload={"htcondor_iterations": request.htcondor_iterations}
-            ).compose_message("update_wf_metadata")
-            self.messenger.send_message(**iterations_msg)
+            if request.controller == "WS":
+                # don't send to external systems
+                iterations_msg = WorkflowMessageArchitect(
+                    request=request, json_payload={"htcondor_iterations": request.htcondor_iterations}
+                ).compose_message("update_wf_metadata")
+                self.messenger.send_message(**iterations_msg)
 
         elif message["type"] == "workflow-updated":
             status = WorkflowRequestState.Running.name if self._verify_state_change(request.state) else None
@@ -952,6 +1022,10 @@ class WorkflowMessageHandler:
             if any(c.control_system == request.controller for c in self.info.all_external_controllers()):
                 tack_on = {**message, **stage_info}
                 self.send_external_event("update", **tack_on)
+            else:
+                logger.warning(
+                    f"Unable to update stage for request #{request.workflow_request_id}, received stage_info {stage_info}"
+                )
 
         elif message["type"] == "workflow-failed":
             request = self.info.refresh_request(request)
@@ -980,19 +1054,21 @@ class WorkflowMessageHandler:
             logger.warning(user_message)
 
             # update UI
-            paused_msg = WorkflowMessageArchitect(
-                request=request,
-                json_payload={
-                    "user_warning_message": user_message,
-                },
-            ).compose_message("update_wf_metadata")
-            self.messenger.send_message(**paused_msg)
+            if request.controller == "WS":
+                # don't send to external system
+                paused_msg = WorkflowMessageArchitect(
+                    request=request,
+                    json_payload={
+                        "user_warning_message": user_message,
+                    },
+                ).compose_message("update_wf_metadata")
+                self.messenger.send_message(**paused_msg)
 
         elif message["type"] == "workflow-restarted":
             request.htcondor_iterations = request.htcondor_iterations + 1
 
             if request.htcondor_iterations > self.ITERATION_LIMIT:
-                status = WorkflowRequestState.Error
+                status = WorkflowRequestState.Error.name
                 user_message = (
                     f"Found possible infinite rescheduling loop in "
                     f"request #{request.workflow_request_id}, HTCondor iteration "
@@ -1000,7 +1076,8 @@ class WorkflowMessageHandler:
                 )
                 logger.error(user_message)
                 url = self.url + f"/workflows/requests/{request.workflow_request_id}/abort"
-                requests.post(url)
+                payload = {"clean_processing_dir": False}
+                requests.post(url, json=payload)
             else:
                 status = WorkflowRequestState.Running.name
                 user_message = (
@@ -1010,15 +1087,16 @@ class WorkflowMessageHandler:
                 )
                 logger.warning(user_message)
 
-            # update UI
-            iterations_msg = WorkflowMessageArchitect(
-                request=request,
-                json_payload={
-                    "htcondor_iterations": request.htcondor_iterations,
-                    "user_warning_message": user_message,
-                },
-            ).compose_message("update_wf_metadata")
-            self.messenger.send_message(**iterations_msg)
+            if request.controller == "WS":
+                # update UI, but don't try sending for externally initiated workflows
+                iterations_msg = WorkflowMessageArchitect(
+                    request=request,
+                    json_payload={
+                        "htcondor_iterations": request.htcondor_iterations,
+                        "user_warning_message": user_message,
+                    },
+                ).compose_message("update_wf_metadata")
+                self.messenger.send_message(**iterations_msg)
 
         elif message["type"] == "workflow-complete":
             request = self.info.refresh_request(request)