#
# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
# -------------------------------------------------------------------------
#
#        P R O D U C T   F I N D I N G
#
# -------------------------------------------------------------------------

import abc
import json
import pathlib
import re
from typing import Iterator, List

import pendulum
from delivery import convert_datetime_to_mjd
from pendulum.datetime import DateTime

from .products import ArchiveProduct, ProductMetadata, RestoreProduct, SpooledProduct


class ProductFinder(abc.ABC):
    """
    Abstract interface for locating the products that a delivery should deliver.

    Concrete finders must provide both the `projects` property and `find_products()`.
    """

    @property
    @abc.abstractmethod
    def projects(self) -> List[str]:
        """
        The projects covered by the current delivery.

        :return: list of project identifiers, as strings
        """

    @abc.abstractmethod
    def find_products(self) -> Iterator[SpooledProduct]:
        """
        Locate the products to deliver.

        :return: Iterator of SpooledProducts
        """


# Historical note
#
# When this program was first conceived, the plan was to use the CASA "Pipeline Processing Results" format, or
# "piperesults" file (since PPR has another meaning already in "Pipeline Processing Request") to drive delivery.
# This format looks a bit like this:
#
# <?xml version="1.0" ?>
# <piperesults name="unknown">
#   <ous name="unknown">
#     <casaversion name="5.1.2-4"/>
#     <pipeline_version name="41154 (Pipeline-CASA51-P2-B)"/>
#     <procedure_name name="Undefined"/>
#     <session name="session_1">
#       <caltables name="unknown.session_1.caltables.tgz"/>
#       <asdm name="18A-426.sb35753229.eb35761423.58425.94538642361.ms">
#         <finalflags name="18A-426.sb35753229.eb35761423.58425.94538642361.ms.flagversions.tgz"/>
#         <applycmds name="18A-426.sb35753229.eb35761423.58425.94538642361.ms.calapply.txt"/>
#       </asdm>
#     </session>
#     <weblog name="weblog.tgz"/>
#     <casa_cmdlog name="casa_commands.log"/>
#     <pipescript name="casa_pipescript.py"/>
#     <restorescript name="casa_piperestorescript.py"/>
#     <image imtype="calibrator" name="oussid.J1820-2528_ph.spw0.mfs.I.pbcor.fits"/>
#     <image imtype="calibrator" name="oussid.J1820-2528_ph.spw0.mfs.I.pb.fits"/>
#     ...
#
# New requirements started coming in and it was starting to feel like this format wouldn't be a great starting
# point, since it would be difficult to interject more conditional processing. So instead we created something
# else, the HeuristicProductFinder, which worked by scanning the filesystem and making inferences based on the
# file names and types.
#
# The HeuristicProductFinder would have worked if most of our products had intrinsic metadata we could use.
# Unfortunately we have a lot of products for which this doesn't work—perhaps they're raw tar files; they could
# be a weblog or they could be calibration tables. And we have other things like FITS files that carry some
# metadata, but perhaps not a sufficient amount of metadata. This never worked completely, so it has been removed.
#
# This is how we have just the one ProductFinder now, which is based on the idea of looking at the "products.json"
# file which is generated by the product fetcher or the workflow itself.
class JsonProductFinder(ProductFinder):
    """
    Locates products by reading the "products.json" manifest at the top of the delivery source directory.

    The manifest is loaded once, at construction time. It is used as a mapping from each product's path
    (relative to the source directory) to that product's metadata dictionary.
    """

    def __init__(self, dir: pathlib.Path):
        """
        :param dir: directory that contains "products.json" and the products it lists
        """
        self.root = dir
        self.products: dict = json.loads((dir / "products.json").read_bytes())

    @property
    def projects(self) -> List[str]:
        """
        Return a list of all the projects we're delivering for right now.

        :return: the distinct "project_code" values found in the manifest, in no particular order
        """
        return list({product["project_code"] for product in self.products.values()})

    def find_products(self) -> Iterator[SpooledProduct]:
        """
        Produce one ArchiveProduct per manifest entry.

        :return: Iterator of ArchiveProducts, each built from the entry's path under the source directory and
                 the entry's metadata
        """
        for path, product in self.products.items():
            yield ArchiveProduct(self.root / path, product)


class RestoreProductFinder(ProductFinder):
    """Used when there's a single RestoreProduct that's the entirety of `dir`, whose metadata
    can be gleaned from the dir's metadata.json. Can easily be extended to the ALMA restore use-case;
    should be extensible to multiple restores per delivery as well.
    """

    def __init__(self, dir: pathlib.Path) -> None:
        """
        :param dir: directory holding the restore; must contain "metadata.json" and exactly one
                    "working/casa-*.log" file
        """
        self.dir = dir
        self._metadata = self.parse_metadata()

    def parse_metadata(self) -> ProductMetadata:
        """
        Build the ProductMetadata for the restore from the files found in `self.dir`.

        The telescope and project code come from the "projectMetadata" section of metadata.json. The start
        time comes from the name of the single CASA log under working/, which must look like
        "casa-<digits>-<digits>.log"; the digits are parsed as "YYYYMMDD-HHmmss" and the result is passed
        through convert_datetime_to_mjd.

        :return: ProductMetadata describing the restored product
        :raises AssertionError: if there is not exactly one working/casa-*.log, or if its name does not carry
                                a timestamp in the expected position
        """
        metadata_file = json.loads((self.dir / "metadata.json").read_bytes())
        project_metadata = metadata_file["projectMetadata"]

        # These checks used to be bare `assert` statements, which disappear under `python -O`. They are raised
        # explicitly so the validation always runs; AssertionError is kept so existing callers see the same type.
        casa_logs = list(self.dir.glob("./working/casa-*.log"))
        if len(casa_logs) != 1:
            raise AssertionError(
                f"Expected exactly one working/casa-*.log under {self.dir}, found {len(casa_logs)}"
            )

        casa_log_name = casa_logs[0].name
        # The dot before "log" is escaped so that it only matches a literal "."
        timestamp_match = re.match(r"^casa-(?P<timestamp>[0-9]+-[0-9]+)\.log$", casa_log_name)
        if timestamp_match is None:
            raise AssertionError(f"Unable to find a start timestamp in CASA log name {casa_log_name!r}")

        start_timestamp = pendulum.from_format(timestamp_match.group("timestamp"), "YYYYMMDD-HHmmss")
        start_mjd = convert_datetime_to_mjd(start_timestamp)

        # ProductMetadata is filled positionally; the two None arguments are fields this finder leaves unset.
        return ProductMetadata(
            project_metadata["telescope"],
            project_metadata["projectCode"],
            "restored_cms",
            None,
            None,
            str(start_mjd),
        )

    @property
    def projects(self) -> List[str]:
        """
        :return: single-element list holding the project of the restore's metadata
        """
        return [self._metadata.project]

    def find_products(self) -> Iterator[SpooledProduct]:
        """
        :return: Iterator yielding the one RestoreProduct that covers the whole of `self.dir`
        """
        yield RestoreProduct(self.dir, self._metadata)