Skip to content
Snippets Groups Projects
Commit 84f7e9d5 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

initial work for workflow execution

parent c12aed6c
No related branches found
No related tags found
No related merge requests found
......@@ -14,4 +14,7 @@ CL args:
- Finish consuming at some point?
"""
import channels.amqp_helpers
class EventCatcher:
......@@ -5,8 +5,8 @@ import pika
from marshmallow import Schema
from pika.adapters.blocking_connection import BlockingChannel
from pycapo import CapoConfig
from workspaces.json import WorkflowEventSchema
from workspaces.schema import WorkflowEvent
from workspaces.json import WorkflowEventSchema, CapabilityEventSchema
from workspaces.schema import WorkflowEvent, CapabilityEvent
T = TypeVar('T', contravariant=True)
# Global connection; remains open until manually closed (I imagine by some sort of cleanup process)
......@@ -51,6 +51,26 @@ class WorkflowEventChannel(ChannelDefinition[WorkflowEvent]):
def exchange(self) -> str:
return self.EXCHANGE
class CapabilityEventChannel(ChannelDefinition[CapabilityEvent]):
    """
    Channel definition for capability events: declares a durable topic
    exchange for capability status messages and supplies their schema.
    """

    # TODO: find out real value
    C_EXCHANGE = 'workspaces.capability-service.capability-status'

    def schema(self) -> Schema:
        """Schema used to (de)serialize events on this channel."""
        return CapabilityEventSchema()

    def declarations(self, chan: BlockingChannel):
        """Declare the capability-status topic exchange on the channel."""
        chan.exchange_declare(
            self.C_EXCHANGE,
            exchange_type='topic',
            durable=True,
            auto_delete=False,
        )

    def routing_key_for(self, event: CapabilityEvent) -> str:
        """Routing key for a capability event; not yet defined."""
        # return f'{event.job_name}.{event.job_id}.{event.type.name.lower()}'
        raise NotImplementedError

    def exchange(self) -> str:
        """Name of the exchange capability events are published to."""
        return self.C_EXCHANGE
ChannelDef = TypeVar('ChannelDef', bound=ChannelDefinition, covariant=True)
......@@ -153,3 +173,4 @@ class Channel(Generic[ChannelDef]):
# Predefined event channels for ease of use
# Module-level singletons so callers can publish/consume without
# constructing their own Channel/ChannelDefinition pair.
workflow_events: Channel[WorkflowEvent] = Channel(WorkflowEventChannel())
capability_events: Channel[CapabilityEvent] = Channel(CapabilityEventChannel())
import pendulum
from marshmallow import Schema, fields, post_load
from workspaces.schema import WorkflowEventType, WorkflowEvent
from workspaces.schema import WorkflowEventType, WorkflowEvent, CapabilityEvent
class WorkflowEventSchema(Schema):
......@@ -26,4 +26,21 @@ class WorkflowEventSchema(Schema):
return WorkflowEvent(**data)
workflow_event = WorkflowEventSchema()
\ No newline at end of file
class CapabilityEventSchema(Schema):
    """
    Default schema for serializing CapabilityEvents.
    """

    def get_type(self) -> str:
        """Serialize the event-type field; not yet implemented."""
        raise NotImplementedError

    def load_type(self, name: str):
        """Turn a type name back into an event type; not yet implemented."""
        raise NotImplementedError

    @post_load
    def make_event(self, data, **kwargs):
        """Construct a CapabilityEvent from the deserialized field data."""
        return CapabilityEvent(**data)
# Shared, module-level schema instances for convenience.
workflow_event = WorkflowEventSchema()
capability_event = CapabilityEventSchema()
......@@ -36,6 +36,20 @@ class AbstractFile:
return cls(path.name, path.read_bytes())
class AbstractRequest:
    """
    AbstractRequest is an abstract concept that is used to create
    workflow requests: a workflow name paired with its argument payload.
    """

    # NOTE(review): `json` in these annotations is the json *module*, not a
    # type — it should probably be `str` or `dict`; confirm with callers.
    workflow_name: str
    argument: json

    def __init__(self, workflow_name: str, argument: json):
        self.workflow_name, self.argument = workflow_name, argument

    def __json__(self, request):
        """Return this request as a JSON-serializable dict."""
        return {'workflow_name': self.workflow_name, 'argument': self.argument}
class WorkflowEventType(enum.Enum):
"""
Kinds of events that a workflow can send. These four are derived from
......@@ -52,6 +66,7 @@ class WorkflowEvent:
An event from the underlying workflow execution system, which is likely
to be HTCondor.
"""
def __init__(
self,
job_name: str,
......@@ -94,6 +109,25 @@ class WorkflowEvent:
self.retval == other.retval
class CapabilityEvent:
    """
    An event from the underlying workflow execution system.
    """

    def __repr__(self):
        return f'<CapabilityEvent {self.__dict__}>'

    def __str__(self):
        # NOTE: relies on json(), which is not yet implemented.
        return f'<CapabilityEvent with data{self.json()}>'

    def json(self) -> str:
        """JSON rendering of this event; not yet implemented."""
        raise NotImplementedError

    def __eq__(self, other):
        # Equality semantics not yet defined.
        raise NotImplementedError

    def __init__(self):
        # Construction not yet defined for capability events.
        raise NotImplementedError
Base = declarative_base()
......@@ -123,7 +157,7 @@ class Workflow(Base):
"""
# for file in files:
# render file
# render file
raise NotImplementedError
......@@ -165,6 +199,26 @@ class WorkflowRequest(Base):
argument = sa.Column('argument', sa.JSON)
files = relationship('WorkflowRequestFile', backref='request')
@property
def request(self) -> AbstractRequest:
    """This row's workflow name and argument, as an AbstractRequest."""
    return AbstractRequest(self.workflow_name, self.argument)

@request.setter
def request(self, request: AbstractRequest):
    # BUG FIX: a property setter must reuse the property's name. The
    # original `def create_request` left `request` read-only (assignment
    # raised AttributeError) and exposed a stray `create_request` attribute.
    self.workflow_name, self.argument = request.workflow_name, request.argument
def update_status(self, status: str):
    """Record the most recent workflow status string on this request."""
    self.status = status
def set_start_time(self, time: str):
    """Record when execution of this request began."""
    self.start_time = time
def set_end_time(self, time: str):
    """Record when execution of this request finished."""
    self.end_time = time
def __repr__(self):
    """Debug representation keyed on the request's primary key."""
    return f'<WorkflowRequest workflow_request_id= {self.workflow_request_id}>'
class WorkflowRequestFile(Base):
"""
......
......@@ -5,7 +5,8 @@ import stat
import subprocess
from tempfile import mkdtemp
from wf_monitor.monitor import WorkflowMonitor
from channels.amqp_helpers import workflow_events
from wf_monitor.monitor import WorkflowMonitor, WORKFLOW_STATUS_EXCH, log_decorator_factory
from .interfaces import *
from .schema import *
......@@ -41,10 +42,10 @@ class WorkflowService(WorkflowServiceIF):
"""
# 1. look up workflow, returns workflow
info = WorkflowInfo.lookup_workflow_definition(self, workflow_name)
info = WorkflowInfo(self).lookup_workflow_definition(workflow_name)
# 2. create and save request, return request id
# record = WorkflowRequest.
record = WorkflowRequest().create_request(workflow_name, argument)
# 3. render templates to files, returns list of rendered files
contents = info.render_templates(argument, files)
......@@ -56,7 +57,14 @@ class WorkflowService(WorkflowServiceIF):
log_file = self._execute_prepared(temp_folder)
# 6. start reading log file
return WorkflowMonitor(str(log_file))
monitor = WorkflowMonitor(str(log_file))
# parse events from log
events = monitor.events
with workflow_events:
for e in events:
# send amqp event and update database
self.on_workflow_event(e)
@staticmethod
......@@ -113,11 +121,15 @@ class WorkflowService(WorkflowServiceIF):
# self.channel = channels.workflow_events.listen(self.on_workflow_event)
raise NotImplementedError
def on_workflow_event(self, event: WorkflowEvent):
def on_workflow_event(self, event: WorkflowEvent, request_record: WorkflowRequest):
# 1. log that we received this event, somehow
# 2. update the WorkflowRequest record with the state we got
# 3. do per-event-type stuff, such as level change events, database
# updates, and logging
decorated_send = log_decorator_factory('Sending event...')(workflow_events.send)
decorated_send(event, WORKFLOW_STATUS_EXCH)
if event.type == WorkflowEventType.SUBMITTED:
# this would be a good place to emit a level-changed event
# for the capability system, such as:
......@@ -125,15 +137,21 @@ class WorkflowService(WorkflowServiceIF):
# channels.capability_events.send(...)
#
# however, for now we can pass
request_record.update_status(event.type.SUBMITTED.name)
pass
elif event.type == WorkflowEventType.EXECUTING:
# another good place to emit a level-changed event
request_record.update_status(event.type.EXECUTING.name)
pass
elif event.type == WorkflowEventType.TERMINATED:
# another level-change event opportunity
# also, we must now locate the temporary directory and
# remove it
request_record.update_status(event.type.TERMINATED.name)
pass
else:
request_record.update_status(event.type.OTHER.name)
# there is an event type of OTHER which we can basically
# ignore for now
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment