From 19a660d4ab20cf5597f414835c1a949447245c58 Mon Sep 17 00:00:00 2001 From: Daniel K Lyons <dlyons@nrao.edu> Date: Tue, 8 Sep 2020 11:44:21 -0600 Subject: [PATCH] Refactor to use Marshmallow. Mov WorkflowEvent to workspaces schema. --- .../wf_monitor/src/wf_monitor/monitor.py | 13 ++-- .../wf_monitor/src/wf_monitor/wf_event.py | 43 ------------- shared/workspaces/src/workspaces/json.py | 24 ++++--- shared/workspaces/src/workspaces/schema.py | 62 ++++++++++++++++++- shared/workspaces/test/__init__.py | 0 shared/workspaces/test/json_tests.py | 11 ++++ 6 files changed, 96 insertions(+), 57 deletions(-) delete mode 100644 apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py create mode 100644 shared/workspaces/test/__init__.py create mode 100644 shared/workspaces/test/json_tests.py diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py index d7a9cfd94..bd3a7073a 100644 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py +++ b/apps/cli/utilities/wf_monitor/src/wf_monitor/monitor.py @@ -12,9 +12,10 @@ from pathlib import Path import pika import pendulum +import workspaces from pycapo import CapoConfig -from .wf_event import WorkflowEvent, EventType +from workspaces.schema import WorkflowEvent, WorkflowEventType from ._version import ___version___ as VERSION WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status' @@ -60,7 +61,7 @@ def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None: :param frame: Stack frame the signal was sent from """ msg = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \ - 'not complete or the path to the log was not entered correctly.' + 'not complete or the path to the log was not entered correctly.' raise TimeoutError(msg) @@ -188,11 +189,11 @@ class WorkflowMonitor: jobs[job_id] = get_job_name(match.group('body')) try: - event_type = EventType(int(match.group('eventnum'))) + event_type = WorkflowEventType(int(match.group('eventnum'))) except ValueError: - event_type = EventType.OTHER + event_type = WorkflowEventType.OTHER - if event_type == EventType.TERMINATED: + if event_type == WorkflowEventType.TERMINATED: retval = get_retval(match.group('body')) else: retval = None @@ -272,7 +273,7 @@ def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent): return channel.basic_publish( exchange=WORKFLOW_STATUS_EXCH, routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}', - body=event.json() + body=workspaces.json.workflow_event.dump(event) ) diff --git a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py b/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py deleted file mode 100644 index cb255f28d..000000000 --- a/apps/cli/utilities/wf_monitor/src/wf_monitor/wf_event.py +++ /dev/null @@ -1,43 +0,0 @@ -import enum -import json -from typing import Dict - - -class EventType(enum.Enum): - SUBMITTED = 0 - EXECUTING = 1 - TERMINATED = 5 - OTHER = -1 - -class WorkflowEvent: - def __init__( - self, - job_name: str, - job_id: str, - event_type: EventType, - timestamp: str, - log: str, - retval: int = None - ): - self.job_name = job_name - self.job_id = str(job_id) - self.type = event_type - self.timestamp = timestamp - self.log = log - self.retval = retval - - def json(self) -> str: - d = { - 'job_name': self.job_name, - 'type': str(self.type), - 'timestamp': self.timestamp, - 'return_value': self.retval, - 'log': self.log - } - return json.dumps(d) - - def __str__(self): - return f'WorkflowEvent with data {self.json()}' - - def __repr__(self): - return f'<wf_monitor.WorkflowEvent {self.__dict__}>' diff --git a/shared/workspaces/src/workspaces/json.py b/shared/workspaces/src/workspaces/json.py index 3da6d556d..cdb72af2d 100644 --- a/shared/workspaces/src/workspaces/json.py +++ b/shared/workspaces/src/workspaces/json.py @@ -1,19 +1,29 @@ -from marshmallow import Schema, fields +import pendulum +from marshmallow import Schema, fields, post_load +from workspaces.schema import WorkflowEventType, WorkflowEvent class WorkflowEventSchema(Schema): """ - Default schema for serializing WorkflowEvent. + Default schema for serializing WorkflowEvents. """ job_name = fields.String() - type = fields.Method("get_type", deserialize="load_type") + job_id = fields.String() + event_type = fields.Method("get_type", deserialize="load_type") timestamp = fields.DateTime() log = fields.String() - retval = fields.Integer() + retval = fields.Integer(allow_none=True) # Enums apparently are not a first-class field type in Marshmallow - def get_type(self, obj: "EventType") -> str: + def get_type(self, obj: WorkflowEventType) -> int: return obj.type.value - def load_type(self, value: str) -> "EventType": - return EventType[value] + def load_type(self, value: int) -> WorkflowEventType: + return next(et for et in WorkflowEventType if et.value == value) + + @post_load + def make_event(self, data, **kwargs): + return WorkflowEvent(**data) + + +workflow_event = WorkflowEventSchema() \ No newline at end of file diff --git a/shared/workspaces/src/workspaces/schema.py b/shared/workspaces/src/workspaces/schema.py index b018f6e9c..ba35f242e 100644 --- a/shared/workspaces/src/workspaces/schema.py +++ b/shared/workspaces/src/workspaces/schema.py @@ -2,7 +2,8 @@ Model classes used by the Workspaces system. """ -import inspect +import enum +import json from pathlib import Path from typing import Dict, List @@ -35,6 +36,65 @@ class AbstractFile: return cls(path.name, path.read_bytes()) +class WorkflowEventType(enum.Enum): + """ + Kinds of events that a workflow can send. These four are derived from + the many kinds that HTCondor can send. + """ + SUBMITTED = 0 + EXECUTING = 1 + TERMINATED = 5 + OTHER = -1 + + +class WorkflowEvent: + """ + An event from the underlying workflow execution system, which is likely + to be HTCondor. + """ + def __init__( + self, + job_name: str, + job_id: str, + event_type: WorkflowEventType, + timestamp: str, + log: str, + retval: int = None): + # NOTE: when you modify the properties here, also update the schema + # in the .json package. + self.job_name = job_name + self.job_id = str(job_id) + self.type = event_type + self.timestamp = timestamp + self.log = log + self.retval = retval + + def json(self) -> str: + d = { + 'job_name': self.job_name, + 'type': str(self.type), + 'timestamp': self.timestamp, + 'return_value': self.retval, + 'log': self.log + } + return json.dumps(d) + + def __str__(self): + return f'WorkflowEvent with data {self.json()}' + + def __repr__(self): + return f'<WorkflowEvent {self.__dict__}>' + + def __eq__(self, other): + return self.job_name == other.job_name and \ + self.job_id == other.job_id and \ + self.type == other.type and \ + self.timestamp == other.timestamp and \ + self.log == other.log and \ + self.retval == other.retval + + + Base = declarative_base() diff --git a/shared/workspaces/test/__init__.py b/shared/workspaces/test/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/shared/workspaces/test/json_tests.py b/shared/workspaces/test/json_tests.py new file mode 100644 index 000000000..5bf82e5c9 --- /dev/null +++ b/shared/workspaces/test/json_tests.py @@ -0,0 +1,11 @@ +import pendulum +from workspaces.json import WorkflowEventSchema +from workspaces.schema import WorkflowEvent, WorkflowEventType + + +def test_schema_dumpload(): + e = WorkflowEvent('foo', 1, WorkflowEventType.SUBMITTED, pendulum.now(), 'nothing to speak of') + schema = WorkflowEventSchema() + event_text = schema.dump(e) + r = schema.load(event_text) + assert e == r -- GitLab