Commit 9603b503 authored by Daniel Lyons
Remove execution_id from workflow_request, move workflow_request_id to execution. More testing

parent c3ce20d3
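In short: the execution_id foreign key is removed from workflow_requests, and the association is now carried on the execution side via the new current_workflow_request_id column on capability_executions. A minimal sketch of the new lookup direction, assuming a SQLAlchemy session and the CapabilityExecution model shown in the diff below (the helper name execution_for_request is illustrative, not part of this commit):

    # Sketch only: the foreign key now lives on capability_executions.current_workflow_request_id,
    # so the execution owning a workflow request is found from the execution side.
    def execution_for_request(session, workflow_request_id: int):
        return (
            session.query(CapabilityExecution)
            .filter_by(current_workflow_request_id=workflow_request_id)
            .one_or_none()
        )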
......@@ -8,57 +8,116 @@ Create Date: 2020-09-02 11:25:01.571392
from alembic import op
import sqlalchemy as sa
revision = '44d5bbbf2615'
revision = "44d5bbbf2615"
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
op.create_table('workflows',
sa.Column('workflow_name', sa.String, primary_key=True, comment='the human-readable name for the workflow.'),
comment='A listing of the available workflows in the system.')
op.create_table(
"workflows",
sa.Column(
"workflow_name",
sa.String,
primary_key=True,
comment="the human-readable name for the workflow.",
),
comment="A listing of the available workflows in the system.",
)
op.create_table('workflow_templates',
sa.Column('filename', sa.String, primary_key=True, comment='the filename of the template'),
sa.Column('content', sa.LargeBinary, nullable=False, comment='the content of the template'),
sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name'), primary_key=True,
comment='the name of the workflow this template belongs to'),
comment='Templates associated with workflows')
op.create_table(
"workflow_templates",
sa.Column(
"filename",
sa.String,
primary_key=True,
comment="the filename of the template",
),
sa.Column(
"content",
sa.LargeBinary,
nullable=False,
comment="the content of the template",
),
sa.Column(
"workflow_name",
sa.String,
sa.ForeignKey("workflows.workflow_name"),
primary_key=True,
comment="the name of the workflow this template belongs to",
),
comment="Templates associated with workflows",
)
op.create_table('workflow_requests',
sa.Column('workflow_request_id', sa.Integer, primary_key=True, autoincrement=True,
comment='the unique id of the request. auto-generated'),
sa.Column('workflow_name', sa.String, sa.ForeignKey('workflows.workflow_name')),
sa.Column('argument', sa.JSON, comment='the argument(s) used for the workflow in this request.'),
sa.Column('state', sa.String, comment='the current state of the workflow in this request.'),
sa.Column('execution_id', sa.Integer,
comment='the id of the parent execution awaiting the workflow'),
comment='A listing of requests for workflow execution.')
op.create_table(
"workflow_requests",
sa.Column(
"workflow_request_id",
sa.Integer,
primary_key=True,
autoincrement=True,
comment="the unique id of the request. auto-generated",
),
sa.Column("workflow_name", sa.String, sa.ForeignKey("workflows.workflow_name")),
sa.Column(
"argument",
sa.JSON,
comment="the argument(s) used for the workflow in this request.",
),
sa.Column(
"state",
sa.String,
comment="the current state of the workflow in this request.",
),
comment="A listing of requests for workflow execution.",
)
op.create_table('workflow_request_files',
sa.Column('workflow_request_id', sa.Integer, sa.ForeignKey('workflow_requests.workflow_request_id'),
comment='the id of the workflow request.'),
sa.Column('filename', sa.String, comment='the name of this file', nullable=False),
sa.Column('content', sa.LargeBinary, comment='the contents of the file', nullable=False),
comment='A many-to-many mapping table tracking which files were used for workflow requests.')
op.create_table(
"workflow_request_files",
sa.Column(
"workflow_request_id",
sa.Integer,
sa.ForeignKey("workflow_requests.workflow_request_id"),
comment="the id of the workflow request.",
),
sa.Column(
"filename", sa.String, comment="the name of this file", nullable=False
),
sa.Column(
"content",
sa.LargeBinary,
comment="the contents of the file",
nullable=False,
),
comment="A man-to-many mapping table tracking which files were used for workflow requests.",
)
op.create_unique_constraint('workflow_request_filenames_uniq', 'workflow_request_files',
['workflow_request_id', 'filename'])
op.create_unique_constraint(
"workflow_request_filenames_uniq",
"workflow_request_files",
["workflow_request_id", "filename"],
)
# populate with some initial data
op.execute("INSERT INTO workflows (workflow_name) VALUES ('null')")
op.execute("INSERT INTO workflow_templates (workflow_name, filename, content) "
"VALUES ('null', 'null.condor', E'executable = null.sh\narguments = \"{{arguments}}\""
"\nerror = null.err\nlog = condor.log\nqueue')")
op.execute("INSERT INTO workflow_templates (workflow_name, filename, content) VALUES "
"('null', 'null.sh', E'#!/bin/sh\n\nnull $*')")
op.execute("INSERT INTO workflow_templates (workflow_name, filename, content) VALUES "
"('null', 'null.dag', 'JOB null null.condor')")
op.execute(
"INSERT INTO workflow_templates (workflow_name, filename, content) "
"VALUES ('null', 'null.condor', E'executable = null.sh\narguments = \"{{arguments}}\""
"\nerror = null.err\nlog = condor.log\nqueue')"
)
op.execute(
"INSERT INTO workflow_templates (workflow_name, filename, content) VALUES "
"('null', 'null.sh', E'#!/bin/sh\n\nnull $*')"
)
op.execute(
"INSERT INTO workflow_templates (workflow_name, filename, content) VALUES "
"('null', 'null.dag', 'JOB null null.condor')"
)
def downgrade():
op.drop_table('workflow_request_files')
op.drop_table('workflow_requests')
op.drop_table('workflow_templates')
op.drop_table('workflows')
op.drop_table("workflow_request_files")
op.drop_table("workflow_requests")
op.drop_table("workflow_templates")
op.drop_table("workflows")
......@@ -10,52 +10,74 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '57c38b5f012e'
down_revision = '44d5bbbf2615'
revision = "57c38b5f012e"
down_revision = "44d5bbbf2615"
branch_labels = None
depends_on = None
def upgrade():
print('creating capabilities')
op.create_table('capabilities',
sa.Column('capability_id', sa.Integer, primary_key=True),
sa.Column('capability_name', sa.String),
sa.Column('capability_steps', sa.String),
sa.Column('max_jobs', sa.Integer))
op.create_table('capability_requests',
sa.Column('capability_request_id', sa.Integer, primary_key=True),
sa.Column('state', sa.String),
sa.Column('capability_id', sa.Integer, sa.ForeignKey('capabilities.capability_id')),
sa.Column('parameters', sa.String))
op.create_table('capability_versions',
sa.Column('capability_request_id',
sa.Integer,
sa.ForeignKey('capability_requests.capability_request_id'),
primary_key=True),
sa.Column('version_number',
sa.Integer,
primary_key=True),
sa.Column('parameters', sa.String))
op.create_table('capability_executions',
sa.Column('execution_id', sa.Integer, primary_key=True),
sa.Column('state', sa.String),
sa.Column('capability_request_id', sa.Integer),
sa.Column('capability_version_number', sa.Integer),
sa.Column('current_step', sa.Integer),
sa.Column('steps', sa.String),
sa.ForeignKeyConstraint(['capability_request_id', 'capability_version_number'],
['capability_versions.capability_request_id', 'capability_versions.version_number']))
op.execute("INSERT INTO capabilities (capability_name, capability_steps, max_jobs) "
"VALUES ('null', 'prepare-and-run-workflow null\nawait-parameter qa-status', 2)")
print("creating capabilities")
op.create_table(
"capabilities",
sa.Column("capability_id", sa.Integer, primary_key=True),
sa.Column("capability_name", sa.String),
sa.Column("capability_steps", sa.String),
sa.Column("max_jobs", sa.Integer),
)
op.create_table(
"capability_requests",
sa.Column("capability_request_id", sa.Integer, primary_key=True),
sa.Column("state", sa.String),
sa.Column(
"capability_id", sa.Integer, sa.ForeignKey("capabilities.capability_id")
),
sa.Column("parameters", sa.String),
)
op.create_table(
"capability_versions",
sa.Column(
"capability_request_id",
sa.Integer,
sa.ForeignKey("capability_requests.capability_request_id"),
primary_key=True,
),
sa.Column("version_number", sa.Integer, primary_key=True),
sa.Column("parameters", sa.String),
)
op.create_table(
"capability_executions",
sa.Column("execution_id", sa.Integer, primary_key=True),
sa.Column("state", sa.String),
sa.Column("capability_request_id", sa.Integer),
sa.Column("capability_version_number", sa.Integer),
sa.Column("current_step", sa.Integer),
sa.Column("steps", sa.String),
sa.Column(
"current_workflow_request_id",
sa.Integer,
sa.ForeignKey("workflow_requests.workflow_request_id"),
),
sa.ForeignKeyConstraint(
["capability_request_id", "capability_version_number"],
[
"capability_versions.capability_request_id",
"capability_versions.version_number",
],
),
)
op.execute(
"INSERT INTO capabilities (capability_name, capability_steps, max_jobs) "
"VALUES ('null', 'prepare-and-run-workflow null\nawait-parameter qa-status', 2)"
)
def downgrade():
op.drop_table('capability_executions')
op.drop_table('capability_versions')
op.drop_table('capability_requests')
op.drop_table('capabilities')
op.drop_table("capability_executions")
op.drop_table("capability_versions")
op.drop_table("capability_requests")
op.drop_table("capabilities")
......@@ -96,8 +96,7 @@ class WorkflowRequestRestService:
request = self.request.info.create_workflow_request(
self.request.context, self.request.GET.getall("args")
)
return request\
return request
@view_config(request_method="POST", route_name="create_execution_workflow_request")
def create_execution_workflow_request(self):
......@@ -111,7 +110,9 @@ class WorkflowRequestRestService:
# hand it to WorkflowInfo to save it, but we're still conflating
# workflows and workflow requests right now
request = self.request.info.create_workflow_request(
self.request.context, self.request.GET.getall("args"), self.request.matchdict['id']
self.request.context,
self.request.GET.getall("args"),
self.request.matchdict["id"],
)
return request
......
......@@ -6,6 +6,8 @@ import inspect
from abc import ABC, abstractmethod
from typing import List, Union, Optional, Any, Type
from workspaces.workflow_interfaces import WorkflowRequestIF
from .product_interfaces import ProductIF, FutureProductIF
ProductLocator = str
......@@ -46,6 +48,8 @@ class CapabilityExecutionIF:
capability_version: CapabilityVersionIF
state: str
current_step: int
version: CapabilityVersionIF
current_workflow_request: WorkflowRequestIF
def __json__(self, request: Any) -> dict:
raise NotImplementedError
......
......@@ -312,6 +312,12 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
current_step = sa.Column("current_step", sa.Integer)
steps = sa.Column("steps", sa.String)
version = relationship(CapabilityVersion, back_populates="executions")
current_workflow_request_id = sa.Column(
"current_workflow_request_id",
sa.Integer,
sa.ForeignKey("workflow_requests.workflow_request_id"),
)
current_workflow_request = relationship("WorkflowRequest")
__table_args__ = (
sa.ForeignKeyConstraint(
......@@ -327,6 +333,7 @@ class CapabilityExecution(Base, CapabilityExecutionIF):
capability_request_id=self.capability_request_id,
version_number=self.version_number,
current_step=self.current_step,
current_workflow_request_id=self.current_workflow_request_id,
)
@property
......@@ -431,9 +438,6 @@ class WorkflowRequest(Base, WorkflowRequestIF):
)
argument = sa.Column("argument", sa.JSON)
state = sa.Column("state", sa.String)
execution_id = sa.Column(
"execution_id", sa.Integer, sa.ForeignKey("capability_executions.execution_id")
)
files = relationship("WorkflowRequestFile", backref="request")
@property
......@@ -460,7 +464,6 @@ class WorkflowRequest(Base, WorkflowRequestIF):
workflow_name=self.workflow_name,
argument=self.argument,
state=self.state,
execution_id=self.execution_id,
files=self.files,
)
......
......@@ -13,7 +13,9 @@ from typing import Dict, List, Optional, NamedTuple
from channels.amqp_helpers import (
workflow_events,
capability_events, Channel, WorkflowEventChannel,
capability_events,
Channel,
WorkflowEventChannel,
)
from sqlalchemy.orm import Session
from wf_monitor.monitor import (
......@@ -189,10 +191,9 @@ class CapabilityService(CapabilityServiceIF):
# No corresponding capability event
return None
workflow_request = self.workflow_info.lookup_workflow_request(
self.capability_info.lookup_execution_by_workflow_request_id(
event.workflow_request_id
)
execution_id = workflow_request.execution_id
return CapabilityEvent(event_type, execution_id)
......@@ -286,6 +287,13 @@ class CapabilityInfo(CapabilityInfoIF):
self.session.add(entity)
self.session.flush()
def lookup_execution_by_workflow_request_id(self, workflow_request_id: int):
return (
self.session.query(CapabilityExecution)
.filter_by(current_workflow_request_id=workflow_request_id)  # column added to capability_executions in this change
.one()
)
class CapabilityEngine(CapabilityEngineIF):
"""
......@@ -602,7 +610,7 @@ class WorkflowInfo(WorkflowInfoIF):
return self.session.query(Workflow).all()
def create_workflow_request(
self, workflow: Workflow, argument: Dict, execution_id: int = None
self, workflow: Workflow, argument: Dict
) -> WorkflowRequest:
"""
Create new workflow request and save to database
......@@ -615,7 +623,6 @@ class WorkflowInfo(WorkflowInfoIF):
workflow_name=workflow.workflow_name,
argument=argument,
state=WorkflowRequestState.Created.name,
execution_id=execution_id,
)
self.save_request(request)
return request
......
......@@ -4,14 +4,17 @@ from workspaces.services import WorkflowInfo, CapabilityInfo
from workspaces.workflow_interfaces import WorkflowInfoIF
SESSION = create_session("SDM")
def get_workflow_info() -> WorkflowInfoIF:
"""Return a WorkflowInfo based on a real database connection"""
return WorkflowInfo(create_session("SDM"))
return WorkflowInfo(SESSION)
def get_capability_info() -> CapabilityInfoIF:
"""Return a CapabilityInfo based on a real database connection"""
return CapabilityInfo(create_session("SDM"))
return CapabilityInfo(SESSION)
def test_persisting_workflow_request():
......@@ -34,3 +37,15 @@ def test_reading_capability():
info = get_capability_info()
null = info.lookup_capability("null")
assert null is not None
def test_capability_associated_to_workflow():
cinfo = get_capability_info()
winfo = get_workflow_info()
req = cinfo.create_capability_request("null", {})
exec = cinfo.create_execution(req)
workflow_req = winfo.create_workflow_request(
winfo.lookup_workflow_definition("null"), {}
)
exec.current_workflow_request = workflow_req
SESSION.add(exec)
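The new test ends after attaching the execution to the shared session; as a hedged follow-up (the flush, lookup, and assertion below are illustrative additions, not part of this commit), the association could be verified like this:

    # Sketch only: after a flush, the FK column should mirror the assigned relationship.
    SESSION.flush()
    fetched = cinfo.lookup_execution_by_workflow_request_id(
        workflow_req.workflow_request_id
    )
    assert fetched.current_workflow_request_id == workflow_req.workflow_request_id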