Skip to content
Snippets Groups Projects
Commit ecee256b authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

fix ingestion message catching for autocalibration

parent a555624a
No related branches found
No related tags found
1 merge request!1042fix ingestion message catching for autocalibration
Pipeline #6462 failed
......@@ -26,7 +26,6 @@ edu.nrao.workspaces.ProcessingSettings.useCasa = false
edu.nrao.workspaces.ProcessingSettings.rootDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/spool
edu.nrao.workspaces.ProcessingSettings.scriptLocation = /lustre/aoc/cluster/pipeline/docker/workspaces/sbin
edu.nrao.workspaces.ProcessingSettings.ramInGb = 0.2G
edu.nrao.workspaces.ProcessingSettings.autoGenerateStandardCals = False
edu.nrao.workspaces.ProcessingSettings.CasaVersion.vlassSeci = /home/casa/packages/pipeline/casa-6.1.3-3-pipeline-2021.1.1.32
edu.nrao.archive.workflow.config.CasaVersions.homeForReprocessing = /home/casa/packages/pipeline/current
......
......@@ -185,7 +185,7 @@ class ArchiveService(ArchiveServiceIF):
self.router = Router("archive")
self.router.register(self)
@on_message(message="Ingestion complete")
@on_message(message="ingestion complete")
def on_ingestion_complete(self, **message: Dict):
"""
Accept an Ingestion Complete message from the archive
......@@ -194,20 +194,27 @@ class ArchiveService(ArchiveServiceIF):
"""
log_data = message["logData"]
routing_key = message["routing_key"]
sdm_id = log_data["fileset_id"]
sb_type = log_data["schedblock_type"]
telescope = log_data["telescope"]
if sb_type != "OBSERVER" or telescope != "EVLA" or "ingestion-complete.rawdata" not in routing_key:
# not a candidate for std_calibration
return None
# log message
logger.info(f"Archive Ingestion Complete Event: {log_data}")
sdm_id = log_data["fileset_id"]
if sdm_id.startswith("VLASS") or sdm_id.startswith("TSKY"):
# Do not submit VLASS or TSKY execution blocks for calibration
logger.info(f"Caught Ingestion Event was for VLASS or TSKY - ignoring.")
return None
self.create_and_submit_from_sdm_id(message)
self.create_and_submit_from_sdm_id(log_data)
def create_and_submit_from_sdm_id(self, info: Dict):
log_data = info["logData"]
def create_and_submit_from_sdm_id(self, log_data: Dict):
sdm_id = log_data["fileset_id"]
execblock_id = log_data.get("execblock_id", "")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment