Skip to content
Snippets Groups Projects
Commit 97212939 authored by Charlotte Hausman
Browse files

Merge branch 'update-reacquire' into '2.8.3.1-patch'

fixes and updates for the workflow reacquire system

See merge request !1664
parents 8d17af07 7f5b0d23
No related branches found
No related tags found
3 merge requests: !1687 (Pickup patch changes), !1685 (merge patch to main), !1664 (fixes and updates for the workflow reacquire system)
......@@ -21,7 +21,6 @@ Workspaces Workflow Service - Workflow Recovery Facility
"""
import logging
import os
import pathlib
from pathlib import Path
from typing import List
......@@ -46,7 +45,7 @@ def _ensure_initial_startup() -> bool:
ensure_initial_file = "reacquire.txt"
if pathlib.Path(ensure_initial_file).exists():
if Path(ensure_initial_file).exists():
return False
content = f"The Workflow Reacquire system was started within this container at {pendulum.now().utcnow()}\n"
......@@ -89,16 +88,20 @@ class MonitorRecover:
inflights = []
profile = get_environment()
for row in self.info.session.get_bind().execute(
text(
f"""
for row in (
self.info.session.get_bind()
.execute(
text(
f"""
SELECT workflow_request_id as wf_id
FROM workflow_requests
WHERE state not in ('Created', 'Complete', 'Failed', 'Error')
AND results_dir like ('%{profile}%')
AND cleaned = 'False'
"""
)
)
.fetchall()
):
inflights.append(self.info.lookup_workflow_request(request_id=row[0]))
......@@ -114,31 +117,35 @@ class MonitorRecover:
results_dir = Path(wf_request.results_dir)
if results_dir.is_dir():
# because for some reason these are not the same....
if results_dir == pathlib.Path(".") or results_dir == pathlib.Path.cwd():
if results_dir.resolve() == Path.cwd():
logger.info(f"Results directory for Request #{wf_request.workflow_request_id} not found. Skipping.")
return Path(".")
return Path.cwd()
else:
logger.info(f"Searching for log file for Request #{wf_request.workflow_request_id}...")
dag_files = list(results_dir.glob("*.dag"))
if dag_files:
# Start by looking for a dag log.
# Most workflows are DAGs and monitoring will terminate prematurely if condor.log is used
dag_log = list(results_dir.glob("*.dag.dagman.log"))
if dag_log:
logger.info("Found dagman logfile!")
return Path(str(dag_files[0]) + ".dagman.log")
for root, dirs, files in os.walk(results_dir):
for file in files:
if file.endswith(".log") and (
file.startswith(wf_request.workflow_name) or file.startswith("condor")
):
logger.info("Found condor logfile!")
return file
return dag_log[0]
# No dag log was found, fall back to condor.log
condor_log = list(results_dir.glob("condor.log"))
if condor_log:
logger.info("Found condor logfile!")
return condor_log[0]
else:
# No usable log was found
logger.info(f"No log file found for Request #{wf_request.workflow_request_id}. Skipping.")
return Path.cwd()
else:
logger.info(
f"Results directory {results_dir} for request #{wf_request.workflow_request_id} "
f"is not a directory! Skipping."
)
# signal not a directory with current location
return Path(".")
return Path.cwd()
@staticmethod
def _restart_monitor(logfile: Path, wf_request: WorkflowRequest):
......@@ -149,8 +156,7 @@ class MonitorRecover:
:param wf_request: The WorkflowRequest to update
:return:
"""
# because for some reason these are not the same....
if logfile == Path.cwd() or logfile == Path("."):
if logfile is None or logfile.resolve() == Path.cwd():
return
logger.info("Running wf_monitor.")
......@@ -167,10 +173,14 @@ class MonitorRecover:
"""
if _ensure_initial_startup():
inflight = self._on_restart()
inflights = self._on_restart()
if len(inflights) == 0:
logger.info("No in-flight workflows found, skipping workflow reacquisition.")
for request in inflight:
for request in inflights:
logger.info(f"Reacquiring monitor for Request #{request.workflow_request_id}...")
self._restart_monitor(self._reacquire_logfile(request), request)
logger.info("Workflow reacquisition complete.")
else:
logger.info("Not initial container startup, skipping workflow reacquisition.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment