diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py index b0b33f26959d6438bcc23c26fa278ff1a18867d5..4b3ad0beb15ea9a7a2d63917e4ff934d4537fbd1 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingest.py @@ -24,14 +24,15 @@ from distutils.util import strtobool from typing import Tuple, Union import requests -from ingest_envoy.interfaces import IngestLauncherIF +from ingest_envoy.interfaces import LauncherIF from ingest_envoy.launchers import ( IngestCalibrationLauncher, IngestImageLauncher, IngestObservationLauncher, + CuratorLauncher, ) from ingest_envoy.solicitor import Solicitor -from ingest_envoy.utilities import IngestType, VLASSIngestType +from ingest_envoy.utilities import IngestType, VLASSIngestType, CuratorType from pycapo import CapoConfig """ @@ -44,7 +45,10 @@ logger.addHandler(logging.StreamHandler(sys.stdout)) def _get_settings( - arg_type: Union[IngestType, VLASSIngestType], filename: str = None, cal_spl: str = None, source_dir: str = None + arg_type: Union[IngestType, VLASSIngestType, CuratorType], + filename: str = None, + cal_spl: str = None, + source_dir: str = None, ) -> dict: """ Determine needed settings from metadata.json file @@ -86,34 +90,57 @@ def arg_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser( description="Workspaces Ingestion System", formatter_class=argparse.RawTextHelpFormatter ) - parser.add_argument( + # ensure we can't try running all types of ingestion at the same time + group = parser.add_mutually_exclusive_group() + group.add_argument( "--calibration", nargs=1, action="store", required=False, help="run ingestion for a calibration product", ) - parser.add_argument( + group.add_argument( "--image", nargs=1, action="store", required=False, help="run ingestion for an image product", ) - parser.add_argument( + group.add_argument( "--observation", nargs=2, action="store", required=False, help="run ingestion for an observation", ) - parser.add_argument( + group.add_argument( "--seci", nargs=2, action="store", required=False, help="run ingestion for VLASS SECI image products", ) + # Add subparser for curator commands + subparsers = parser.add_subparsers( + title="Curator Commands", + description="Commands for setting curation types", + help="Options for Curator", + ) + curate_parser = subparsers.add_parser("curate") + curate_group = curate_parser.add_mutually_exclusive_group(required="True") + curate_group.add_argument( + "--partial", + nargs=1, + action="store", + help="run partial curation for a product", + ) + curate_group.add_argument( + "--full", + nargs=1, + action="store", + help="run full curation for a product", + ) + return parser @@ -125,84 +152,84 @@ def check_ingest_type(args_type: Union[IngestType, VLASSIngestType], parameters: :param parameters: the parameters determined from metadata.json :return: boolean representing if the requested ingestion type matches the product type """ - if "workflowName" not in parameters and args_type == IngestType.OBS: + + if "workflowName" not in parameters and (args_type == IngestType.OBS or isinstance(args_type, CuratorType)): return True wf_name = parameters["workflowName"] - if args_type.value in wf_name: - return True - elif args_type == IngestType.IMG and "imaging" in wf_name: + if args_type.value in wf_name or (args_type == IngestType.IMG and "imaging" in wf_name): return True return False -def set_cal_status_flag(parameters: dict): - """ - Send AMQP message to AAT to set calibration status on EB. - - :param parameters: dictionary of available input parameters - :return: - """ - request_id = parameters["requestId"] - - url = parameters["workflowUrl"] + f"/workflows/requests/message-archive/cal-ingestion-complete" - - response = requests.post(url, json={"request_id": request_id, "project_code": parameters["project"]}) - - if response.status_code != http.HTTPStatus.OK: - logger.warning( - f"WARNING: Failed to set calibration status flag in archive for request #{request_id}." - f" Please set the flag manually!" - ) - - -def send_aat_image_message(parameters: dict, ingest_type: Union[IngestType, VLASSIngestType]): - """ - Send AMQP message to AAT to set off image indexing - - :param parameters: dictionary of available input parameters - :param ingest_type: type of ingestion that was run - :return: - """ - url = parameters["workflowUrl"] + f"/workflows/requests/message-archive/img-ingestion-complete" - if isinstance(ingest_type, VLASSIngestType): - payload = {"project_code": parameters["project"]} - else: - payload = {"request_id": parameters["requestId"], "project_code": parameters["project"]} - - response = requests.post(url, json=payload) - - if response.status_code != http.HTTPStatus.OK: - logger.warning( - f"WARNING: Failed to send image ingestion complete to archive for image associated with " - f"calibration {parameters['calSpl']} and project {parameters['project']}." - f" Please set off the index manually!" - ) - - -def send_aat_obs_message(parameters: dict): +def send_indexer_message(parameters: dict, ingest_type: Union[IngestType, VLASSIngestType, CuratorType]): """ - Send AMQP message to AAT to set off observation (VLBA) indexing + Send AMQP message to AAT to set off indexing :param parameters: dictionary of available input parameters + :param ingest_type: type of ingestion or curation that was run :return: """ - url = parameters["workflowUrl"] + f"/workflows/requests/message-archive/obs-ingestion-complete" - payload = {"project_code": parameters["project"]} + msg_suffix = "-ingestion-complete" + + match ingest_type: + case IngestType.CAL: + msg = "cal" + msg_suffix + request_id = parameters["requestId"] + payload = {"request_id": request_id, "project_code": parameters["project"]} + warn_log = ( + f"Failed to send calibration ingestion complete to archive for setting calibration status " + f"flag and indexing of request #{request_id}." + ) + case IngestType.IMG: + msg = "img" + msg_suffix + payload = {"request_id": parameters["requestId"], "project_code": parameters["project"]} + warn_log = ( + f"Failed to send image ingestion complete to archive for image associated with " + f"calibration {parameters['calSpl']} and project {parameters['project']}." + ) + case IngestType.OBS: + msg = "obs" + msg_suffix + payload = { + "project_code": parameters["project"], + "telescope": parameters["telescope"], + } + warn_log = ( + f"Failed to send observation ingestion complete to archive for observation {parameters['filename']}." + ) + case VLASSIngestType.SECI | VLASSIngestType.QUICKLOOK | VLASSIngestType.CUBE: + msg = "img" + msg_suffix + payload = {"project_code": parameters["project"]} + warn_log = ( + f"Failed to send image ingestion complete to archive for image(s) associated with " + f"calibration {parameters['calSpl']} and project {parameters['project']}." + ) + case CuratorType.FULL | CuratorType.PARTIAL: + # TODO: figure out how to handle this with non-EB products + msg = "obs" + msg_suffix + telescope = parameters["telescope"] + payload = { + "project_code": parameters["project"], + "telescope": telescope, + } + warn_log = f"Failed to send {parameters['product_type']} curation complete to archive." + case _: + msg = "" + payload = {} + warn_log = "" + logger.error(f"Error: Unknown ingest type {ingest_type}, can't send index message to AAT.") + + url = parameters["workflowUrl"] + f"/workflows/requests/message-archive/{msg}" response = requests.post(url, json=payload) if response.status_code != http.HTTPStatus.OK: - logger.warning( - f"WARNING: Failed to send observation ingestion complete to archive for observation associated with " - f"project {parameters['project']}." - f" Please set off the index manually!" - ) + logger.warning(f"WARNING: {warn_log} Please set off the indexer manually") def _determine_ingestion_info( args: argparse.Namespace, -) -> Tuple[Union[IngestType, VLASSIngestType], dict, IngestLauncherIF]: +) -> Tuple[Union[IngestType, VLASSIngestType, CuratorType], dict, LauncherIF]: """ Determine parameters and launcher needed for ingestion @@ -216,7 +243,7 @@ def _determine_ingestion_info( if args.calibration is not None: arg_type = IngestType.CAL parameters = _get_settings(arg_type, filename=args.calibration[0]) - launcher = IngestCalibrationLauncher(parameters) + launcher = IngestCalibrationLauncher(arg_type, parameters) elif args.image is not None: arg_type = IngestType.IMG parameters = _get_settings(arg_type, filename=args.image[0]) @@ -225,10 +252,18 @@ def _determine_ingestion_info( arg_type = VLASSIngestType.SECI parameters = _get_settings(arg_type, cal_spl=args.seci[0], source_dir=args.seci[1]) launcher = IngestImageLauncher(arg_type, parameters) - if args.observation is not None: + elif args.observation is not None: arg_type = IngestType.OBS parameters = _get_settings(arg_type, filename=args.observation[0], source_dir=args.observation[1]) - launcher = IngestObservationLauncher(parameters) + launcher = IngestObservationLauncher(arg_type, parameters) + elif args.partial is not None: + arg_type = CuratorType.PARTIAL + parameters = _get_settings(arg_type, filename=args.partial[0]) + launcher = CuratorLauncher(arg_type, parameters) + elif args.full is not None: + arg_type = CuratorType.FULL + parameters = _get_settings(arg_type, filename=args.full[0]) + launcher = CuratorLauncher(arg_type, parameters) return arg_type, parameters, launcher @@ -244,22 +279,20 @@ def main(): arg_type, parameters, launcher = _determine_ingestion_info(args) if check_ingest_type(args_type=arg_type, parameters=parameters): - result = launcher.launch_ingestion() + result = launcher.launch() if result == 0: # Ingestion succeeded - logger.info("Ingestion finished successfully! Sending status message to archive...") - if arg_type == IngestType.CAL: - set_cal_status_flag(parameters) - elif arg_type == IngestType.OBS: - send_aat_obs_message(parameters) - else: - send_aat_image_message(parameters, arg_type) + logger.info( + f"{'Curation' if isinstance(arg_type, CuratorType) else 'Ingestion'} finished successfully! " + f"Sending status message to archive..." + ) + send_indexer_message(parameters, arg_type) else: - logger.error(f"ERROR: {arg_type} ingestion failure! Please check logs") + logger.error(f"ERROR: {arg_type} failure! Please check logs") sys.exit(1) else: logger.error( - f"ERROR: The workflow request to be ingested does not match specified " + f"ERROR: The workflow request to be ingested or curated does not match specified " f"ingestion type {arg_type}. Exiting...." ) sys.exit(1) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py index 88ca1f8c3c5b9b103a6a1bc40df49ffb3040629e..4f29b790d892ae2ab801bc9305143c37a9ea1c88 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/ingestion_manifest.py @@ -24,10 +24,9 @@ from pathlib import Path from pycapo import CapoConfig # pylint: disable=C0301, E0401, R0903, R1721 -from typing import Tuple +from typing import Tuple, List import arrow -from arrow import Arrow from ingest_envoy.manifest_components import ( INGESTION_ARTIFACTS_NAME, INIT_WEBLOG_FILENAME, @@ -43,6 +42,8 @@ from ingest_envoy.manifest_components import ( ManifestParameters, OutputGroup, OutputScienceProduct, + ReingestGroup, + CURATOR_MANIFEST_FILENAME, ) from ingest_envoy.schema import AbstractTextFile from ingest_envoy.std_img_manifest_utils import ImageIngestionProductsFinder @@ -55,6 +56,7 @@ from ingest_envoy.utilities import ( Telescope, find_output_tars, find_weblogs, + CuratorType, ) logger = logging.getLogger(__name__) @@ -71,12 +73,14 @@ class IngestionManifest(ManifestComponentIF): self, telescope: Telescope, sp_type: ScienceProductType, - staging_source_dir: Path, - locator: str, - # all except EVLA_EB and VLASS catalog manifest have input group - input_group: InputGroup, - # every manifest has at least one output group - output_group: OutputGroup, + staging_source_dir: Path | None, # curation has no staging directory + locator: str | None, # initial observation ingestion has no locator + # if running curator, use ReingestGroup, otherwise none + reingest_group: ReingestGroup | None, + # all except Partial Curation, EVLA_EB and VLASS catalog manifest have input group + input_group: InputGroup | None, + # every non-curator manifest has at least one output group + output_group: OutputGroup | None, # image manifest has this: additional_metadata: AbstractTextFile = None, filename: str = None, @@ -84,15 +88,34 @@ class IngestionManifest(ManifestComponentIF): self.staging_source_dir = staging_source_dir self.sp_type = sp_type self.locator = locator + self.reingest = reingest_group self.input_group = input_group self.output_group = output_group self.telescope = telescope + self.filename = filename # Check if NGAS ingestion should be enabled for all manifests in this environment - self.ngas_ingest = CapoConfig().getboolean("archive-ingestion.ingestNGAS") + self.ngas_ingest = self.get_ngas_flag() self.parameters = self.build_ingest_parameters(additional_metadata) - self.files_found = [file for file in self.staging_source_dir.iterdir()] + if staging_source_dir is not None: + # we are not running curation, find files + self.files_found = [file for file in self.staging_source_dir.iterdir()] + + def get_ngas_flag(self) -> bool: + """Determine and retrieve the correct ingestNGAS flag for this type of ingestion/curation""" + + match self.sp_type: + case ScienceProductType.EXEC_BLOCK: + flag = CapoConfig().getboolean("archive-ingestion.ingestNGAS.observation") + case ScienceProductType.EVLA_CAL: + flag = CapoConfig().getboolean("archive-ingestion.ingestNGAS.calibration") + case ScienceProductType.IMAGE | ScienceProductType.VLASS_SECI: + flag = CapoConfig().getboolean("archive-ingestion.ingestNGAS.imaging") + case _: + flag = True + + return flag def build_ingest_parameters(self, additional_metadata: AbstractTextFile): """ @@ -108,10 +131,16 @@ class IngestionManifest(ManifestComponentIF): ]: raise NotImplementedError() - if additional_metadata: + if self.reingest is not None: + params = ManifestParameters( + telescope=self.telescope, + ngas_ingest=False, + staging_source_dir=self.staging_source_dir, + ) + + elif additional_metadata: params = ManifestParameters( telescope=self.telescope, - reingest=False, ngas_ingest=self.ngas_ingest, calibrate=False, staging_source_dir=self.staging_source_dir, @@ -121,7 +150,6 @@ class IngestionManifest(ManifestComponentIF): elif self.sp_type == ScienceProductType.IMAGE: params = ManifestParameters( telescope=self.telescope, - reingest=False, ngas_ingest=self.ngas_ingest, calibrate=False, staging_source_dir=self.staging_source_dir, @@ -130,7 +158,6 @@ class IngestionManifest(ManifestComponentIF): else: params = ManifestParameters( telescope=self.telescope, - reingest=False, ngas_ingest=self.ngas_ingest, staging_source_dir=self.staging_source_dir, ) @@ -161,9 +188,11 @@ class IngestionManifest(ManifestComponentIF): :return: """ - me_dict = self.to_dict() - output_path = self.staging_source_dir / MANIFEST_FILENAME + if self.reingest is not None: + output_path = Path.cwd() / CURATOR_MANIFEST_FILENAME + else: + output_path = self.staging_source_dir / MANIFEST_FILENAME to_write = json.dumps(me_dict, indent=4) with open(output_path, "w") as out: @@ -177,20 +206,35 @@ class IngestionManifest(ManifestComponentIF): :return: """ - me_dict = dict(self.__dict__) to_return = { IngestionManifestKey.PARAMETERS.value: self.parameters.to_dict(), - IngestionManifestKey.OUTPUT_GROUP.value: me_dict[IngestionManifestKey.OUTPUT_GROUP.value].to_dict(), } - # Ingestion manifests with empty input groups can cause errors on ingest - if len(self.input_group.science_products) > 0: - to_return[IngestionManifestKey.INPUT_GROUP.value] = me_dict[ - IngestionManifestKey.INPUT_GROUP.value + # are we running curator? + if IngestionManifestKey.REINGEST.value in me_dict and me_dict[IngestionManifestKey.REINGEST.value] is not None: + to_return[IngestionManifestKey.REINGEST.value] = me_dict[IngestionManifestKey.REINGEST.value].to_dict() + + # curator manifests have no output groups + if ( + IngestionManifestKey.OUTPUT_GROUP.value in me_dict + and me_dict[IngestionManifestKey.OUTPUT_GROUP.value] is not None + ): + to_return[IngestionManifestKey.OUTPUT_GROUP.value] = me_dict[ + IngestionManifestKey.OUTPUT_GROUP.value ].to_dict() + if ( + IngestionManifestKey.INPUT_GROUP.value in me_dict + and me_dict[IngestionManifestKey.INPUT_GROUP.value] is not None + ): + # Ingestion manifests with empty input groups can cause errors on ingest + if len(self.input_group.science_products) > 0: + to_return[IngestionManifestKey.INPUT_GROUP.value] = me_dict[ + IngestionManifestKey.INPUT_GROUP.value + ].to_dict() + return to_return @@ -199,12 +243,13 @@ class IngestionManifestBuilder: def __init__( self, - staging_source_dir: Path, + staging_source_dir: Path | None, # partial curation has no source path sp_type: str, - locator: str, + locator: str | None, # initial observation ingestion has no locator telescope: str, additional_metadata: AbstractTextFile = None, filename: str = None, + curate: (CuratorType, str, List[str]) = None, ): # get the telescope self.telescope = Telescope(telescope) @@ -214,9 +259,12 @@ class IngestionManifestBuilder: # get the directory containing the files to be ingested self.staging_source_dir = staging_source_dir - # get the additional metadata, if any (EVLA CAL ingestions will have none) + # get the additional metadata, if any (EVLA CAL ingestion will have none) self.additional_metadata = additional_metadata + # are we running curator? + self.curation_type = curate[0] if curate is not None else None + # determine the science product type self.sp_type = ScienceProductType(sp_type) if self.sp_type is None or not isinstance(self.sp_type, ScienceProductType): @@ -229,12 +277,23 @@ class IngestionManifestBuilder: ]: raise NotImplementedError(f"Don't know yet how to build a {self.sp_type.value} manifest") + if self.curation_type is not None: + if self.curation_type not in [CuratorType.PARTIAL]: + raise NotImplementedError( + f"Don't know how to build a {self.curation_type.value} curation {self.sp_type} manifest" + ) + self.curation_source = curate[1] + self.target_list = curate[2] + if locator is not None: + # we are not running observation ingestion, use a locator self.locator = locator - self.files_found = [file for file in staging_source_dir.iterdir()] - if len(self.files_found) == 0: - raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}") + if staging_source_dir is not None: + # we are not running curation, look for files + self.files_found = [file for file in staging_source_dir.iterdir()] + if len(self.files_found) == 0: + raise IngestionManifestException(f"No ingestion files found at {staging_source_dir}") if filename is not None: self.filename = filename @@ -246,6 +305,8 @@ class IngestionManifestBuilder: :return: the ingestion manifest and the file containing its JSON """ + if self.curation_type == CuratorType.PARTIAL: + return self._build_curation_manifest() if self.sp_type == ScienceProductType.EVLA_CAL: return self._build_evla_cal_manifest() @@ -254,13 +315,35 @@ class IngestionManifestBuilder: return self._build_image_manifest() - def _build_evla_cal_manifest(self): + def _build_curation_manifest(self) -> (IngestionManifest, Path): + """ + Build a manifest for curator + Partial curation is simple, only parameters and reingest groups are needed + + :return: the manifest file object and path + """ + manifest = IngestionManifest( + telescope=self.telescope, + staging_source_dir=self.curation_source, + locator=self.locator, + sp_type=self.sp_type, + reingest_group=self._build_reingest_group(), + input_group=None, + output_group=None, + ) + + manifest_file = manifest.write() + + return manifest, manifest_file + + def _build_evla_cal_manifest(self) -> (IngestionManifest, Path): # create the manifest manifest = IngestionManifest( telescope=self.telescope, locator=self.locator, sp_type=self.sp_type, staging_source_dir=self.staging_source_dir, + reingest_group=None, input_group=self._build_input_group(), output_group=self._build_evla_cal_output_group(), ) @@ -285,13 +368,14 @@ class IngestionManifestBuilder: return manifest, manifest_file - def _build_observation_manifest(self): + def _build_observation_manifest(self) -> (IngestionManifest, Path): # create the manifest manifest = IngestionManifest( telescope=self.telescope, locator=None, sp_type=self.sp_type, staging_source_dir=self.staging_source_dir, + reingest_group=None, input_group=InputGroup([]), output_group=self._build_observation_output_group(), filename=self.filename, @@ -324,7 +408,7 @@ class IngestionManifestBuilder: return None - def _build_image_manifest(self): + def _build_image_manifest(self) -> (IngestionManifest, Path): """ Image manifest has additional_metadata, and output group is way more complicated :return: @@ -337,6 +421,7 @@ class IngestionManifestBuilder: additional_metadata=self.additional_metadata, sp_type=self.sp_type, staging_source_dir=self.staging_source_dir, + reingest_group=None, input_group=self._build_input_group(), output_group=self._build_imaging_output_group(), ) @@ -350,7 +435,19 @@ class IngestionManifestBuilder: return manifest, manifest_file - def _build_input_group(self): + def _build_reingest_group(self) -> ReingestGroup: + """ + Create the curation reingest group + + :return: + """ + return ReingestGroup( + locator=self.locator, + product_type=self.sp_type, + targets=self.target_list, + ) + + def _build_input_group(self) -> InputGroup: """ Create the input group using the parameters. @@ -364,7 +461,7 @@ class IngestionManifestBuilder: return InputGroup([sp_in]) - def _build_evla_cal_output_group(self): + def _build_evla_cal_output_group(self) -> OutputGroup: """ Create EVLA standard calibration manifest output group using the parameters and the contents of the staging dir. @@ -458,7 +555,7 @@ class IngestionManifestBuilder: return ingestion_artifacts_tar -def format_timestamp(datetime: Arrow) -> str: +def format_timestamp(datetime: arrow.Arrow) -> str: """ Format the current time as 2021_07_01T13_49_17.237 diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/interfaces.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/interfaces.py index ac3ce7b7ec740590aadec76a0e6611c867659f98..35e2f796b5eb6fcbef93faf8b90a5cebee924453 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/interfaces.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/interfaces.py @@ -24,7 +24,7 @@ from abc import ABC from ingest_envoy.schema import AbstractTextFile -class IngestLauncherIF(ABC): +class LauncherIF(ABC): """ Generic Ingestion Launcher methods. Should be implemented for any type of ingestion launcher @@ -32,7 +32,7 @@ class IngestLauncherIF(ABC): # launch all ingest steps, should be called in ingest.main @abc.abstractmethod - def launch_ingestion(self) -> int: + def launch(self) -> int: """ Prepare for and run ingest @@ -42,7 +42,7 @@ class IngestLauncherIF(ABC): # setup workflow results for ingestion, ensure placement in staging area @abc.abstractmethod - def prepare_for_ingest(self): + def prepare_for_launch(self): """ Prepare files for ingestion, creation of collection tars and manifest diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py index 866d679f6e2daa7ff5e39d85314c6a5446fe8d6a..8407f279527d79608aae9c07ecc4a9eb8f20805b 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/launchers.py @@ -28,9 +28,9 @@ from ingest_envoy.collectors import ( collect_image_metadata, ) from ingest_envoy.ingestion_manifest import IngestionManifestBuilder -from ingest_envoy.interfaces import IngestLauncherIF +from ingest_envoy.interfaces import LauncherIF from ingest_envoy.schema import AbstractTextFile -from ingest_envoy.utilities import IngestType, VLASSIngestType +from ingest_envoy.utilities import IngestType, VLASSIngestType, CuratorType def trigger_ingest(real_ingest: bool, staging_dir: str, bin_dir: str = ".") -> int: @@ -39,7 +39,7 @@ def trigger_ingest(real_ingest: bool, staging_dir: str, bin_dir: str = ".") -> i :param real_ingest: real ingestion vs. testing flag :param staging_dir: staging directory to ingest from - :param bin_dir: directory containing the ingestion script + :param bin_dir: directory containing the ingestion utility :return: return code """ if real_ingest: @@ -55,28 +55,50 @@ def trigger_ingest(real_ingest: bool, staging_dir: str, bin_dir: str = ".") -> i return 0 -class IngestCalibrationLauncher(IngestLauncherIF): +def trigger_curator(real_ingest: bool, bin_dir: str = ".") -> int: + """ + Run curator + + :param real_ingest: real curation or testing only? + :param bin_dir: directory containing the curator utility + :return: return code + """ + + if real_ingest: + # manifest location is known, no need for paths + curator_process = subprocess.run( + [f"{bin_dir}/curator", "-m", f"{Path.cwd() / 'manifest.json'}"], + stdout=sys.stdout, + stderr=sys.stderr, + ) + return curator_process.returncode + else: + # for local testing + return 0 + + +class IngestCalibrationLauncher(LauncherIF): """Setup and Launch Calibration Ingestion""" - def __init__(self, parameters: dict): + def __init__(self, arg_type: IngestType, parameters: dict): self.logger = logging.getLogger("ingest_envoy") - self.sci_product_type = "calibration" + self.sci_product_type = arg_type.value.__str__() self.parameters = parameters self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"] - def launch_ingestion(self) -> int: + def launch(self) -> int: """ Prepare and run ingestion script :return: Return code of ingestion script process """ self.logger.info("RUNNING CALIBRATION INGESTION!") - self.prepare_for_ingest() + self.prepare_for_launch() self.logger.info("Running ingest!") return trigger_ingest(self.parameters["useIngest"], self.staging_source_dir) - def prepare_for_ingest(self): + def prepare_for_launch(self): """ Prepare files for ingestion, creation of collection tars and manifest @@ -141,32 +163,31 @@ class IngestCalibrationLauncher(IngestLauncherIF): IngestionManifestBuilder(Path(self.staging_source_dir), self.sci_product_type, spl, telescope).build() -class IngestImageLauncher(IngestLauncherIF): +class IngestImageLauncher(LauncherIF): """Setup and Launch Image Ingestion""" def __init__(self, arg_type: Union[IngestType, VLASSIngestType], parameters: dict): self.logger = logging.getLogger("ingest_envoy") - self.type = arg_type - self.sci_product_type = arg_type.value + self.sci_product_type = arg_type.value.__str__() self.parameters = parameters self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"] self.collector = ( ImageCollector(self.parameters) if isinstance(arg_type, IngestType) else SECICollector(self.parameters) ) - def launch_ingestion(self) -> int: + def launch(self) -> int: """ Prepare and run ingestion script :return: Return code from ingestion script process """ self.logger.info("RUNNING IMAGE INGESTION!") - self.prepare_for_ingest() + self.prepare_for_launch() self.logger.info("Running ingest!") return trigger_ingest(self.parameters["useIngest"], self.staging_source_dir) - def prepare_for_ingest(self): + def prepare_for_launch(self): """ Prepare files for ingestion, creation of collection tars and manifest @@ -211,29 +232,29 @@ class IngestImageLauncher(IngestLauncherIF): ).build() -class IngestObservationLauncher(IngestLauncherIF): +class IngestObservationLauncher(LauncherIF): """Setup and Launch Observation Ingestion""" - def __init__(self, parameters: dict): + def __init__(self, arg_type: IngestType, parameters: dict): self.logger = logging.getLogger("ingest_envoy") - self.sci_product_type = "execution_block" + self.sci_product_type = arg_type.value.__str__() self.parameters = parameters self.staging_source_dir = self.parameters["staging_area"] + "/" + self.parameters["workflowDir"] self.collector = ObservationCollector(self.parameters) - def launch_ingestion(self) -> int: + def launch(self) -> int: """ Prepare and run ingestion script :return: Return code of ingestion script process """ self.logger.info("RUNNING OBSERVATION INGESTION!") - self.prepare_for_ingest() + self.prepare_for_launch() self.logger.info("Running ingest!") return trigger_ingest(self.parameters["useIngest"], self.staging_source_dir, self.parameters["script_location"]) - def prepare_for_ingest(self): + def prepare_for_launch(self): """ Prepare files for ingestion, creation of collection tars and manifest @@ -265,5 +286,62 @@ class IngestObservationLauncher(IngestLauncherIF): filename = self.parameters["filename"] IngestionManifestBuilder( - Path(self.staging_source_dir), self.sci_product_type, None, telescope, filename=filename + Path(self.staging_source_dir), + self.sci_product_type, + None, + telescope, + filename=filename, ).build() + + +class CuratorLauncher(LauncherIF): + """Setup and Launch Curator""" + + def __init__(self, arg_type: CuratorType, parameters: dict): + self.logger = logging.getLogger("ingest_envoy") + self.curator_type = arg_type + self.sci_product_type = parameters["product_type"] + self.parameters = parameters + + def prepare_for_launch(self): + """ + Prepare for curation, manifest building + + :return: + """ + self.logger.info("Preparing for curator...") + # no collection needed for curation, manifest building only + self.create_manifest() + + def create_manifest(self, additional_file: AbstractTextFile = None): + """ + Create the curation manifest + + :param additional_file: Unneeded for curation + :return: + """ + self.logger.info("Creating curation manifest...") + spl = self.parameters["spl"] + telescope = self.parameters["telescope"] + curation_source = self.parameters["curation_source"] + target_list = self.parameters["target_list"] + + IngestionManifestBuilder( + staging_source_dir=None, + sp_type=self.sci_product_type, + locator=spl, + telescope=telescope, + curate=(self.curator_type, curation_source, target_list), + ).build() + + def launch(self) -> int: + """ + Prepare and run curation + + :return: return code from curator + """ + self.logger.info(f"RUNNING {self.sci_product_type.upper()} CURATION!") + self.prepare_for_launch() + + self.logger.info("Running curator!") + return trigger_curator(self.parameters["useIngest"]) diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/manifest_components.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/manifest_components.py index 6e53946d4ff78d1c3c331a4ee255a903da049b2f..328cbc3a099342896ff4197b8f724758bf6da79d 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/manifest_components.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/manifest_components.py @@ -27,6 +27,7 @@ from ingest_envoy.schema import AbstractTextFile from ingest_envoy.utilities import AncillaryProductType, ScienceProductType, Telescope MANIFEST_FILENAME = "ingestion-manifest.json" +CURATOR_MANIFEST_FILENAME = "manifest.json" INGESTION_ARTIFACTS_NAME = "ingestion_artifacts_" TARFILE_EXT = ".tar" WEBLOG_FILENAME = "weblog.tgz" @@ -43,9 +44,9 @@ class IngestionManifestKey(Enum): PARAMETERS = "parameters" INPUT_GROUP = "input_group" OUTPUT_GROUP = "output_group" - INGESTION_PATH = "ingestion_path" SCIENCE_PRODUCTS = "science_products" ANCILLARY_PRODUCTS = "ancillary_products" + REINGEST = "reingest" class ParamsKey(Enum): @@ -56,8 +57,17 @@ class ParamsKey(Enum): REINGEST = "reingest" NGAS_INGEST = "ngas_ingest" CALIBRATE = "calibrate" - INGESTION_PATH = IngestionManifestKey.INGESTION_PATH.value + INGESTION_PATH = "ingestion_path" ADDITIONAL_METADATA = "additional_metadata" + COLLECTION_METADATA = "additional_metadata" # needed for VLASS, realfast, elwa, alfalfa. Not yet implemented + + +class ReingestKey(Enum): + """Keys in ReingestGroup""" + + TARGETS = "targets" + LOCATOR = "locator" + TYPE = "type" class ManifestComponentIF(abc.ABC): @@ -67,15 +77,6 @@ class ManifestComponentIF(abc.ABC): def __eq__(self, other): return NotImplemented - # @abc.abstractmethod - # def to_json(self) -> JSON: - # """ - # JSON-ify this object - # - # :return: json.load()-able string - # """ - # return NotImplemented - @abc.abstractmethod def to_dict(self) -> Dict: """ @@ -143,27 +144,25 @@ class ManifestParameters(ManifestComponentIF): def __init__( self, telescope: Telescope, - reingest: bool, ngas_ingest: bool, - staging_source_dir: Path, + staging_source_dir: Path | None, additional_metadata: AbstractTextFile = None, calibrate: bool = None, ): self.telescope = telescope - self.reingest = reingest self.ngas_ingest = ngas_ingest if calibrate is not None: self.calibrate = calibrate - self.staging_source_dir = staging_source_dir + if staging_source_dir is not None: + self.staging_source_dir = staging_source_dir self.additional_metadata = additional_metadata def __eq__(self, other): if isinstance(other, ManifestParameters): return ( other.telescope.value == self.telescope.value - and other.reingest == self.reingest and other.ngas_ingest == self.ngas_ingest and other.calibrate == self.calibrate and other.staging_source_dir == self.staging_source_dir @@ -183,10 +182,10 @@ class ManifestParameters(ManifestComponentIF): ParamsKey.TELESCOPE.value: self.telescope.value, # The ingestion manifest must have "true" and "false" # rather than "True" and "False" - ParamsKey.REINGEST.value: self.reingest, ParamsKey.NGAS_INGEST.value: self.ngas_ingest, - ParamsKey.INGESTION_PATH.value: str(self.staging_source_dir), } + if hasattr(self, "staging_source_dir"): + json_dict[ParamsKey.INGESTION_PATH.value] = str(self.staging_source_dir) if hasattr(self, "calibrate"): json_dict[ParamsKey.CALIBRATE.value] = self.calibrate if self.additional_metadata: @@ -313,3 +312,41 @@ class OutputGroup(ManifestComponentIF): me_dict[IngestionManifestKey.ANCILLARY_PRODUCTS.value] = ap_jsons return me_dict + + +class ReingestGroup(ManifestComponentIF): + """Generic manifest reingest group""" + + def __init__( + self, + locator: str, + product_type: ScienceProductType, + targets: List[str] = None, # optional: presence determines Partial vs Full curation. + ): + self.locator = locator + self.product_type = product_type + self.targets = targets + + def __eq__(self, other): + if isinstance(other, ReingestGroup): + return ( + other.locator == self.locator + and other.product_type == self.product_type + and other.targets == self.targets + ) + + return False + + def to_dict(self) -> JSON: + """ + Turn me into a json-ifiable dict + + :return: dict + """ + json_dict = { + ReingestKey.TARGETS.value: self.targets, + ReingestKey.LOCATOR.value: self.locator, + ReingestKey.TYPE.value: self.product_type.value, + } + + return json_dict diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py index e3d9ddb4c84f83b8298426caa74eaf0f2cc35220..4696b7ff6df2a598ebd3a45c7d385b8ee305777e 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/solicitor.py @@ -25,13 +25,13 @@ import pathlib from typing import List, Union import requests -from ingest_envoy.utilities import IngestType, VLASSIngestType +from ingest_envoy.utilities import IngestType, VLASSIngestType, CuratorType INVALID_INITIAL_VERSION = "Initial version not valid for ingest" class Solicitor: - def __init__(self, arg_type: Union[IngestType, VLASSIngestType], urls: List, filename: str = None): + def __init__(self, arg_type: Union[IngestType, VLASSIngestType, CuratorType], urls: List, filename: str = None): if filename is not None: self.filename = filename self.metadata = self.solicit_contents(self.filename) @@ -219,6 +219,33 @@ class Solicitor: return {**obs} + def solicit_curation_params(self) -> dict: + """ + Determine curation specific parameters + + :return: dict + """ + + if self.argument == CuratorType.PARTIAL and "target_list" not in self.metadata: + targets = [] # empty list is untargeted partial curation + elif self.argument == CuratorType.FULL: + targets = None + else: + targets = self.metadata["target_list"] + + params = { + "telescope": self.metadata["projectMetadata"]["telescope"], # all, needed by manifest generator + "project": self.metadata["projectMetadata"]["projectCode"], # needed for post ingestion messaging + "spl": self.metadata["product_locator"], + "product_type": self.metadata["product_type"], + "curation_source": self.metadata["data_location"] + if "data_location" in self.metadata + else None, # not required for curation + "target_list": targets, + } + + return {**params} + def solicit_seci_params(self) -> dict: """ Determine SECI image specific parameters @@ -244,11 +271,14 @@ class Solicitor: :return: parameters dict """ - if self.argument == IngestType.CAL: - return self.solicit_calibration_params() - elif self.argument == IngestType.IMG: - return self.solicit_image_params() - elif self.argument == IngestType.OBS: - return self.solicit_observation_params() - elif self.argument == VLASSIngestType.SECI: - return self.solicit_seci_params() + match self.argument: + case IngestType.CAL: + return self.solicit_calibration_params() + case IngestType.IMG: + return self.solicit_image_params() + case IngestType.OBS: + return self.solicit_observation_params() + case VLASSIngestType.SECI: + return self.solicit_seci_params() + case CuratorType.PARTIAL | CuratorType.FULL: + return self.solicit_curation_params() diff --git a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py index 07674c9a906a8de4ab9b73cc5ae394d8eca490a6..93717f60dcc123b3803674202c8c06072f9156e1 100644 --- a/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py +++ b/apps/cli/executables/pexable/ingest_envoy/ingest_envoy/utilities.py @@ -31,7 +31,14 @@ class IngestType(Enum): CAL = "calibration" IMG = "image" - OBS = "observation_data" + OBS = "execution_block" + + +class CuratorType(Enum): + """Types of Data Curation (i.e. Re-Ingestion)""" + + PARTIAL = "partial" + FULL = "full" class VLASSIngestType(Enum): @@ -73,7 +80,7 @@ class AncillaryProductType(Enum): PIPELINE_WEBLOG = "pipeline_weblog" LOG_TYPE = "observation_log" - ### Images ### + # Images # # our default FITS type FITS = "fits_image" diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_Realfast_SDM.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_Realfast_SDM.json deleted file mode 100644 index b7ef437c6f6da713859a1342f92b274a427e33a7..0000000000000000000000000000000000000000 --- a/apps/cli/executables/pexable/ingest_envoy/test/examples/Manifest_Realfast_SDM.json +++ /dev/null @@ -1,40 +0,0 @@ -{ - "parameters": { - "reingest": false, - "ngas_ingest": false, - "telescope": "EVLA", - "ingestion_path": "/Need/To/Decide/About/This", - "collection_metadata": "realfast.json" - }, - "output_group": { - "science_products": [ - { - "type": "execution_block", - "filename": "realfast_20A-333.place.holder.here", - "ancillary_products": [ - { - "type": "candidate_image", - "filename": "aaaa.png" - }, - { - "type": "candidate_image", - "filename": "bbbb.png" - } - ] - } - ], - "ancillary_products": [ - { - "type": "ingestion_artifacts", - "filename": "ingestion_artifacts_2020_07_17_T13_45_39.902.tar" - } - ] - }, - "associate_group": { - "science_products": [ - { - "locator": "uid://evla/execblock/abcd-efgh-ijkl-mnop" - } - ] - } -} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_cal_final_manifest.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_cal_final_manifest.json index c4ae82b2e57aa0d8ba2afd1eae9c949fc6975dff..1a47f80ac9878103c97314e7595baf3404e783e1 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_cal_final_manifest.json +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_cal_final_manifest.json @@ -1,6 +1,5 @@ { "parameters": { - "reingest": false, "ngas_ingest": true, "telescope": "EVLA", "ingestion_path": "/lustre/aoc/cluster/pipeline/dsoc-prod/stage_products/20A-346_2021_07_23_T13_37_08.376" diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_cal_manifest_2021-08-02.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_cal_manifest_2021-08-02.json index e4dc509bbe1590e59c7d1215f0f6c33df5a88446..026337514b39249d7afe1de6c2c8d57698654642 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_cal_manifest_2021-08-02.json +++ b/apps/cli/executables/pexable/ingest_envoy/test/examples/evla_cal_manifest_2021-08-02.json @@ -1,6 +1,5 @@ { "parameters": { - "reingest": false, "ngas_ingest": true, "telescope": "EVLA", "ingestion_path": "/lustre/aoc/cluster/pipeline/dsoc-prod/stage_products/20A-346_2021_07_23_T13_37_08.376" diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/image_set_manifest.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/image_set_manifest.json deleted file mode 100644 index 23a812756b2eb27ffae5b2326780af7172bc2e06..0000000000000000000000000000000000000000 --- a/apps/cli/executables/pexable/ingest_envoy/test/examples/image_set_manifest.json +++ /dev/null @@ -1,60 +0,0 @@ -{ - "parameters": { - "reingest": "false", - "ngas_ingest": "false", - "calibrate": "false", - "ingestion_path": "/lustre/.." - }, - "input_group": { - "science_products": [ - { - "type": "calibration", - "locator": "...." - } - ] - }, - "output_group": { - "science_products": [ - { - "type": "image", - "filename": "image1.fits", - "ancillary-products": [ - { - "type": "image_mask", - "filename": "image1.mask.fits" - } - ] - }, - { - "type": "image", - "filename": "image2.fits", - "ancillary-products": [ - { - "type": "image_mask", - "filename": "image2.mask.fits" - } - ] - }, - { - "type": "image", - "filename": "image3.fits", - "ancillary-products": [ - { - "type": "image_mask", - "filename": "image3.mask.fits" - } - ] - } - ], - "ancillary-products": [ - { - "type": "weblog", - "filename": "weblog.tgz" - }, - { - "type": "tar", - "filename": "imaging_extras.tar" - } - ] - } -} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/examples/vlass_catalog_manifest.json b/apps/cli/executables/pexable/ingest_envoy/test/examples/vlass_catalog_manifest.json deleted file mode 100644 index 86d44f8e03c597661576a2cd15e75f92495f69ed..0000000000000000000000000000000000000000 --- a/apps/cli/executables/pexable/ingest_envoy/test/examples/vlass_catalog_manifest.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "parameters": { - "reingest": "false", - "ngas_ingest": "false", - "calibrate": "false", - "ingestion_path": "/lustre/...../" - }, - "science-products": [ - { - "type": "vlass_catalog", - "filename": "imaging_catalog_example.cat", - "group_with": ".... (locator for an image)" - } - ] -} diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_evla_cal_manifest.py b/apps/cli/executables/pexable/ingest_envoy/test/test_evla_cal_manifest.py index 3eb8025d5645d89e260b3021667a898582813286..f588fc210520e4b46b03749ad7b2f3920ca454c1 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/test_evla_cal_manifest.py +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_evla_cal_manifest.py @@ -43,6 +43,7 @@ from ingest_envoy.manifest_components import ( ManifestParameters, OutputGroup, OutputScienceProduct, + ParamsKey, ) from ingest_envoy.utilities import AncillaryProductType, ScienceProductType, Telescope @@ -88,7 +89,7 @@ def test_filters_cal_input_files(ingest_path: Path): assert manifest.locator == locator params = manifest.parameters - assert params.reingest is False and params.ngas_ingest is True + assert params.ngas_ingest is True assert not hasattr(params, "calibrate") input_group = manifest.input_group @@ -162,7 +163,6 @@ def test_params_json_well_formed(): """ params = ManifestParameters( telescope=Telescope.EVLA, - reingest=False, ngas_ingest=False, calibrate=False, staging_source_dir=Path("/home/mchammer/evla/parallel-prod"), @@ -331,13 +331,11 @@ def test_evla_cal_manifest_matches_example(ingest_path: Path): with open(manifest_file, "r") as infile: actual_json = dict(json.load(infile).items()) - actual_json[IngestionManifestKey.PARAMETERS.value][IngestionManifestKey.INGESTION_PATH.value] = expected_dir_name + actual_json[IngestionManifestKey.PARAMETERS.value][ParamsKey.INGESTION_PATH.value] = expected_dir_name expected_params = expected_json["parameters"] actual_params = manifest.parameters.to_dict() - expected_reingest = expected_params["reingest"] - assert actual_params["reingest"] == expected_reingest expected_ngas_ingest = expected_params["ngas_ingest"] assert actual_params["ngas_ingest"] == expected_ngas_ingest @@ -435,13 +433,11 @@ def test_evla_cal_final_manifest_matches_example(ingest_path: Path): with open(manifest_file, "r") as infile: actual_json = dict(json.load(infile).items()) - actual_json[IngestionManifestKey.PARAMETERS.value][IngestionManifestKey.INGESTION_PATH.value] = expected_dir_name + actual_json[IngestionManifestKey.PARAMETERS.value][ParamsKey.INGESTION_PATH.value] = expected_dir_name expected_params = expected_json["parameters"] actual_params = manifest.parameters.to_dict() - expected_reingest = expected_params["reingest"] - assert actual_params["reingest"] == expected_reingest expected_ngas_ingest = expected_params["ngas_ingest"] assert actual_params["ngas_ingest"] == expected_ngas_ingest diff --git a/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py b/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py index c99bc10e3fabd24e008c2e3ef6861be0304e9451..aef5abf5194c5834d52b5c1ca0853b742f3d73ae 100644 --- a/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py +++ b/apps/cli/executables/pexable/ingest_envoy/test/test_launchers.py @@ -65,9 +65,9 @@ def test_trigger_ingest(mock_run): class TestIngestCalibrationLauncher: def test_launch_ingestion(self): - with patch("ingest_envoy.launchers.IngestCalibrationLauncher.prepare_for_ingest") as prepare: + with patch("ingest_envoy.launchers.IngestCalibrationLauncher.prepare_for_launch") as prepare: with patch("ingest_envoy.launchers.trigger_ingest") as ingest: - IngestCalibrationLauncher(parameters).launch_ingestion() + IngestCalibrationLauncher(IngestType.CAL, parameters).launch() assert prepare.call_count == 1 assert ingest.call_count == 1 @@ -75,35 +75,35 @@ class TestIngestCalibrationLauncher: def test_prepare_for_ingest(self, mock_run): with patch("ingest_envoy.launchers.IngestCalibrationLauncher.run_collection_script") as collector: with patch("ingest_envoy.launchers.IngestCalibrationLauncher.create_manifest") as manifest: - IngestCalibrationLauncher(parameters).prepare_for_ingest() + IngestCalibrationLauncher(IngestType.CAL, parameters).prepare_for_launch() assert collector.call_count == 1 assert manifest.call_count == 1 @patch("subprocess.run") def test_run_collection_script(self, mock_run): mock_run.return_value.returncode = 0 - IngestCalibrationLauncher(parameters).run_collection_script() + IngestCalibrationLauncher(IngestType.CAL, parameters).run_collection_script() assert mock_run.call_count == 1 @pytest.mark.skip("Skip. Ignores manifest builder mock") def test_create_manifest(self): with patch("ingest_envoy.ingestion_manifest.IngestionManifestBuilder.build") as manifest: - IngestCalibrationLauncher(parameters).create_manifest() + IngestCalibrationLauncher(IngestType.CAL, parameters).create_manifest() assert manifest.call_count == 1 class TestIngestImageLauncher: def test_launch_ingestion(self): - with patch("ingest_envoy.launchers.IngestImageLauncher.prepare_for_ingest") as prepare: + with patch("ingest_envoy.launchers.IngestImageLauncher.prepare_for_launch") as prepare: with patch("ingest_envoy.launchers.trigger_ingest") as ingest: - IngestImageLauncher(IngestType.IMG, image_parameters).launch_ingestion() + IngestImageLauncher(IngestType.IMG, image_parameters).launch() assert prepare.call_count == 1 assert ingest.call_count == 1 def test_prepare_for_ingest(self): with patch("ingest_envoy.launchers.IngestImageLauncher.run_collector") as collector: with patch("ingest_envoy.launchers.IngestImageLauncher.create_manifest") as manifest: - IngestImageLauncher(IngestType.IMG, image_parameters).prepare_for_ingest() + IngestImageLauncher(IngestType.IMG, image_parameters).prepare_for_launch() assert collector.call_count == 1 assert manifest.call_count == 1 diff --git a/docker.properties b/docker.properties index d689fafb7135d668dfd5d08c5061107c1797e4ff..fe2faf36d924579ee452a3ba68f65d55efbaee23 100644 --- a/docker.properties +++ b/docker.properties @@ -43,7 +43,9 @@ edu.nrao.workspaces.VlassDeliverySettings.cache = /lustre/aoc/cluster/pipeline/v edu.nrao.workspaces.IngestionSettings.useIngest = False edu.nrao.workspaces.IngestionSettings.stagingDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/staging edu.nrao.workspaces.IngestionSettings.storageDirectory = /lustre/aoc/cluster/pipeline/docker/workspaces/storage -archive-ingestion.ingestNGAS = true +archive-ingestion.ingestNGAS.observation = true +archive-ingestion.ingestNGAS.calibration = true +archive-ingestion.ingestNGAS.imaging = true # # Standard Calibration Settings diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index a288457703b7cb5c0fd2fd9b2a4f84a42663a808..cccb9b15f7eb41d60587cc2b682cb2cd1b14c6a5 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -21,7 +21,6 @@ import http import json import logging -import os import time from json import JSONDecodeError from pathlib import Path @@ -29,7 +28,6 @@ from pathlib import Path import prometheus_client import transaction import zope.sqlalchemy -from pycapo import CapoConfig from pyramid.config import Configurator from pyramid.renderers import JSONP from pyramid.request import Request @@ -45,7 +43,6 @@ from workspaces.workflow.services.workflow_info import WorkflowInfo from workspaces.workflow.services.workflow_service import ( WorkflowMessageHandler, WorkflowService, - subprocess, ) logger = logging.getLogger(__name__) @@ -59,9 +56,6 @@ def metrics(environ, start_response): return prometheus_exporter(environ, start_response) -logger = logging.getLogger(__name__) - - # --------------------------------------------------------- # # S E R V I C E S @@ -504,7 +498,12 @@ class WorkflowRequestRestService: identifier = int(body["request_id"]) if "request_id" in body else body["project_code"] msg_type = self.request.matchdict["msg_type"] - additional = body["project_code"] if "project_code" in body and identifier != body["project_code"] else None + additional = None + + if "project_code" in body and isinstance(identifier, int): + additional = body["project_code"] + elif "telescope" in body and isinstance(identifier, str): + additional = body["telescope"] self.request.workflows.message_archive(identifier, msg_type, additional) return Response( diff --git a/shared/workspaces/workspaces/workflow/message_architect.py b/shared/workspaces/workspaces/workflow/message_architect.py index faaa32a631f7300687e4e13d9aca907f8cfab408..6547e304c70221b8118aad1006556f3fa47ef763 100644 --- a/shared/workspaces/workspaces/workflow/message_architect.py +++ b/shared/workspaces/workspaces/workflow/message_architect.py @@ -126,7 +126,8 @@ archive_msg_templates = DictView( "request": "observation", "logData": { "project_code": None, - "ingestion_type": "vlba", + "telescope": None, + "ingestion_type": None, "status": "complete", }, }, @@ -243,6 +244,7 @@ class ArchiveMessageArchitect(MessageArchitectIF): project_code: str = None, delivery_info: dict = None, stage_info: dict = None, + telescope: str = None, ): """ Initialize Architect @@ -260,6 +262,7 @@ class ArchiveMessageArchitect(MessageArchitectIF): self.project = project_code self.delivery = delivery_info self.stage_info = stage_info + self.telescope = telescope @staticmethod def get_message_template(msg_type: str) -> dict: @@ -322,4 +325,23 @@ class ArchiveMessageArchitect(MessageArchitectIF): template["start"] = self.stage_info["start"] template["stop"] = self.stage_info["stop"] + if self.telescope is not None: + template["telescope"] = self.telescope + template["ingestion_type"] = self._determine_indexer_type(self.telescope) + return DictView(template) + + @staticmethod + def _determine_indexer_type(telescope: str) -> str: + """ + What type of indexing do we need? + + :param telescope: the ingested/curated product's telescope + :return: the indexer type to use + """ + if telescope.upper() == "VLBA" or telescope.upper() == "GMVA": + return "vlba" + if telescope.upper() == "EVLA" or telescope.upper() == "ALMA": + return f"{telescope.lower()}_sdm" + else: + logger.error(f"Error: Unknown telescope {telescope}, can't send index message to AAT.") diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 77dfed54b3c7e250314e37914a1364a89351c071..64328808986cc003e253c8fde99fc04f7b6c28b6 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -286,6 +286,7 @@ class WorkflowService(WorkflowServiceIF): wf_request = None sdm_id = None project_code = None + telescope = None if isinstance(identifier, int): # a request should exist, find it @@ -300,8 +301,11 @@ class WorkflowService(WorkflowServiceIF): if additional_field is not None: project_code = additional_field else: - # I'm VLASS and decided to be special <unhappy tears here> + # I'm initial ingestion, curation, or VLASS project_code = identifier + # needed to determine indexer type in initial ingestion or curation + if additional_field is not None: + telescope = additional_field if msg_type == "do-not-calibrate": # this should only send from the WS QA interface @@ -320,10 +324,9 @@ class WorkflowService(WorkflowServiceIF): msg_arch = ArchiveMessageArchitect(routing_key="ingestor.images", request=None, project_code=project_code) elif msg_type == "obs-ingestion-complete": logger.info(f"Sending 'Observation Ingestion Complete' indexing message to AAT for project {project_code}!") + key = "vlba" if telescope == "VLBA" or telescope == "GMVA" else "metadata" msg_arch = ArchiveMessageArchitect( - routing_key="ingestion-complete.vlba", - request=None, - project_code=project_code + routing_key=f"ingestion-complete.{key}", request=None, project_code=project_code, telescope=telescope ) else: logger.info(f"Cannot determine archive message for type {msg_type}. Skipping....")