Skip to content
Snippets Groups Projects

fix ingestion message catching for autocalibration

Merged Charlotte Hausman requested to merge 2.5_ingestion_messaging into main
Files
2
@@ -185,7 +185,7 @@ class ArchiveService(ArchiveServiceIF):
self.router = Router("archive")
self.router.register(self)
@on_message(message="Ingestion complete")
@on_message(message="ingestion complete")
def on_ingestion_complete(self, **message: Dict):
"""
Accept an Ingestion Complete message from the archive
@@ -195,19 +195,25 @@ class ArchiveService(ArchiveServiceIF):
log_data = message["logData"]
sdm_id = log_data["fileset_id"]
sb_type = log_data["schedblock_type"]
telescope = log_data["telescope"]
if sb_type != "OBSERVER" or telescope != "EVLA":
# not a candidate for std_calibration
return None
# log message
logger.info(f"Archive Ingestion Complete Event: {log_data}")
sdm_id = log_data["fileset_id"]
if sdm_id.startswith("VLASS") or sdm_id.startswith("TSKY"):
# Do not submit VLASS or TSKY execution blocks for calibration
logger.info(f"Caught Ingestion Event was for VLASS or TSKY - ignoring.")
return None
self.create_and_submit_from_sdm_id(message)
self.create_and_submit_from_sdm_id(log_data)
def create_and_submit_from_sdm_id(self, info: Dict):
log_data = info["logData"]
def create_and_submit_from_sdm_id(self, log_data: Dict):
sdm_id = log_data["fileset_id"]
execblock_id = log_data.get("execblock_id", "")
Loading