diff --git a/schema/versions/68d0883785b7_add_delivery_results_to_capability_.py b/schema/versions/68d0883785b7_add_delivery_results_to_capability_.py new file mode 100644 index 0000000000000000000000000000000000000000..ccb85d865dcecf81b76115931dd85ebf69f2cef9 --- /dev/null +++ b/schema/versions/68d0883785b7_add_delivery_results_to_capability_.py @@ -0,0 +1,32 @@ +"""add-delivery-results-to-capability-execution + +Revision ID: 68d0883785b7 +Revises: cb49c557f7e8 +Create Date: 2021-04-12 10:30:16.482620 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "68d0883785b7" +down_revision = "cb49c557f7e8" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "capability_executions", + sa.Column("delivery_url", sa.String, comment="URL to the results, if possible"), + ) + op.add_column( + "capability_executions", + sa.Column("delivery_path", sa.String, comment="Path to the results on disk"), + ) + + +def downgrade(): + op.drop_column("capability_executions", "delivery_url") + op.drop_column("capability_executions", "delivery_path") diff --git a/shared/messaging/messaging/messenger.py b/shared/messaging/messaging/messenger.py index cb0b22aba5efc904cec5b756fa905f4ade9e0b8c..4b3527d19dc6453a29ebb6146304c46e71daf8fe 100644 --- a/shared/messaging/messaging/messenger.py +++ b/shared/messaging/messaging/messenger.py @@ -20,6 +20,10 @@ The messenger object is also responsible for message-to-event logger = logging.getLogger() +class MessageFormatException(Exception): + pass + + class AMQPServerInitializer: """ Object responsible for creating a connection to the correct AMQP server and establishing exchanges and queues for @@ -153,7 +157,12 @@ class Messenger(ConsumerProducerMixin): :param message: Body of AMQP message """ if "routing_key" not in message.keys(): - logger.warning("Message has no routing key.") + if "service" in message.keys(): + message["routing_key"] = message["service"] + else: + raise MessageFormatException( + "Message has no routing key. Keys are: " + ", ".join(message.keys()) + ) self.producer.publish( message, diff --git a/shared/workspaces/workspaces/capability/schema.py b/shared/workspaces/workspaces/capability/schema.py index 6a66b58d704c390cfe2103f84971a0688d572128..8338b0ab7bf94b53463c5f894cdf891334bd9dd7 100644 --- a/shared/workspaces/workspaces/capability/schema.py +++ b/shared/workspaces/workspaces/capability/schema.py @@ -1,6 +1,7 @@ from __future__ import annotations import datetime +import pathlib from typing import Tuple import pendulum @@ -204,6 +205,19 @@ class CapabilityVersion(Base, CapabilityVersionIF): return self.request.capability +class SaPath(sa.types.TypeDecorator): + impl = sa.types.String + + def process_bind_param(self, value, dialect): + return str(value) + + def process_result_value(self, value, dialect): + return pathlib.Path(value) + + def copy(self, **kw): + return SaPath(self.impl.length) + + class CapabilityExecution(Base, CapabilityExecutionIF, JSONSerializable): """ Schema representation of a capability request's execution record @@ -223,6 +237,8 @@ class CapabilityExecution(Base, CapabilityExecutionIF, JSONSerializable): steps = sa.Column("steps", sa.String) version = relationship(CapabilityVersion, back_populates="executions") current_workflow_request_id = sa.Column("current_workflow_request_id", sa.Integer) + delivery_url = sa.Column("delivery_url", sa.String) + delivery_path = sa.Column("delivery_path", SaPath) created_at = sa.Column( "created_at", diff --git a/shared/workspaces/workspaces/capability/schema_interfaces.py b/shared/workspaces/workspaces/capability/schema_interfaces.py index c5e9b17c61ec4a7fde9794292cb5e0e77e31372b..edfd865351c78951335181387213ab79f0c53495 100644 --- a/shared/workspaces/workspaces/capability/schema_interfaces.py +++ b/shared/workspaces/workspaces/capability/schema_interfaces.py @@ -1,11 +1,11 @@ from __future__ import annotations -from typing import Dict, List +import pathlib +from typing import List from workspaces.capability.helpers_interfaces import CapabilityStepIF, ParameterIF from workspaces.products.schema_interfaces import FutureProductIF from workspaces.system.schema import JSONSerializable -from workspaces.workflow.schema_interfaces import WorkflowRequestIF class CapabilityIF(JSONSerializable): @@ -45,6 +45,8 @@ class CapabilityExecutionIF: steps: str capability: CapabilityIF capability_request: CapabilityRequestIF + delivery_url: str + delivery_path: pathlib.Path def on_last_step(self) -> bool: raise NotImplementedError diff --git a/shared/workspaces/workspaces/capability/services/capability_service.py b/shared/workspaces/workspaces/capability/services/capability_service.py index 98adaa2cd368ba17031e09305196ce9916b146bc..1ec113506bab3e65a5034f947982197a7953ea42 100644 --- a/shared/workspaces/workspaces/capability/services/capability_service.py +++ b/shared/workspaces/workspaces/capability/services/capability_service.py @@ -27,10 +27,12 @@ class CapabilityService(CapabilityServiceIF): The capability service: clients access this to request capability runs """ - def __init__(self, - capability_info: CapabilityInfoIF, - workflow_service: WorkflowServiceIF, - notification_service: NotificationServiceIF): + def __init__( + self, + capability_info: CapabilityInfoIF, + workflow_service: WorkflowServiceIF, + notification_service: NotificationServiceIF, + ): self.message_router = Router("capability") self.message_router.register(self) self.execution_manager = ExecutionManager( @@ -91,6 +93,29 @@ class CapabilityService(CapabilityServiceIF): } self.message_router.send_message(**capability_complete_msg) + @on_message(service="workflow", type="delivery") + def on_delivery(self, **message: Dict): + """ + Catch a delivery notification and update the locations in the + referenced execution based on it + + :param message: a delivery-type notification with a workflow request + subject and a delivery field + """ + logger.info("Received delivery notification: %s", message) + + # retrieve the delivery structure in the message + delivery = message["delivery"] + + # we caught a delivery event, find the associated capability execution + execution = self.capability_info.lookup_execution_by_workflow_request_id( + message["subject"]["workflow_request_id"] + ) + + # update some fields on the execution, if possible + execution.delivery_url = delivery["url"] if "url" in delivery else None + execution.delivery_path = delivery["delivered_to"] if "delivered_to" in delivery else None + @on_message(type="capability-submitted") def notify_submitted(self, **message: Dict): subject = message["subject"]