Skip to content
Snippets Groups Projects
Commit 4a5d5ef8 authored by Daniel Nemergut's avatar Daniel Nemergut
Browse files

Sending an ingestion-complete.vlba message to AAT when an obs ingestion was successful

parent 4857f079
No related branches found
No related tags found
1 merge request!1476Fixing post-ingestion message error
Pipeline #12228 failed
...@@ -178,6 +178,26 @@ def send_aat_image_message(parameters: dict, ingest_type: Union[IngestType, VLAS ...@@ -178,6 +178,26 @@ def send_aat_image_message(parameters: dict, ingest_type: Union[IngestType, VLAS
) )
def send_aat_obs_message(parameters: dict):
"""
Send AMQP message to AAT to set off observation (VLBA) indexing
:param parameters: dictionary of available input parameters
:return:
"""
url = parameters["workflowUrl"] + f"/workflows/requests/message-archive/obs-ingestion-complete"
payload = {"project_code": parameters["project"]}
response = requests.post(url, json=payload)
if response.status_code != http.HTTPStatus.OK:
logger.warning(
f"WARNING: Failed to send observation ingestion complete to archive for observation associated with "
f"project {parameters['project']}."
f" Please set off the index manually!"
)
def _determine_ingestion_info( def _determine_ingestion_info(
args: argparse.Namespace, args: argparse.Namespace,
) -> Tuple[Union[IngestType, VLASSIngestType], dict, IngestLauncherIF]: ) -> Tuple[Union[IngestType, VLASSIngestType], dict, IngestLauncherIF]:
...@@ -228,6 +248,8 @@ def main(): ...@@ -228,6 +248,8 @@ def main():
logger.info("Ingestion finished successfully! Sending status message to archive...") logger.info("Ingestion finished successfully! Sending status message to archive...")
if arg_type == IngestType.CAL: if arg_type == IngestType.CAL:
set_cal_status_flag(parameters) set_cal_status_flag(parameters)
elif arg_type == IngestType.OBS:
send_aat_obs_message(parameters)
else: else:
send_aat_image_message(parameters, arg_type) send_aat_image_message(parameters, arg_type)
else: else:
......
...@@ -318,6 +318,13 @@ class WorkflowService(WorkflowServiceIF): ...@@ -318,6 +318,13 @@ class WorkflowService(WorkflowServiceIF):
elif msg_type == "img-ingestion-complete": elif msg_type == "img-ingestion-complete":
logger.info(f"Sending 'Image Ingestion Complete' indexing message to AAT for project {project_code}!") logger.info(f"Sending 'Image Ingestion Complete' indexing message to AAT for project {project_code}!")
msg_arch = ArchiveMessageArchitect(routing_key="ingestor.images", request=None, project_code=project_code) msg_arch = ArchiveMessageArchitect(routing_key="ingestor.images", request=None, project_code=project_code)
elif msg_type == "obs-ingestion-complete":
logger.info(f"Sending 'Observation Ingestion Complete' indexing message to AAT for project {project_code}!")
msg_arch = ArchiveMessageArchitect(
routing_key="ingestion-complete.vlba",
request=None,
project_code=project_code
)
else: else:
logger.info(f"Cannot determine archive message for type {msg_type}. Skipping....") logger.info(f"Cannot determine archive message for type {msg_type}. Skipping....")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment