diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index ba36214f1662e63ea3de1232a030e1b850c2a9e8..a67799a3e94d61edbf0ecc6632647a87f8e90fad 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -13,31 +13,60 @@ from sqlalchemy.orm import Session from workflow.event_catcher import EventCatcher from workspaces.capability_interfaces import CapabilityIF -from .capability_interfaces import CapabilityServiceIF, CapabilityQueueIF, CapabilityInfoIF, \ - CapabilityEngineIF, CapabilityName, ParameterIF +from .capability_interfaces import ( + CapabilityServiceIF, + CapabilityQueueIF, + CapabilityInfoIF, + CapabilityEngineIF, + CapabilityName, + ParameterIF, +) from .helpers import CapabilitySequence, ExecutionPriority, RequestState, ExecutionState from .product_interfaces import FutureProductIF from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF -from .schema import Workflow, WorkflowEvent, WorkflowEventType, CapabilityRequest, Capability, \ - CapabilityExecution, get_engine, get_session_factory, WorkflowRequest, AbstractFile -from channels.amqp_helpers import workflow_events,capability_events, CAPABILITY_STATUS_EXCH -from wf_monitor.monitor import WorkflowMonitor, WORKFLOW_STATUS_EXCH, log_decorator_factory +from .schema import ( + Workflow, + WorkflowEvent, + WorkflowEventType, + CapabilityRequest, + Capability, + CapabilityExecution, + get_engine, + get_session_factory, + WorkflowRequest, + AbstractFile, +) +from channels.amqp_helpers import ( + workflow_events, + capability_events, + CAPABILITY_STATUS_EXCH, +) +from wf_monitor.monitor import ( + WorkflowMonitor, + WORKFLOW_STATUS_EXCH, + log_decorator_factory, +) class CapabilityService(CapabilityServiceIF): """ The capability service: clients access this to request capability runs """ + def __init__(self, info: CapabilityInfoIF): self.execution_pool = [] self.queues = {} self.capability_info = info - def create_request(self, - capability_name: str, - parameters: List[ParameterIF]=None, - products: List[FutureProductIF]=None) -> "CapabilityRequestIF": - self.capability_info.create_capability_request(capability_name, parameters, products) + def create_request( + self, + capability_name: str, + parameters: List[ParameterIF] = None, + products: List[FutureProductIF] = None, + ) -> CapabilityRequest: + self.capability_info.create_capability_request( + capability_name, parameters, products + ) def run_capability(self, request: CapabilityRequest) -> CapabilityExecution: """ @@ -50,9 +79,9 @@ class CapabilityService(CapabilityServiceIF): return execution_record def enqueue_execution( - self, - execution_record: CapabilityExecution, - priority: int = ExecutionPriority.Default.value + self, + execution_record: CapabilityExecution, + priority: int = ExecutionPriority.Default.value, ): """ Move execution record that is ready to execute a workflow into the appropriate capability @@ -76,6 +105,7 @@ class CapabilityEngine(CapabilityEngineIF): """ Executes a prepare and run workflow step of a capability """ + def __init__(self, execution: CapabilityExecution): self.execution = execution @@ -100,10 +130,7 @@ class CapabilityInfo(CapabilityInfoIF): return self.session.query(Capability).filter_by(name=capability_name).first() def create_capability( - self, - name: CapabilityName, - steps: CapabilitySequence, - max_jobs: int + self, name: CapabilityName, steps: CapabilitySequence, max_jobs: int ) -> int: """ Create new capability and save it in the database @@ -116,11 +143,11 @@ class CapabilityInfo(CapabilityInfoIF): return self.save_entity(capability) def create_capability_request( - self, - capability_name: str, - parameters: List[ParameterIF] = None, - future_products: List[FutureProductIF] = None, - versions: List[str] = None + self, + capability_name: str, + parameters: List[ParameterIF] = None, + future_products: List[FutureProductIF] = None, + versions: List[str] = None, ) -> int: """ Create new capability request and save it in the database @@ -145,14 +172,18 @@ class CapabilityInfo(CapabilityInfoIF): :return: Integer identifier for the record """ record = CapabilityExecution( - state=ExecutionState.Ready.name, capability_request=request_id, current_step=0 + state=ExecutionState.Ready.name, + capability_request=request_id, + current_step=0, ) return self.save_entity(record) def lookup_entity( - self, - entity_id: int, - entity_schema: Union[Type[Capability], Type[CapabilityRequest], Type[CapabilityExecution]] + self, + entity_id: int, + entity_schema: Union[ + Type[Capability], Type[CapabilityRequest], Type[CapabilityExecution] + ], ) -> Optional[Union[Capability, CapabilityRequest, CapabilityExecution]]: """ Look up entity in database and return object representation of it if found @@ -162,7 +193,9 @@ class CapabilityInfo(CapabilityInfoIF): """ return self.session.query(entity_schema).filter(entity_schema.id == entity_id) - def save_entity(self, entity: Union[Capability, CapabilityRequest, CapabilityExecution]) -> int: + def save_entity( + self, entity: Union[Capability, CapabilityRequest, CapabilityExecution] + ) -> int: """ Save a given entity and return an integer identifier for it :param entity: the entity to save @@ -227,7 +260,6 @@ class WorkflowService(WorkflowServiceIF): # send amqp event and update database self.on_workflow_event(e, record, temp_folder) - @staticmethod def _prepare_files_for_condor(files: List[AbstractFile]) -> Path: """ @@ -244,7 +276,7 @@ class WorkflowService(WorkflowServiceIF): (temp_folder / file.filename).write_bytes(file.content) # 3. make any scripts in there executable - for file in temp_folder.glob('*.sh'): + for file in temp_folder.glob("*.sh"): file.chmod(file.stat().st_mode | stat.S_IEXEC) # finished, return folder @@ -258,17 +290,17 @@ class WorkflowService(WorkflowServiceIF): :param folder: the path to the folder to execute :return: the path to the log file """ - print(f'executing on folder {folder}') + print(f"executing on folder {folder}") # some file in here should end in .dag; that file is our dagman input - dagman = list(folder.glob('*.dag'))[0] + dagman = list(folder.glob("*.dag"))[0] # ensure the log file exists - logfile = folder / 'condor.log' + logfile = folder / "condor.log" logfile.touch() # submit - subprocess.run(['condor_submit_dag', str(dagman)], cwd=str(folder.absolute())) + subprocess.run(["condor_submit_dag", str(dagman)], cwd=str(folder.absolute())) # return the logfile return logfile @@ -282,7 +314,9 @@ class WorkflowService(WorkflowServiceIF): # self.channel = channels.workflow_events.listen(self.on_workflow_event) raise NotImplementedError - def on_workflow_event(self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path): + def on_workflow_event( + self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path + ): # 1. log that we received this event, somehow # 2. update the WorkflowRequest record with the state we got # 3. do per-event-type stuff, such as level change events, database @@ -290,14 +324,20 @@ class WorkflowService(WorkflowServiceIF): catcher = EventCatcher() - decorated_workflow_send = log_decorator_factory('Sending Workflow Event...')(workflow_events.send) - decorated_capability_send = log_decorator_factory('Sending Capability Event...')(capability_events.send) + decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")( + workflow_events.send + ) + decorated_capability_send = log_decorator_factory( + "Sending Capability Event..." + )(capability_events.send) # 1. send amqp event to workflow channel decorated_workflow_send(event, WORKFLOW_STATUS_EXCH) # 2. update request record with new status - print(f'Updating state on request {request_record.workflow_request_id} to {event.type.name}...') + print( + f"Updating state on request {request_record.workflow_request_id} to {event.type.name}..." + ) request_record.update_status(event.type.name) # 3. do per-event-type stuff