Commit 5ac100b1 authored by Nathan Hertz

Merge issues

parent fbf29ab9
@@ -13,31 +13,60 @@ from sqlalchemy.orm import Session
from workflow.event_catcher import EventCatcher
from workspaces.capability_interfaces import CapabilityIF
from .capability_interfaces import CapabilityServiceIF, CapabilityQueueIF, CapabilityInfoIF, \
CapabilityEngineIF, CapabilityName, ParameterIF
from .capability_interfaces import (
CapabilityServiceIF,
CapabilityQueueIF,
CapabilityInfoIF,
CapabilityEngineIF,
CapabilityName,
ParameterIF,
)
from .helpers import CapabilitySequence, ExecutionPriority, RequestState, ExecutionState
from .product_interfaces import FutureProductIF
from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF
from .schema import Workflow, WorkflowEvent, WorkflowEventType, CapabilityRequest, Capability, \
CapabilityExecution, get_engine, get_session_factory, WorkflowRequest, AbstractFile
from channels.amqp_helpers import workflow_events,capability_events, CAPABILITY_STATUS_EXCH
from wf_monitor.monitor import WorkflowMonitor, WORKFLOW_STATUS_EXCH, log_decorator_factory
from .schema import (
Workflow,
WorkflowEvent,
WorkflowEventType,
CapabilityRequest,
Capability,
CapabilityExecution,
get_engine,
get_session_factory,
WorkflowRequest,
AbstractFile,
)
from channels.amqp_helpers import (
workflow_events,
capability_events,
CAPABILITY_STATUS_EXCH,
)
from wf_monitor.monitor import (
WorkflowMonitor,
WORKFLOW_STATUS_EXCH,
log_decorator_factory,
)
class CapabilityService(CapabilityServiceIF):
"""
The capability service: clients access this to request capability runs
"""
def __init__(self, info: CapabilityInfoIF):
self.execution_pool = []
self.queues = {}
self.capability_info = info
def create_request(self,
capability_name: str,
parameters: List[ParameterIF]=None,
products: List[FutureProductIF]=None) -> "CapabilityRequestIF":
self.capability_info.create_capability_request(capability_name, parameters, products)
def create_request(
self,
capability_name: str,
parameters: List[ParameterIF] = None,
products: List[FutureProductIF] = None,
) -> CapabilityRequest:
self.capability_info.create_capability_request(
capability_name, parameters, products
)
def run_capability(self, request: CapabilityRequest) -> CapabilityExecution:
"""
@@ -50,9 +79,9 @@ class CapabilityService(CapabilityServiceIF):
return execution_record
def enqueue_execution(
self,
execution_record: CapabilityExecution,
priority: int = ExecutionPriority.Default.value
self,
execution_record: CapabilityExecution,
priority: int = ExecutionPriority.Default.value,
):
"""
Move execution record that is ready to execute a workflow into the appropriate capability
@@ -76,6 +105,7 @@ class CapabilityEngine(CapabilityEngineIF):
"""
Executes a prepare and run workflow step of a capability
"""
def __init__(self, execution: CapabilityExecution):
self.execution = execution
@@ -100,10 +130,7 @@ class CapabilityInfo(CapabilityInfoIF):
return self.session.query(Capability).filter_by(name=capability_name).first()
def create_capability(
self,
name: CapabilityName,
steps: CapabilitySequence,
max_jobs: int
self, name: CapabilityName, steps: CapabilitySequence, max_jobs: int
) -> int:
"""
Create new capability and save it in the database
@@ -116,11 +143,11 @@ class CapabilityInfo(CapabilityInfoIF):
return self.save_entity(capability)
def create_capability_request(
self,
capability_name: str,
parameters: List[ParameterIF] = None,
future_products: List[FutureProductIF] = None,
versions: List[str] = None
self,
capability_name: str,
parameters: List[ParameterIF] = None,
future_products: List[FutureProductIF] = None,
versions: List[str] = None,
) -> int:
"""
Create new capability request and save it in the database
@@ -145,14 +172,18 @@ class CapabilityInfo(CapabilityInfoIF):
:return: Integer identifier for the record
"""
record = CapabilityExecution(
state=ExecutionState.Ready.name, capability_request=request_id, current_step=0
state=ExecutionState.Ready.name,
capability_request=request_id,
current_step=0,
)
return self.save_entity(record)
def lookup_entity(
self,
entity_id: int,
entity_schema: Union[Type[Capability], Type[CapabilityRequest], Type[CapabilityExecution]]
self,
entity_id: int,
entity_schema: Union[
Type[Capability], Type[CapabilityRequest], Type[CapabilityExecution]
],
) -> Optional[Union[Capability, CapabilityRequest, CapabilityExecution]]:
"""
Look up entity in database and return object representation of it if found
@@ -162,7 +193,9 @@ class CapabilityInfo(CapabilityInfoIF):
"""
return self.session.query(entity_schema).filter(entity_schema.id == entity_id)
def save_entity(self, entity: Union[Capability, CapabilityRequest, CapabilityExecution]) -> int:
def save_entity(
self, entity: Union[Capability, CapabilityRequest, CapabilityExecution]
) -> int:
"""
Save a given entity and return an integer identifier for it
:param entity: the entity to save
@@ -227,7 +260,6 @@ class WorkflowService(WorkflowServiceIF):
# send amqp event and update database
self.on_workflow_event(e, record, temp_folder)
@staticmethod
def _prepare_files_for_condor(files: List[AbstractFile]) -> Path:
"""
@@ -244,7 +276,7 @@ class WorkflowService(WorkflowServiceIF):
(temp_folder / file.filename).write_bytes(file.content)
# 3. make any scripts in there executable
for file in temp_folder.glob('*.sh'):
for file in temp_folder.glob("*.sh"):
file.chmod(file.stat().st_mode | stat.S_IEXEC)
# finished, return folder
@@ -258,17 +290,17 @@ class WorkflowService(WorkflowServiceIF):
:param folder: the path to the folder to execute
:return: the path to the log file
"""
print(f'executing on folder {folder}')
print(f"executing on folder {folder}")
# some file in here should end in .dag; that file is our dagman input
dagman = list(folder.glob('*.dag'))[0]
dagman = list(folder.glob("*.dag"))[0]
# ensure the log file exists
logfile = folder / 'condor.log'
logfile = folder / "condor.log"
logfile.touch()
# submit
subprocess.run(['condor_submit_dag', str(dagman)], cwd=str(folder.absolute()))
subprocess.run(["condor_submit_dag", str(dagman)], cwd=str(folder.absolute()))
# return the logfile
return logfile
@@ -282,7 +314,9 @@ class WorkflowService(WorkflowServiceIF):
# self.channel = channels.workflow_events.listen(self.on_workflow_event)
raise NotImplementedError
def on_workflow_event(self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path):
def on_workflow_event(
self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path
):
# 1. log that we received this event, somehow
# 2. update the WorkflowRequest record with the state we got
# 3. do per-event-type stuff, such as level change events, database
@@ -290,14 +324,20 @@ class WorkflowService(WorkflowServiceIF):
catcher = EventCatcher()
decorated_workflow_send = log_decorator_factory('Sending Workflow Event...')(workflow_events.send)
decorated_capability_send = log_decorator_factory('Sending Capability Event...')(capability_events.send)
decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")(
workflow_events.send
)
decorated_capability_send = log_decorator_factory(
"Sending Capability Event..."
)(capability_events.send)
# 1. send amqp event to workflow channel
decorated_workflow_send(event, WORKFLOW_STATUS_EXCH)
# 2. update request record with new status
print(f'Updating state on request {request_record.workflow_request_id} to {event.type.name}...')
print(
f"Updating state on request {request_record.workflow_request_id} to {event.type.name}..."
)
request_record.update_status(event.type.name)
# 3. do per-event-type stuff