Skip to content
Snippets Groups Projects
Commit 778b9aab authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Save the delivery URL and path in the capability layer

parent ffcc0f03
No related branches found
No related tags found
1 merge request!183Save the delivery URL and path in the capability layer
Pipeline #1265 passed
"""add-delivery-results-to-capability-execution
Revision ID: 68d0883785b7
Revises: cb49c557f7e8
Create Date: 2021-04-12 10:30:16.482620
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "68d0883785b7"
down_revision = "cb49c557f7e8"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"capability_executions",
sa.Column("delivery_url", sa.String, comment="URL to the results, if possible"),
)
op.add_column(
"capability_executions",
sa.Column("delivery_path", sa.String, comment="Path to the results on disk"),
)
def downgrade():
op.drop_column("capability_executions", "delivery_url")
op.drop_column("capability_executions", "delivery_path")
......@@ -20,6 +20,10 @@ The messenger object is also responsible for message-to-event
logger = logging.getLogger()
class MessageFormatException(Exception):
pass
class AMQPServerInitializer:
"""
Object responsible for creating a connection to the correct AMQP server and establishing exchanges and queues for
......@@ -153,7 +157,12 @@ class Messenger(ConsumerProducerMixin):
:param message: Body of AMQP message
"""
if "routing_key" not in message.keys():
logger.warning("Message has no routing key.")
if "service" in message.keys():
message["routing_key"] = message["service"]
else:
raise MessageFormatException(
"Message has no routing key. Keys are: " + ", ".join(message.keys())
)
self.producer.publish(
message,
......
from __future__ import annotations
import datetime
import pathlib
from typing import Tuple
import pendulum
......@@ -204,6 +205,19 @@ class CapabilityVersion(Base, CapabilityVersionIF):
return self.request.capability
class SaPath(sa.types.TypeDecorator):
impl = sa.types.String
def process_bind_param(self, value, dialect):
return str(value)
def process_result_value(self, value, dialect):
return pathlib.Path(value)
def copy(self, **kw):
return SaPath(self.impl.length)
class CapabilityExecution(Base, CapabilityExecutionIF, JSONSerializable):
"""
Schema representation of a capability request's execution record
......@@ -223,6 +237,8 @@ class CapabilityExecution(Base, CapabilityExecutionIF, JSONSerializable):
steps = sa.Column("steps", sa.String)
version = relationship(CapabilityVersion, back_populates="executions")
current_workflow_request_id = sa.Column("current_workflow_request_id", sa.Integer)
delivery_url = sa.Column("delivery_url", sa.String)
delivery_path = sa.Column("delivery_path", SaPath)
created_at = sa.Column(
"created_at",
......
from __future__ import annotations
from typing import Dict, List
import pathlib
from typing import List
from workspaces.capability.helpers_interfaces import CapabilityStepIF, ParameterIF
from workspaces.products.schema_interfaces import FutureProductIF
from workspaces.system.schema import JSONSerializable
from workspaces.workflow.schema_interfaces import WorkflowRequestIF
class CapabilityIF(JSONSerializable):
......@@ -45,6 +45,8 @@ class CapabilityExecutionIF:
steps: str
capability: CapabilityIF
capability_request: CapabilityRequestIF
delivery_url: str
delivery_path: pathlib.Path
def on_last_step(self) -> bool:
raise NotImplementedError
......
......@@ -27,10 +27,12 @@ class CapabilityService(CapabilityServiceIF):
The capability service: clients access this to request capability runs
"""
def __init__(self,
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
notification_service: NotificationServiceIF):
def __init__(
self,
capability_info: CapabilityInfoIF,
workflow_service: WorkflowServiceIF,
notification_service: NotificationServiceIF,
):
self.message_router = Router("capability")
self.message_router.register(self)
self.execution_manager = ExecutionManager(
......@@ -91,6 +93,29 @@ class CapabilityService(CapabilityServiceIF):
}
self.message_router.send_message(**capability_complete_msg)
@on_message(service="workflow", type="delivery")
def on_delivery(self, **message: Dict):
"""
Catch a delivery notification and update the locations in the
referenced execution based on it
:param message: a delivery-type notification with a workflow request
subject and a delivery field
"""
logger.info("Received delivery notification: %s", message)
# retrieve the delivery structure in the message
delivery = message["delivery"]
# we caught a delivery event, find the associated capability execution
execution = self.capability_info.lookup_execution_by_workflow_request_id(
message["subject"]["workflow_request_id"]
)
# update some fields on the execution, if possible
execution.delivery_url = delivery["url"] if "url" in delivery else None
execution.delivery_path = delivery["delivered_to"] if "delivered_to" in delivery else None
@on_message(type="capability-submitted")
def notify_submitted(self, **message: Dict):
subject = message["subject"]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment