Skip to content
Snippets Groups Projects
Commit 9c7db06b authored by Daniel Nemergut's avatar Daniel Nemergut
Browse files

Merge branch 'ingest_obs_aat_message' into 'main'

Fixing post-ingestion message error

See merge request !1476
parents 4857f079 ba0f323c
No related branches found
No related tags found
1 merge request!1476Fixing post-ingestion message error
Pipeline #12231 passed
...@@ -178,6 +178,26 @@ def send_aat_image_message(parameters: dict, ingest_type: Union[IngestType, VLAS ...@@ -178,6 +178,26 @@ def send_aat_image_message(parameters: dict, ingest_type: Union[IngestType, VLAS
) )
def send_aat_obs_message(parameters: dict):
"""
Send AMQP message to AAT to set off observation (VLBA) indexing
:param parameters: dictionary of available input parameters
:return:
"""
url = parameters["workflowUrl"] + f"/workflows/requests/message-archive/obs-ingestion-complete"
payload = {"project_code": parameters["project"]}
response = requests.post(url, json=payload)
if response.status_code != http.HTTPStatus.OK:
logger.warning(
f"WARNING: Failed to send observation ingestion complete to archive for observation associated with "
f"project {parameters['project']}."
f" Please set off the index manually!"
)
def _determine_ingestion_info( def _determine_ingestion_info(
args: argparse.Namespace, args: argparse.Namespace,
) -> Tuple[Union[IngestType, VLASSIngestType], dict, IngestLauncherIF]: ) -> Tuple[Union[IngestType, VLASSIngestType], dict, IngestLauncherIF]:
...@@ -228,6 +248,8 @@ def main(): ...@@ -228,6 +248,8 @@ def main():
logger.info("Ingestion finished successfully! Sending status message to archive...") logger.info("Ingestion finished successfully! Sending status message to archive...")
if arg_type == IngestType.CAL: if arg_type == IngestType.CAL:
set_cal_status_flag(parameters) set_cal_status_flag(parameters)
elif arg_type == IngestType.OBS:
send_aat_obs_message(parameters)
else: else:
send_aat_image_message(parameters, arg_type) send_aat_image_message(parameters, arg_type)
else: else:
......
...@@ -120,6 +120,16 @@ archive_msg_templates = DictView( ...@@ -120,6 +120,16 @@ archive_msg_templates = DictView(
"status": "complete", "status": "complete",
}, },
}, },
"obs_ingestion_complete": {
"application": "ingestor",
"message": "Observation Ingestion Complete",
"request": "observation",
"logData": {
"project_code": None,
"ingestion_type": "vlba",
"status": "complete",
},
},
"download_complete": { "download_complete": {
"application": "workflow", "application": "workflow",
"download_url": None, "download_url": None,
...@@ -281,7 +291,7 @@ class ArchiveMessageArchitect(MessageArchitectIF): ...@@ -281,7 +291,7 @@ class ArchiveMessageArchitect(MessageArchitectIF):
# image indexing is based on project not request id, use different subject # image indexing is based on project not request id, use different subject
template["subject"] = ( template["subject"] = (
f"WS workflow triggering indexer for {self.project}" f"WS workflow triggering indexer for {self.project}"
if self.key == "ingestor.images" if self.key in ["ingestor.images", "ingestion-complete.vlba"]
else f"WS workflow request #{self.request.workflow_request_id}" else f"WS workflow request #{self.request.workflow_request_id}"
) )
template["routing_key"] = self.key template["routing_key"] = self.key
......
...@@ -318,6 +318,13 @@ class WorkflowService(WorkflowServiceIF): ...@@ -318,6 +318,13 @@ class WorkflowService(WorkflowServiceIF):
elif msg_type == "img-ingestion-complete": elif msg_type == "img-ingestion-complete":
logger.info(f"Sending 'Image Ingestion Complete' indexing message to AAT for project {project_code}!") logger.info(f"Sending 'Image Ingestion Complete' indexing message to AAT for project {project_code}!")
msg_arch = ArchiveMessageArchitect(routing_key="ingestor.images", request=None, project_code=project_code) msg_arch = ArchiveMessageArchitect(routing_key="ingestor.images", request=None, project_code=project_code)
elif msg_type == "obs-ingestion-complete":
logger.info(f"Sending 'Observation Ingestion Complete' indexing message to AAT for project {project_code}!")
msg_arch = ArchiveMessageArchitect(
routing_key="ingestion-complete.vlba",
request=None,
project_code=project_code
)
else: else:
logger.info(f"Cannot determine archive message for type {msg_type}. Skipping....") logger.info(f"Cannot determine archive message for type {msg_type}. Skipping....")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment