Skip to content
Snippets Groups Projects
Commit 5de9b3ac authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

workflow messaging doesn't quite work right now, but it's getting there

parent d4a59732
No related branches found
No related tags found
No related merge requests found
......@@ -31,6 +31,9 @@ def upgrade():
comment='the unique id of the request. auto-generated'),
sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name')),
sa.Column('argument', sa.JSON, comment='the argument(s) used for the workflow in this request.'),
sa.Column('state', sa.String, comment='the current state of the workflow in this request.'),
sa.Column('execution_id', sa.Integer,
comment='the id of the parent execution awaiting the workflow'),
comment='A listing of requests for workflow execution.')
op.create_table('workflow_request_files',
......@@ -58,4 +61,4 @@ def downgrade():
op.drop_table('workflow_request_files')
op.drop_table('workflow_requests')
op.drop_table('workflow_templates')
op.drop_table('workflows')
\ No newline at end of file
op.drop_table('workflows')
......@@ -371,7 +371,7 @@ class Workflow(Base, WorkflowIF):
if filename == "null.condor":
contents = (
"executable = null\n"
f"arguments = {str(argument)[2:4]}\n" # TODO: fix templates
f"arguments = {str(argument)[2:len(argument)-3]}\n" # TODO: fix templates
"error = null.err\n"
"log = condor.log\n\n\n"
"queue".encode()
......@@ -429,6 +429,8 @@ class WorkflowRequest(Base, WorkflowRequestIF):
"workflow_name", sa.String, sa.ForeignKey("workflows.workflow_name")
)
argument = sa.Column("argument", sa.JSON)
state = sa.Column("state", sa.String)
execution_id = sa.Column("execution_id", sa.Integer)
files = relationship("WorkflowRequestFile", backref="request")
@property
......@@ -441,7 +443,7 @@ class WorkflowRequest(Base, WorkflowRequestIF):
# TODO: create following fields in table
def update_status(self, status: str):
self.status = status
self.state = status
def set_start_time(self, time: str):
self.start_time = time
......@@ -454,6 +456,8 @@ class WorkflowRequest(Base, WorkflowRequestIF):
workflow_request_id=self.workflow_request_id,
workflow_name=self.workflow_name,
argument=self.argument,
state=self.state,
execution_id=self.execution_id,
)
def __repr__(self):
......
......@@ -17,10 +17,8 @@ from channels.amqp_helpers import (
)
from sqlalchemy.orm import Session
from wf_monitor.monitor import (
log_decorator_factory,
log_decorator_factory, WorkflowMonitor,
)
from workflow.event_translator import EventTranslator
from .capability_interfaces import (
CapabilityServiceIF,
CapabilityQueueIF,
......@@ -409,7 +407,6 @@ class WorkflowService(WorkflowServiceIF):
# 1. Start listening for events from the wf_monitor stream
# self.channel = workflow_events.listen(self.on_workflow_event)
self.info = info
self.event_catcher = EventTranslator(self)
def execute(self, request: WorkflowRequest, files: List[Path]):
"""
......@@ -435,13 +432,13 @@ class WorkflowService(WorkflowServiceIF):
log_file = self._execute_prepared(temp_folder)
# 6. start reading log file
# monitor = WorkflowMonitor(str(log_file))
# # parse events from log
# events = monitor.events
# with workflow_events:
# for e in events:
# # send amqp event and update database
# self.on_workflow_event(e, request, temp_folder)
monitor = WorkflowMonitor(str(log_file))
# parse events from log
events = monitor.events
with workflow_events:
for e in events:
# send amqp event and update database
self.on_workflow_event(e, request, temp_folder)
@staticmethod
def _prepare_files_for_condor(files: List[WorkflowRequestFile]) -> Path:
......@@ -477,6 +474,7 @@ class WorkflowService(WorkflowServiceIF):
print(f"executing on folder {folder}")
# some file in here should end in .dag; that file is our dagman input
# TODO: not currently using DAG files for vulture
dagman = list(folder.glob("*.dag"))[0]
print(f"dagman file {dagman} exists.")
......@@ -486,7 +484,7 @@ class WorkflowService(WorkflowServiceIF):
# ensure the log file exists
logfile = folder / "condor.log"
print("condor.log exists.")
print(f"log file {logfile} exists.")
logfile.touch()
# submit
......@@ -501,54 +499,41 @@ class WorkflowService(WorkflowServiceIF):
return logfile
def on_workflow_event(
self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path
self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path
):
# 1. log that we received this event, somehow
# 2. update the WorkflowRequest record with the state we got
# 3. do per-event-type stuff, such as level change events, database
# updates, and logging
decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")(
workflow_events.send
)
decorated_capability_send = log_decorator_factory(
"Sending Capability Event..."
)(capability_events.send)
decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")(workflow_events.send)
decorated_capability_send = log_decorator_factory("Sending Capability Event...")(capability_events.send)
# 1. send amqp event to workflow channel
decorated_workflow_send(event)
# 2. update request record with new status
print(
f"Updating state on request {request_record.workflow_request_id} to {event.type.name}..."
)
print(f"Updating state on workflow request {request_record.workflow_request_id} to {event.type.name}...")
request_record.update_status(event.type.name)
capability_event = self.capability_event_from_workflow(event)
# decorated_capability_send(capability_event)
decorated_capability_send(capability_event)
# # 3. do per-event-type stuff
# if event.type == WorkflowEventType.SUBMITTED:
# # this would be a good place to emit a level-changed event
# # for the capability system
# # TODO: Find out capability states and replace strings
# CapabilityRequest.update_status("WAITING")
# elif event.type == WorkflowEventType.EXECUTING:
# # another good place to emit a level-changed event
# CapabilityRequest.update_status("STARTED")
# elif event.type == WorkflowEventType.TERMINATED:
# # another level-change event opportunity
# CapabilityRequest.update_status("COMPLETE")
#
# # remove temp directory and rendered templates
# # TODO: decide on lifetime of temp directory
# # shutil.rmtree(tmp_folder, ignore_errors=False, onerror=None)
# else:
# # there is an event type of OTHER which we can basically
# # ignore for now
# capability_change = catcher.update_capability_from_workflow(event)
# decorated_capability_send(capability_change)
# CapabilityRequest.update_status("OTHER")
@staticmethod
def capability_event_from_workflow(event: WorkflowEvent) -> Optional[CapabilityEvent]:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment