Skip to content
Snippets Groups Projects
Commit 612c9a51 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Now takes in a required workflow_request_id argument to conform to the updated WorkflowEvent shape
parent 3471f73b
No related branches found
No related tags found
No related merge requests found
......@@ -16,7 +16,7 @@ from ._version import ___version___ as VERSION
from channels.amqp_helpers import workflow_events
from workspaces.schema import WorkflowEvent, WorkflowEventType
# Module-level logger for the workflow monitor: INFO and above are written to stdout.
_stdout_handler = logging.StreamHandler(sys.stdout)
logger = logging.getLogger("wf_monitor")
logger.setLevel(logging.INFO)
logger.addHandler(_stdout_handler)
......@@ -27,12 +27,14 @@ def log_decorator_factory(msg: str):
:param msg: Message to be logged
:return: Log decorator
"""
def log_decorator(f: Callable) -> Callable:
"""
Decorator that logs a message before and after a function is called
:param f: Function to be decorated
:return: Wrapper function
"""
@functools.wraps(f)
def wrapper(*args: Tuple, **kwargs: Dict) -> Any:
"""
......@@ -45,7 +47,9 @@ def log_decorator_factory(msg: str):
result = f(*args, **kwargs)
logger.info("Done.")
return result
return wrapper
return log_decorator
......@@ -56,8 +60,10 @@ def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None:
:param signum: Signal number (would be signal.SIGALRM in this case)
:param frame: Stack frame the signal was sent from
"""
msg = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \
'not complete or the path to the log was not entered correctly.'
msg = (
"Timeout elapsed when attempting to read HTCondor log file. The job either did "
"not complete or the path to the log was not entered correctly."
)
raise TimeoutError(msg)
......@@ -72,9 +78,9 @@ def get_job_name(body: str) -> str:
:param body: Body of the log entry
:return: Job name
"""
r_body_parse = r'[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)'
r_body_parse = r"[a-zA-Z0-9-_ ]+: (?P<jobname>[a-zA-Z0-9-_]+)"
body_match = re.search(r_body_parse, body)
return body_match.group('jobname')
return body_match.group("jobname")
def get_year(logfile_path: Path) -> str:
......@@ -96,18 +102,20 @@ def fmt_timestamp(timestamp: str, year: str) -> str:
:param year: Year HTCondor log was created as a string
:return: Formatted timestamp string
"""
r_nums = r'[0-9]{2}'
r_timestamp = rf'(?P<mon>{r_nums})/(?P<d>{r_nums}) ' \
rf'(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})'
r_nums = r"[0-9]{2}"
r_timestamp = (
rf"(?P<mon>{r_nums})/(?P<d>{r_nums}) "
rf"(?P<h>{r_nums}):(?P<m>{r_nums}):(?P<s>{r_nums})"
)
match = re.search(r_timestamp, timestamp)
return '{year}-{mon}-{day}T{h}:{m}:{s}+07:00'.format(
return "{year}-{mon}-{day}T{h}:{m}:{s}+07:00".format(
year=year,
mon=match.group('mon'),
day=match.group('d'),
h=match.group('h'),
m=match.group('m'),
s=match.group('s')
mon=match.group("mon"),
day=match.group("d"),
h=match.group("h"),
m=match.group("m"),
s=match.group("s"),
)
......@@ -117,11 +125,11 @@ def get_retval(body: str) -> Union[int, None]:
:param body: Body of event log entry
:return: Return value as an int
"""
r_retval = r'return value (?P<retval>[0-9]+)'
r_retval = r"return value (?P<retval>[0-9]+)"
retval = re.search(r_retval, body)
if retval:
return int(retval.group('retval'))
return int(retval.group("retval"))
else:
return
......@@ -130,8 +138,10 @@ class WorkflowMonitor:
"""
Class that monitors the events in a given workflow via the log file produced by it
"""
def __init__(self, logfile_path: str, timeout: int = 60):
def __init__(self, logfile_path: str, workflow_request_id: int, timeout: int = 60):
self.logfile_path = Path(logfile_path)
self.workflow_request_id = workflow_request_id
try:
self.log = self.read_htcondor_log(timeout)
......@@ -142,7 +152,7 @@ class WorkflowMonitor:
self.events = self.parse_log()
@log_decorator_factory('Reading HTCondor log...')
@log_decorator_factory("Reading HTCondor log...")
def read_htcondor_log(self, timeout: int) -> str:
"""
Waits for HTCondor job to terminate and reads the resulting log file
......@@ -153,14 +163,14 @@ class WorkflowMonitor:
signal.alarm(timeout)
while not os.path.exists(self.logfile_path):
logger.info('Attempting to read log file...')
logger.info("Attempting to read log file...")
time.sleep(3)
logger.info('Log file found.')
with open(self.logfile_path, 'r') as f:
logger.info("Log file found.")
with open(self.logfile_path, "r") as f:
return f.read()
@log_decorator_factory('Parsing HTCondor log...')
@log_decorator_factory("Parsing HTCondor log...")
def parse_log(self) -> List[WorkflowEvent]:
"""
Parse log for relevant details:
......@@ -168,53 +178,62 @@ class WorkflowMonitor:
:return: List of WorkflowEvent objects
"""
# Regex that matches an event entry in the HTCondor log file.
r_htc_event = r'(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) ' \
r'(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) ' \
r'(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n' \
r'(?P<body>[\t ]+[^\.]*\n)*' \
r'...'
r_htc_event = (
r"(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) "
r"(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) "
r"(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n"
r"(?P<body>[\t ]+[^\.]*\n)*"
r"..."
)
jobs = {}
events = []
if sum(1 for _ in re.finditer(r_htc_event, self.log)) == 0:
# Regex didn't return any matches
raise ValueError('HTCondor log file not well-formatted. Was the file edited?')
raise ValueError(
"HTCondor log file not well-formatted. Was the file edited?"
)
for match in re.finditer(r_htc_event, self.log):
job_id = match.group('jobnum')
if match.group('eventnum') == '000':
job_id = match.group("jobnum")
if match.group("eventnum") == "000":
# Populate jobs dict with job name
jobs[job_id] = get_job_name(match.group('body'))
jobs[job_id] = get_job_name(match.group("body"))
try:
event_type = WorkflowEventType(int(match.group('eventnum')))
event_type = WorkflowEventType(int(match.group("eventnum")))
except ValueError:
event_type = WorkflowEventType.OTHER
if event_type == WorkflowEventType.TERMINATED:
retval = get_retval(match.group('body'))
retval = get_retval(match.group("body"))
else:
retval = None
year = get_year(self.logfile_path)
events.append(WorkflowEvent(
job_name=jobs[job_id],
job_id=job_id,
event_type=event_type,
timestamp=fmt_timestamp(match.group('timestamp'), year),
log=match.group(0),
retval=retval
))
events.append(
WorkflowEvent(
workflow_request_id=self.workflow_request_id,
job_name=jobs[job_id],
condor_job_id=job_id,
event_type=event_type,
timestamp=fmt_timestamp(match.group("timestamp"), year),
log=match.group(0),
retval=retval,
)
)
return events
def __str__(self):
return f'WorkflowMonitor, monitoring {self.logfile_path} that has events {self.events}'
return f"WorkflowMonitor, monitoring {self.logfile_path} that has events {self.events}"
def __repr__(self):
return f'{self.__class__.__name__} ({self.logfile_path}, {self.events})'
return f"{self.__class__.__name__} ({self.logfile_path}, {self.events})"
# Help-text description for the command-line parser; "{}" is filled in with the version.
_DESCRIPTION = (
    "Workspaces workflow monitor, version {}. "
    "Monitor execution of a workflow from the command line."
)
def make_arg_parser() -> argparse.ArgumentParser:
......@@ -224,11 +243,24 @@ def make_arg_parser() -> argparse.ArgumentParser:
"""
parser = argparse.ArgumentParser(
description=_DESCRIPTION.format(VERSION),
formatter_class=argparse.RawTextHelpFormatter
formatter_class=argparse.RawTextHelpFormatter,
)
parser.add_argument(
"-P",
"--profile",
action="store",
required=False,
default="nmtest",
help="profile name to use; e.g. test, production; DEFAULT: nmtest",
)
parser.add_argument(
"log_path", action="store", help="path to the workflow's HTCondor log"
)
parser.add_argument(
"workflow_request_id",
action="store",
help="ID of the workflow request that ran the workflow",
)
parser.add_argument('-P', '--profile', action='store', required=False, default='nmtest',
help='profile name to use; e.g. test, production; DEFAULT: nmtest')
parser.add_argument('log_path', action='store', help='path to the workflow\'s HTCondor log')
return parser
......@@ -236,8 +268,8 @@ def make_arg_parser() -> argparse.ArgumentParser:
def main():
    """
    Command-line entry point: build a WorkflowMonitor for the HTCondor log given on the
    command line and send each event it parsed via workflow_events
    """
    # Parse command-line args
    args = make_arg_parser().parse_args()

    # argparse hands positional values over as strings; WorkflowMonitor takes the
    # workflow request ID as an int, so convert it here
    monitor = WorkflowMonitor(args.log_path, int(args.workflow_request_id))

    # Wrap the sender so a message is logged around every event that goes out
    decorated_send = log_decorator_factory("Sending event...")(workflow_events.send)

    with workflow_events:
        for event in monitor.events:
            decorated_send(event)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment