diff --git a/shared/workspaces/src/workspaces/schema.py b/shared/workspaces/src/workspaces/schema.py index 570e604232b31305cecc0aaba2f82895e7fe910c..95548def83253715833831a827c8a258e4c5b2cb 100644 --- a/shared/workspaces/src/workspaces/schema.py +++ b/shared/workspaces/src/workspaces/schema.py @@ -13,8 +13,14 @@ from sqlalchemy import create_engine from sqlalchemy.orm import relationship, sessionmaker from sqlalchemy.ext.declarative import declarative_base -from .capability_interfaces import CapabilityIF, CapabilityRequestIF, CapabilityExecutionIF, \ - CapabilitySequenceIF, ParameterIF, CapabilityVersionIF +from .capability_interfaces import ( + CapabilityIF, + CapabilityRequestIF, + CapabilityExecutionIF, + CapabilitySequenceIF, + ParameterIF, + CapabilityVersionIF, +) from .helpers import CapabilityStepType, CapabilityStep, CapabilitySequence from .product_interfaces import FutureProductIF @@ -24,6 +30,7 @@ class AbstractFile: Abstract file is exactly that, an abstract concept of what a file is, to be returned from various non-filesystem places. 
""" + def __init__(self, filename: str, content: bytes): self.filename, self.content = filename, content @@ -31,10 +38,10 @@ class AbstractFile: (directory / self.filename).write_bytes(self.content) def __json__(self, request): - return {'filename': self.filename, 'content': self.content} + return {"filename": self.filename, "content": self.content} @classmethod - def from_path(cls, path: Path) -> 'AbstractFile': + def from_path(cls, path: Path) -> "AbstractFile": return cls(path.name, path.read_bytes()) @@ -42,6 +49,7 @@ class AbstractRequest: """ Abstract File is an abstract concept that is used to create requests """ + workflow_name: str argument: json @@ -49,7 +57,7 @@ class AbstractRequest: self.workflow_name, self.argument = workflow_name, argument def __json__(self, request): - return {'workflow_name': self.workflow_name, 'argument': self.argument} + return {"workflow_name": self.workflow_name, "argument": self.argument} class WorkflowEventType(enum.Enum): @@ -57,6 +65,7 @@ class WorkflowEventType(enum.Enum): Kinds of events that a workflow can send. These four are derived from the many kinds that HTCondor can send. """ + SUBMITTED = 0 EXECUTING = 1 TERMINATED = 5 @@ -70,13 +79,13 @@ class WorkflowEvent: """ def __init__( - self, - job_name: str, - job_id: str, - event_type: WorkflowEventType, - timestamp: str, - log: str, - retval: int = None + self, + job_name: str, + job_id: str, + event_type: WorkflowEventType, + timestamp: str, + log: str, + retval: int = None, ): # NOTE: when you modify the properties here, also update the schema # in the .json package. 
@@ -89,55 +98,58 @@ class WorkflowEvent: def json(self) -> str: d = { - 'job_name': self.job_name, - 'type': str(self.type), - 'timestamp': self.timestamp, - 'return_value': self.retval, - 'log': self.log + "job_name": self.job_name, + "type": str(self.type), + "timestamp": self.timestamp, + "return_value": self.retval, + "log": self.log, } return json.dumps(d) def __str__(self): - return f'WorkflowEvent with data {self.json()}' + return f"WorkflowEvent with data {self.json()}" def __repr__(self): - return f'<WorkflowEvent {self.__dict__}>' + return f"<WorkflowEvent {self.__dict__}>" def __eq__(self, other): - return self.job_name == other.job_name and \ - self.job_id == other.job_id and \ - self.type == other.type and \ - self.timestamp == other.timestamp and \ - self.log == other.log and \ - self.retval == other.retval + return ( + self.job_name == other.job_name + and self.job_id == other.job_id + and self.type == other.type + and self.timestamp == other.timestamp + and self.log == other.log + and self.retval == other.retval + ) class CapabilityEvent: """ An event from the underlying workflow execution system. 
""" + # TODO: determine shape of Capability Event information @property def request_id(self): - return self.json['request_id'] + return self.json["request_id"] @property def execution_id(self): - return self.json['execution_id'] + return self.json["execution_id"] @property def complete(self): - return self.json['is_complete'] + return self.json["is_complete"] def __init__(self, json): self.json = json def __str__(self): - return f'<CapabilityEvent with data{self.json()}>' + return f"<CapabilityEvent with data{self.json()}>" def __repr__(self): - return f'<CapabilityEvent {self.__dict__}>' + return f"<CapabilityEvent {self.__dict__}>" def __eq__(self, other): return self.json == other.json @@ -161,37 +173,44 @@ class Capability(Base, CapabilityIF): @classmethod def from_file(cls, filename: str): self = cls() - self.id, self.name, self.max_jobs, self.steps = cls.parse_capability_file(filename) + self.id, self.name, self.max_jobs, self.steps = cls.parse_capability_file( + filename + ) return self @classmethod def from_json(cls, json_data: str): self = cls() json_dict = json.loads(json_data) - print(json_dict) - self.id = json_dict.get('id', None) - self.name = json_dict['name'] - self.max_jobs = json_dict['max_jobs'] - self.steps = json_dict['steps'] + self.id = json_dict.get("id", None) + self.name = json_dict["name"] + self.max_jobs = json_dict["max_jobs"] + self.steps = json_dict["steps"] return self @staticmethod - def parse_capability_file(filename: str) -> Tuple[int, str, int, CapabilitySequenceIF]: - with open(filename, 'r') as f: - cap_id, name, max_jobs = f.readline().split(' ') + def parse_capability_file( + filename: str, + ) -> Tuple[int, str, int, CapabilitySequenceIF]: + with open(filename, "r") as f: + cap_id, name, max_jobs = f.readline().split(" ") steps = [] for line in f.readlines(): - step_type, step_value = line.strip().split(' ') - steps.append(CapabilityStep(CapabilityStepType.from_string(step_type), step_value)) + step_type, step_value = 
line.strip().split(" ") + steps.append( + CapabilityStep( + CapabilityStepType.from_string(step_type), step_value + ) + ) return int(cap_id), name, int(max_jobs), CapabilitySequence(steps) def create_request( - self, - parameters: ParameterIF = None, - future_products: FutureProductIF = None, - versions: str = None + self, + parameters: ParameterIF = None, + future_products: FutureProductIF = None, + versions: str = None, ): """ Create a new request for this capability with specific options @@ -201,23 +220,24 @@ class Capability(Base, CapabilityIF): capability=self.id, parameters=parameters, future_products=future_products, - versions=versions + versions=versions, ) def to_dict(self): return { - 'id': self.id, - 'name': self.name, - 'max_jobs': self.max_jobs, - 'steps': str(self.steps) + "id": self.id, + "name": self.name, + "max_jobs": self.max_jobs, + "steps": str(self.steps), } def __str__(self): return ( - f'Capability object with ID {self.id}, name {self.name} and max simultaneous jobs of {self.max_jobs}' - f'\nSequence: {self.steps}' + f"Capability object with ID {self.id}, name {self.name} and max simultaneous jobs of {self.max_jobs}" + f"\nSequence: {self.steps}" ) + class CapabilityRequest(Base, CapabilityRequestIF): """ A capability request, which couples a capability to a product, representing @@ -231,9 +251,9 @@ class CapabilityRequest(Base, CapabilityRequestIF): sa.Integer, sa.ForeignKey('capabilities.capability_id') ) - parameters = sa.Column('parameters', sa.String) + parameters = sa.Column("parameters", sa.String) # FIXME: This needs to be changed to properly keep track of product locators. 
- #future_products = sa.Column('future_products', sa.String) + # future_products = sa.Column('future_products', sa.String) versions = relationship("CapabilityVersion", back_populates="capability_request") capability = relationship(Capability, back_populates='requests') @@ -253,38 +273,45 @@ class CapabilityVersion(Base, CapabilityVersionIF): """ Schema representation of a capability version """ - __tablename__ = 'capability_versions' - capability_request_id = sa.Column('capability_request_id', - sa.Integer, - sa.ForeignKey('capability_requests.capability_request_id'), - primary_key=True) - version_number = sa.Column('version_number', - sa.Integer, - primary_key=True) + + __tablename__ = "capability_versions" + capability_request_id = sa.Column( + "capability_request_id", + sa.Integer, + sa.ForeignKey("capability_requests.capability_request_id"), + primary_key=True, + ) + version_number = sa.Column("version_number", sa.Integer, primary_key=True) capability_request = relationship(CapabilityRequest, back_populates="versions") - executions = relationship("CapabilityExecution", back_populates="capability_version") + executions = relationship( + "CapabilityExecution", back_populates="capability_version" + ) class CapabilityExecution(Base, CapabilityExecutionIF): """ Schema representation of a capability request's execution record """ - __tablename__ = 'capability_executions' + __tablename__ = "capability_executions" - id = sa.Column('execution_id', sa.Integer, primary_key=True) - state = sa.Column('state', sa.String) + + id = sa.Column("execution_id", sa.Integer, primary_key=True) + state = sa.Column("state", sa.String) capability_request_id = sa.Column( - 'capability_request_id', + "capability_request_id", sa.Integer, - sa.ForeignKey('capability_requests.capability_request_id') + sa.ForeignKey("capability_requests.capability_request_id"), ) - version_number = sa.Column('capability_version_number', sa.Integer) - current_step = 
sa.Column('current_step', sa.Integer) + version_number = sa.Column("capability_version_number", sa.Integer) + current_step = sa.Column("current_step", sa.Integer) capability_version = relationship(CapabilityVersion, back_populates="executions") - __table_args__ = (sa.ForeignKeyConstraint([capability_request_id, version_number], - [CapabilityVersion.capability_request_id, - CapabilityVersion.version_number]),) + __table_args__ = ( + sa.ForeignKeyConstraint( + [capability_request_id, version_number], + [CapabilityVersion.capability_request_id, CapabilityVersion.version_number], + ), + ) def __json__(self, request: Any) -> dict: return dict(id=self.id, state=self.state, capability_request_id=self.capability_request_id, @@ -296,16 +324,17 @@ class Workflow(Base): A Workflow is a suite of tasks defined by templates that must be executed together with some user-submitted files. """ - __tablename__ = 'workflows' - workflow_name = sa.Column('workflow_name', sa.String, primary_key=True) - templates = relationship('WorkflowTemplate', backref='workflows') - requests = relationship('WorkflowRequest', backref='workflows') + + __tablename__ = "workflows" + workflow_name = sa.Column("workflow_name", sa.String, primary_key=True) + templates = relationship("WorkflowTemplate", backref="workflows") + requests = relationship("WorkflowRequest", backref="workflows") def __json__(self, request): - return {'workflow_name': self.workflow_name, 'templates': self.templates} + return {"workflow_name": self.workflow_name, "templates": self.templates} def __repr__(self): - return f'<Workflow workflow_name={self.workflow_name}>' + return f"<Workflow workflow_name={self.workflow_name}>" def render_templates(self, argument: Dict, files: List[Path]) -> List[AbstractFile]: """ @@ -322,13 +351,19 @@ class WorkflowTemplate(Base): """ Workflow Templates are Mustache-formatted files associated with a given Workflow """ - __tablename__ = 'workflow_templates' - filename = sa.Column('filename', sa.String, 
primary_key=True) - content = sa.Column('content', sa.LargeBinary, nullable=False) - workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True) + + __tablename__ = "workflow_templates" + filename = sa.Column("filename", sa.String, primary_key=True) + content = sa.Column("content", sa.LargeBinary, nullable=False) + workflow_name = sa.Column( + "workflow_name", + sa.String, + sa.ForeignKey("workflows.workflow_name"), + primary_key=True, + ) def render(self, argument: Dict) -> AbstractFile: - raise NotImplementedError('Add mustache to render this template') + raise NotImplementedError("Add mustache to render this template") @property def template_file(self) -> AbstractFile: @@ -343,18 +378,23 @@ class WorkflowTemplate(Base): return self.template_file.__json__(request) def __repr__(self): - return f'<WorkflowTemplate filename={self.filename} for workflow={self.workflow_name}>' + return f"<WorkflowTemplate filename={self.filename} for workflow={self.workflow_name}>" class WorkflowRequest(Base): """ Workflow Requests invoke a particular workflow with a particular argument and some user-supplied files. 
""" - __tablename__ = 'workflow_requests' - workflow_request_id = sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True) - workflow_name = sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name')) - argument = sa.Column('argument', sa.JSON) - files = relationship('WorkflowRequestFile', backref='request') + + __tablename__ = "workflow_requests" + workflow_request_id = sa.Column( + "workflow_request_id", sa.Integer, primary_key=True, autoincrement=True + ) + workflow_name = sa.Column( + "workflow_name", sa.String, sa.ForeignKey("workflows.workflow_name") + ) + argument = sa.Column("argument", sa.JSON) + files = relationship("WorkflowRequestFile", backref="request") @property def request(self) -> AbstractRequest: @@ -371,23 +411,27 @@ class WorkflowRequest(Base): def set_start_time(self, time: str): self.start_time = time - def set_end_time(self, time:str): + def set_end_time(self, time: str): self.end_time = time def __repr__(self): - return f'<WorkflowRequest workflow_request_id= {self.workflow_request_id}>' + return f"<WorkflowRequest workflow_request_id= {self.workflow_request_id}>" class WorkflowRequestFile(Base): """ A Workflow Request File is a file supplied by the user and attached to the request they have submitted. 
""" - __tablename__ = 'workflow_request_files' - workflow_request_id = sa.Column('workflow_request_id', sa.Integer, - sa.ForeignKey('workflow_requests.workflow_request_id'), - primary_key=True) - filename = sa.Column('filename', sa.String, primary_key=True) - content = sa.Column('content', sa.LargeBinary, nullable=False) + + __tablename__ = "workflow_request_files" + workflow_request_id = sa.Column( + "workflow_request_id", + sa.Integer, + sa.ForeignKey("workflow_requests.workflow_request_id"), + primary_key=True, + ) + filename = sa.Column("filename", sa.String, primary_key=True) + content = sa.Column("content", sa.LargeBinary, nullable=False) @property def file(self) -> AbstractFile: @@ -398,7 +442,7 @@ class WorkflowRequestFile(Base): self.filename, self.content = file.filename, file.content def __repr__(self): - return f'<WorkflowRequestFile filename={self.filename}>' + return f"<WorkflowRequestFile filename={self.filename}>" def get_engine(): @@ -406,8 +450,10 @@ def get_engine(): Generate the SQL Alchemy engine for us, using Capo. 
:return: """ - capo = CapoConfig().settings('metadataDatabase') - url = capo.jdbcUrl.replace('jdbc:', '').replace('://', f'://{capo.jdbcUsername}:{capo.jdbcPassword}@') + capo = CapoConfig().settings("metadataDatabase") + url = capo.jdbcUrl.replace("jdbc:", "").replace( + "://", f"://{capo.jdbcUsername}:{capo.jdbcPassword}@" + ) return create_engine(url) diff --git a/shared/workspaces/src/workspaces/services.py b/shared/workspaces/src/workspaces/services.py index c549b4434c116de65dec42b209e6b133147cdea1..fa1e830f6b45e7825ab79532e1b9ee4c3969e72c 100644 --- a/shared/workspaces/src/workspaces/services.py +++ b/shared/workspaces/src/workspaces/services.py @@ -13,31 +13,60 @@ from sqlalchemy.orm import Session from workflow.event_catcher import EventCatcher from workspaces.capability_interfaces import CapabilityIF -from .capability_interfaces import CapabilityServiceIF, CapabilityQueueIF, CapabilityInfoIF, \ - CapabilityEngineIF, CapabilityName, ParameterIF +from .capability_interfaces import ( + CapabilityServiceIF, + CapabilityQueueIF, + CapabilityInfoIF, + CapabilityEngineIF, + CapabilityName, + ParameterIF, +) from .helpers import CapabilitySequence, ExecutionPriority, RequestState, ExecutionState from .product_interfaces import FutureProductIF from .workflow_interfaces import WorkflowServiceIF, WorkflowInfoIF -from .schema import Workflow, WorkflowEvent, WorkflowEventType, CapabilityRequest, Capability, \ - CapabilityExecution, get_engine, get_session_factory, WorkflowRequest, AbstractFile -from channels.amqp_helpers import workflow_events,capability_events, CAPABILITY_STATUS_EXCH -from wf_monitor.monitor import WorkflowMonitor, WORKFLOW_STATUS_EXCH, log_decorator_factory +from .schema import ( + Workflow, + WorkflowEvent, + WorkflowEventType, + CapabilityRequest, + Capability, + CapabilityExecution, + get_engine, + get_session_factory, + WorkflowRequest, + AbstractFile, +) +from channels.amqp_helpers import ( + workflow_events, + capability_events, + 
CAPABILITY_STATUS_EXCH, +) +from wf_monitor.monitor import ( + WorkflowMonitor, + WORKFLOW_STATUS_EXCH, + log_decorator_factory, +) class CapabilityService(CapabilityServiceIF): """ The capability service: clients access this to request capability runs """ + def __init__(self, info: CapabilityInfoIF): self.execution_pool = [] self.queues = {} self.capability_info = info - def create_request(self, - capability_name: str, - parameters: List[ParameterIF]=None, - products: List[FutureProductIF]=None) -> "CapabilityRequestIF": - return self.capability_info.create_capability_request(capability_name, parameters, products) + def create_request( + self, + capability_name: str, + parameters: List[ParameterIF] = None, + products: List[FutureProductIF] = None, + ) -> CapabilityRequest: + return self.capability_info.create_capability_request( + capability_name, parameters, products + ) def run_capability(self, request: CapabilityRequest) -> CapabilityExecution: """ @@ -50,9 +79,9 @@ class CapabilityService(CapabilityServiceIF): return execution_record def enqueue_execution( - self, - execution_record: CapabilityExecution, - priority: int = ExecutionPriority.Default.value + self, + execution_record: CapabilityExecution, + priority: int = ExecutionPriority.Default.value, ): """ Move execution record that is ready to execute a workflow into the appropriate capability @@ -76,6 +105,7 @@ class CapabilityEngine(CapabilityEngineIF): """ Executes a prepare and run workflow step of a capability """ + def __init__(self, execution: CapabilityExecution): self.execution = execution @@ -100,10 +130,7 @@ class CapabilityInfo(CapabilityInfoIF): return self.session.query(Capability).filter_by(name=capability_name).first() def create_capability( - self, - name: CapabilityName, - steps: CapabilitySequence, - max_jobs: int + self, name: CapabilityName, steps: CapabilitySequence, max_jobs: int ) -> int: """ Create new capability and save it in the database @@ -116,11 +143,11 @@ class 
CapabilityInfo(CapabilityInfoIF): return self.save_entity(capability) def create_capability_request( - self, - capability_name: str, - parameters: List[ParameterIF] = None, - future_products: List[FutureProductIF] = None, - versions: List[str] = None + self, + capability_name: str, + parameters: List[ParameterIF] = None, + future_products: List[FutureProductIF] = None, + versions: List[str] = None, ) -> int: """ Create new capability request and save it in the database @@ -146,15 +173,19 @@ class CapabilityInfo(CapabilityInfoIF): :return: Integer identifier for the record """ record = CapabilityExecution( - state=ExecutionState.Ready.name, capability_request_id=request_id, current_step=0 + state=ExecutionState.Ready.name, + capability_request_id=request_id, + current_step=0 ) self.save_entity(record) return record def lookup_entity( - self, - entity_id: int, - entity_schema: Union[Type[Capability], Type[CapabilityRequest], Type[CapabilityExecution]] + self, + entity_id: int, + entity_schema: Union[ + Type[Capability], Type[CapabilityRequest], Type[CapabilityExecution] + ], ) -> Optional[Union[Capability, CapabilityRequest, CapabilityExecution]]: """ Look up entity in database and return object representation of it if found @@ -164,7 +195,9 @@ class CapabilityInfo(CapabilityInfoIF): """ return self.session.query(entity_schema).filter(entity_schema.id == entity_id).one() - def save_entity(self, entity: Union[Capability, CapabilityRequest, CapabilityExecution]) -> int: + def save_entity( + self, entity: Union[Capability, CapabilityRequest, CapabilityExecution] + ) -> int: """ Save a given entity and return an integer identifier for it :param entity: the entity to save @@ -229,7 +262,6 @@ class WorkflowService(WorkflowServiceIF): # send amqp event and update database self.on_workflow_event(e, record, temp_folder) - @staticmethod def _prepare_files_for_condor(files: List[AbstractFile]) -> Path: """ @@ -246,7 +278,7 @@ class WorkflowService(WorkflowServiceIF): 
(temp_folder / file.filename).write_bytes(file.content) # 3. make any scripts in there executable - for file in temp_folder.glob('*.sh'): + for file in temp_folder.glob("*.sh"): file.chmod(file.stat().st_mode | stat.S_IEXEC) # finished, return folder @@ -260,17 +292,17 @@ class WorkflowService(WorkflowServiceIF): :param folder: the path to the folder to execute :return: the path to the log file """ - print(f'executing on folder {folder}') + print(f"executing on folder {folder}") # some file in here should end in .dag; that file is our dagman input - dagman = list(folder.glob('*.dag'))[0] + dagman = list(folder.glob("*.dag"))[0] # ensure the log file exists - logfile = folder / 'condor.log' + logfile = folder / "condor.log" logfile.touch() # submit - subprocess.run(['condor_submit_dag', str(dagman)], cwd=str(folder.absolute())) + subprocess.run(["condor_submit_dag", str(dagman)], cwd=str(folder.absolute())) # return the logfile return logfile @@ -284,7 +316,9 @@ class WorkflowService(WorkflowServiceIF): # self.channel = channels.workflow_events.listen(self.on_workflow_event) raise NotImplementedError - def on_workflow_event(self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path): + def on_workflow_event( + self, event: WorkflowEvent, request_record: WorkflowRequest, tmp_folder: Path + ): # 1. log that we received this event, somehow # 2. update the WorkflowRequest record with the state we got # 3. do per-event-type stuff, such as level change events, database @@ -292,14 +326,20 @@ class WorkflowService(WorkflowServiceIF): catcher = EventCatcher() - decorated_workflow_send = log_decorator_factory('Sending Workflow Event...')(workflow_events.send) - decorated_capability_send = log_decorator_factory('Sending Capability Event...')(capability_events.send) + decorated_workflow_send = log_decorator_factory("Sending Workflow Event...")( + workflow_events.send + ) + decorated_capability_send = log_decorator_factory( + "Sending Capability Event..." 
+ )(capability_events.send) # 1. send amqp event to workflow channel decorated_workflow_send(event, WORKFLOW_STATUS_EXCH) # 2. update request record with new status - print(f'Updating state on request {request_record.workflow_request_id} to {event.type.name}...') + print( + f"Updating state on request {request_record.workflow_request_id} to {event.type.name}..." + ) request_record.update_status(event.type.name) # 3. do per-event-type stuff