Commit 19a660d4 authored by Daniel Lyons

Refactor to use Marshmallow. Move WorkflowEvent to workspaces schema.

parent 04b05b66
@@ -12,9 +12,10 @@ from pathlib import Path
import pika
import pendulum
import workspaces
from pycapo import CapoConfig
from .wf_event import WorkflowEvent, EventType
from workspaces.schema import WorkflowEvent, WorkflowEventType
from ._version import ___version___ as VERSION
WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'
@@ -60,7 +61,7 @@ def timeout_handler(signum: int, frame: tracemalloc.Frame) -> None:
:param frame: Stack frame the signal was sent from
"""
msg = 'Timeout elapsed when attempting to read HTCondor log file. The job either did ' \
'not complete or the path to the log was not entered correctly.'
raise TimeoutError(msg)
@@ -188,11 +189,11 @@ class WorkflowMonitor:
jobs[job_id] = get_job_name(match.group('body'))
try:
event_type = EventType(int(match.group('eventnum')))
event_type = WorkflowEventType(int(match.group('eventnum')))
except ValueError:
event_type = EventType.OTHER
event_type = WorkflowEventType.OTHER
if event_type == EventType.TERMINATED:
if event_type == WorkflowEventType.TERMINATED:
retval = get_retval(match.group('body'))
else:
retval = None
@@ -272,7 +273,7 @@ def send_event_data(connection: pika.BlockingConnection, event: WorkflowEvent):
return channel.basic_publish(
exchange=WORKFLOW_STATUS_EXCH,
routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
body=event.json()
body=workspaces.json.workflow_event.dump(event)
)
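As context, a minimal sketch of the publish path once serialization goes through the schema. This helper is illustrative, not part of the commit, and it assumes marshmallow 3.x, where Schema.dumps() returns a JSON string (usable directly as a pika message body) while Schema.dump() returns a plain dict.

import pika

from workspaces.json import workflow_event  # module-level WorkflowEventSchema() instance
from workspaces.schema import WorkflowEvent

WORKFLOW_STATUS_EXCH = 'workspaces.workflow-service.workflow-status'


def publish_workflow_event(connection: pika.BlockingConnection, event: WorkflowEvent) -> None:
    """Hypothetical helper mirroring send_event_data above; not part of this change."""
    channel = connection.channel()
    channel.basic_publish(
        exchange=WORKFLOW_STATUS_EXCH,
        routing_key=f'{event.job_name}.{event.job_id}.{event.type.name.lower()}',
        # dumps() yields a JSON string; dump() yields a dict, which would need
        # json.dumps() before being handed to pika as the body
        body=workflow_event.dumps(event),
    )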
......
import enum
import json
from typing import Dict
class EventType(enum.Enum):
SUBMITTED = 0
EXECUTING = 1
TERMINATED = 5
OTHER = -1
class WorkflowEvent:
def __init__(
self,
job_name: str,
job_id: str,
event_type: EventType,
timestamp: str,
log: str,
retval: int = None
):
self.job_name = job_name
self.job_id = str(job_id)
self.type = event_type
self.timestamp = timestamp
self.log = log
self.retval = retval
def json(self) -> str:
d = {
'job_name': self.job_name,
'type': str(self.type),
'timestamp': self.timestamp,
'return_value': self.retval,
'log': self.log
}
return json.dumps(d)
def __str__(self):
return f'WorkflowEvent with data {self.json()}'
def __repr__(self):
return f'<wf_monitor.WorkflowEvent {self.__dict__}>'
from marshmallow import Schema, fields
import pendulum
from marshmallow import Schema, fields, post_load
from workspaces.schema import WorkflowEventType, WorkflowEvent
class WorkflowEventSchema(Schema):
"""
Default schema for serializing WorkflowEvent.
Default schema for serializing WorkflowEvents.
"""
job_name = fields.String()
type = fields.Method("get_type", deserialize="load_type")
job_id = fields.String()
event_type = fields.Method("get_type", deserialize="load_type")
timestamp = fields.DateTime()
log = fields.String()
retval = fields.Integer()
retval = fields.Integer(allow_none=True)
# Enums apparently are not a first-class field type in Marshmallow
def get_type(self, obj: "EventType") -> str:
def get_type(self, obj: WorkflowEventType) -> int:
return obj.type.value
def load_type(self, value: str) -> "EventType":
return EventType[value]
def load_type(self, value: int) -> WorkflowEventType:
return next(et for et in WorkflowEventType if et.value == value)
@post_load
def make_event(self, data, **kwargs):
return WorkflowEvent(**data)
workflow_event = WorkflowEventSchema()
\ No newline at end of file
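A hedged usage sketch of the schema above, with illustrative values; the dict layout in the comments is approximate and assumes marshmallow 3.x defaults (e.g. DateTime rendered as an ISO-8601 string).

import pendulum

from workspaces.json import WorkflowEventSchema
from workspaces.schema import WorkflowEvent, WorkflowEventType

schema = WorkflowEventSchema()
event = WorkflowEvent('ingest', 42, WorkflowEventType.EXECUTING, pendulum.now(), 'condor.log')

data = schema.dump(event)
# data is a plain dict, roughly:
#   {'job_name': 'ingest', 'job_id': '42', 'event_type': 1,
#    'timestamp': '<ISO-8601 string>', 'log': 'condor.log', 'retval': None}

restored = schema.load(data)   # the post_load hook rebuilds a WorkflowEvent from the dict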
@@ -2,7 +2,8 @@
Model classes used by the Workspaces system.
"""
import inspect
import enum
import json
from pathlib import Path
from typing import Dict, List
@@ -35,6 +36,65 @@ class AbstractFile:
return cls(path.name, path.read_bytes())
class WorkflowEventType(enum.Enum):
"""
Kinds of events that a workflow can send. These four are derived from
the many kinds that HTCondor can send.
"""
SUBMITTED = 0
EXECUTING = 1
TERMINATED = 5
OTHER = -1
class WorkflowEvent:
"""
An event from the underlying workflow execution system, which is likely
to be HTCondor.
"""
def __init__(
self,
job_name: str,
job_id: str,
event_type: WorkflowEventType,
timestamp: str,
log: str,
retval: int = None):
# NOTE: when you modify the properties here, also update the schema
# in the .json package.
self.job_name = job_name
self.job_id = str(job_id)
self.type = event_type
self.timestamp = timestamp
self.log = log
self.retval = retval
def json(self) -> str:
d = {
'job_name': self.job_name,
'type': str(self.type),
'timestamp': self.timestamp,
'return_value': self.retval,
'log': self.log
}
return json.dumps(d)
def __str__(self):
return f'WorkflowEvent with data {self.json()}'
def __repr__(self):
return f'<WorkflowEvent {self.__dict__}>'
def __eq__(self, other):
return self.job_name == other.job_name and \
self.job_id == other.job_id and \
self.type == other.type and \
self.timestamp == other.timestamp and \
self.log == other.log and \
self.retval == other.retval
Base = declarative_base()
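As an aside, a small hedged sketch of how raw HTCondor event numbers collapse onto these four kinds, mirroring the try/except in the WorkflowMonitor hunk above; the helper name classify and the number 28 are illustrative.

from workspaces.schema import WorkflowEventType


def classify(htcondor_event_number: int) -> WorkflowEventType:
    # 0, 1 and 5 map directly onto SUBMITTED, EXECUTING and TERMINATED;
    # any other HTCondor event number collapses to OTHER.
    try:
        return WorkflowEventType(htcondor_event_number)
    except ValueError:
        return WorkflowEventType.OTHER


classify(5)    # WorkflowEventType.TERMINATED
classify(28)   # WorkflowEventType.OTHER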
......
import pendulum
from workspaces.json import WorkflowEventSchema
from workspaces.schema import WorkflowEvent, WorkflowEventType
def test_schema_dumpload():
e = WorkflowEvent('foo', 1, WorkflowEventType.SUBMITTED, pendulum.now(), 'nothing to speak of')
schema = WorkflowEventSchema()
event_text = schema.dump(e)
r = schema.load(event_text)
assert e == r