From ecee256b42de0a6f2495b6c03f4f099572abcba2 Mon Sep 17 00:00:00 2001 From: chausman <chausman@nrao.edu> Date: Fri, 26 Aug 2022 09:43:52 -0600 Subject: [PATCH] fix ingestion message catching for autocalibration --- docker.properties | 1 - .../system/services/archive_service.py | 17 ++++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/docker.properties b/docker.properties index bd38ba4f8..788bc621c 100644 --- a/docker.properties +++ b/docker.properties @@ -26,7 +26,6 @@ edu.nrao.workspaces.ProcessingSettings.useCasa = false edu.nrao.workspaces.ProcessingSettings.rootDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/spool edu.nrao.workspaces.ProcessingSettings.scriptLocation = /lustre/aoc/cluster/pipeline/docker/workspaces/sbin edu.nrao.workspaces.ProcessingSettings.ramInGb = 0.2G -edu.nrao.workspaces.ProcessingSettings.autoGenerateStandardCals = False edu.nrao.workspaces.ProcessingSettings.CasaVersion.vlassSeci = /home/casa/packages/pipeline/casa-6.1.3-3-pipeline-2021.1.1.32 edu.nrao.archive.workflow.config.CasaVersions.homeForReprocessing = /home/casa/packages/pipeline/current diff --git a/shared/workspaces/workspaces/system/services/archive_service.py b/shared/workspaces/workspaces/system/services/archive_service.py index fd28aad1a..6f744d1ff 100644 --- a/shared/workspaces/workspaces/system/services/archive_service.py +++ b/shared/workspaces/workspaces/system/services/archive_service.py @@ -185,7 +185,7 @@ class ArchiveService(ArchiveServiceIF): self.router = Router("archive") self.router.register(self) - @on_message(message="Ingestion complete") + @on_message(message="ingestion complete") def on_ingestion_complete(self, **message: Dict): """ Accept an Ingestion Complete message from the archive @@ -194,20 +194,27 @@ class ArchiveService(ArchiveServiceIF): """ log_data = message["logData"] + routing_key = message["routing_key"] + + sdm_id = log_data["fileset_id"] + sb_type = log_data["schedblock_type"] + telescope = log_data["telescope"] + + if sb_type != "OBSERVER" or telescope != "EVLA" or "ingestion-complete.rawdata" not in routing_key: + # not a candidate for std_calibration + return None # log message logger.info(f"Archive Ingestion Complete Event: {log_data}") - sdm_id = log_data["fileset_id"] if sdm_id.startswith("VLASS") or sdm_id.startswith("TSKY"): # Do not submit VLASS or TSKY execution blocks for calibration logger.info(f"Caught Ingestion Event was for VLASS or TSKY - ignoring.") return None - self.create_and_submit_from_sdm_id(message) + self.create_and_submit_from_sdm_id(log_data) - def create_and_submit_from_sdm_id(self, info: Dict): - log_data = info["logData"] + def create_and_submit_from_sdm_id(self, log_data: Dict): sdm_id = log_data["fileset_id"] execblock_id = log_data.get("execblock_id", "") -- GitLab