Skip to content
Snippets Groups Projects
Commit e32d7145 authored by Nathan Hertz's avatar Nathan Hertz
Browse files

Merge branch 'release/WS-0.1' of https://open-bitbucket.nrao.edu/scm/ssa/data into release/WS-0.1

parents 31d304dd e27a99ec
No related branches found
No related tags found
No related merge requests found
...@@ -46,7 +46,7 @@ class WorkflowEventChannel(ChannelDefinition[WorkflowEvent]): ...@@ -46,7 +46,7 @@ class WorkflowEventChannel(ChannelDefinition[WorkflowEvent]):
) )
def routing_key_for(self, event: WorkflowEvent) -> str: def routing_key_for(self, event: WorkflowEvent) -> str:
return f"{event.job_name}.{event.job_id}.{event.type.name.lower()}" return f"{event.job_name}.{event.workflow_request_id}.{event.type.name.lower()}"
def exchange(self) -> str: def exchange(self) -> str:
return self.EXCHANGE return self.EXCHANGE
......
...@@ -111,9 +111,16 @@ class CapabilityInfoIF(ABC): ...@@ -111,9 +111,16 @@ class CapabilityInfoIF(ABC):
def lookup_capability(self, capability_name: str) -> CapabilityIF: def lookup_capability(self, capability_name: str) -> CapabilityIF:
raise NotImplementedError raise NotImplementedError
@abstractmethod
def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF: def lookup_execution(self, execution_id: int) -> CapabilityExecutionIF:
raise NotImplementedError raise NotImplementedError
@abstractmethod
def lookup_execution_by_workflow_request_id(
self, workflow_request_id: int
) -> CapabilityExecutionIF:
raise NotImplementedError
class ProjectSettingsIF(ABC): class ProjectSettingsIF(ABC):
""" """
......
...@@ -190,11 +190,13 @@ class CapabilityService(CapabilityServiceIF): ...@@ -190,11 +190,13 @@ class CapabilityService(CapabilityServiceIF):
# No corresponding capability event # No corresponding capability event
return None return None
self.capability_info.lookup_execution_by_workflow_request_id( execution = self.capability_info.lookup_execution_by_workflow_request_id(
event.workflow_request_id event.workflow_request_id
) )
return CapabilityEvent(event_type, execution_id) return CapabilityEvent(
event_type, execution.capability_request_id, execution.id
)
def listen_for_events(self): def listen_for_events(self):
""" """
...@@ -286,7 +288,9 @@ class CapabilityInfo(CapabilityInfoIF): ...@@ -286,7 +288,9 @@ class CapabilityInfo(CapabilityInfoIF):
self.session.add(entity) self.session.add(entity)
self.session.flush() self.session.flush()
def lookup_execution_by_workflow_request_id(self, workflow_request_id: int): def lookup_execution_by_workflow_request_id(
self, workflow_request_id: int
) -> CapabilityExecution:
return ( return (
self.session.query(CapabilityExecution) self.session.query(CapabilityExecution)
.filter_by(workflow_request_id=workflow_request_id) .filter_by(workflow_request_id=workflow_request_id)
...@@ -473,10 +477,14 @@ class WorkflowService(WorkflowServiceIF): ...@@ -473,10 +477,14 @@ class WorkflowService(WorkflowServiceIF):
# execution evolves. So we have to set up listening at some point # execution evolves. So we have to set up listening at some point
# in this class # in this class
def __init__(self, info: "WorkflowInfo"): def __init__(self, info: "WorkflowInfo"):
# 1. Start listening for events from the wf_monitor stream
# self.channel = workflow_events.listen(self.on_workflow_event)
self.info = info self.info = info
# Start listening for events from the wf_monitor stream
self.listener = threading.Thread(
target=lambda: workflow_events.listen(self.on_workflow_event)
)
self.listener.start()
def execute(self, request: WorkflowRequest, files: List[Path] = None): def execute(self, request: WorkflowRequest, files: List[Path] = None):
""" """
Execute a workflow per the supplied parameters. Execute a workflow per the supplied parameters.
...@@ -497,16 +505,11 @@ class WorkflowService(WorkflowServiceIF): ...@@ -497,16 +505,11 @@ class WorkflowService(WorkflowServiceIF):
# 5. execute condor and retrieve log file # 5. execute condor and retrieve log file
log_file = self._execute_prepared(temp_folder) log_file = self._execute_prepared(temp_folder)
# 6. start reading log file # 6. start reading the log file (background it)
monitor = WorkflowMonitor( subprocess.Popen(
logfile_path=(log_file), workflow_request_id=request.workflow_request_id ["wf_monitor", log_file, str(request.workflow_request_id)],
cwd=str(temp_folder.absolute()),
) )
# parse events from log
events = monitor.events
with workflow_events:
for e in events:
# send amqp event and update database
self.on_workflow_event(e, request, temp_folder)
@staticmethod @staticmethod
def _prepare_files_for_condor(files: List[WorkflowRequestFile]) -> Path: def _prepare_files_for_condor(files: List[WorkflowRequestFile]) -> Path:
...@@ -568,21 +571,8 @@ class WorkflowService(WorkflowServiceIF): ...@@ -568,21 +571,8 @@ class WorkflowService(WorkflowServiceIF):
# return the logfile # return the logfile
return logfile return logfile
@staticmethod def on_workflow_event(self, event: WorkflowEvent):
def on_workflow_event( request = self.info.lookup_workflow_request(event.workflow_request_id)
event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path
):
# 1. log that we received this event, somehow
# 2. update the WorkflowRequest record with the state we got
# 3. do per-event-type stuff, such as level change events, database
# updates, and logging
decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")(
workflow_events.send
)
# 1. send amqp event to workflow channel
decorated_workflow_send(event)
status: str status: str
# 2. update request record with new status # 2. update request record with new status
...@@ -597,9 +587,9 @@ class WorkflowService(WorkflowServiceIF): ...@@ -597,9 +587,9 @@ class WorkflowService(WorkflowServiceIF):
status = WorkflowRequestState.Running.name status = WorkflowRequestState.Running.name
print( print(
f"Updating state on workflow request {request_record.workflow_request_id} to {status}..." f"Updating state on workflow request {request.workflow_request_id} to {status}..."
) )
request_record.update_status(status) request.update_status(status)
class WorkflowInfo(WorkflowInfoIF): class WorkflowInfo(WorkflowInfoIF):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment