#
# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
# -------------------------------------------------------------------------
#
# P R O D U C T   F I N D I N G
#
# -------------------------------------------------------------------------
import abc
import json
import pathlib
import re
from typing import Iterator, List

import pendulum
from pendulum.datetime import DateTime

from .products import ArchiveProduct, ProductMetadata, RestoreProduct, SpooledProduct
"""
Locates products for the delivery to deliver
"""
@property
@abc.abstractmethod
def projects(self) -> List[str]:
"""
Return a list of all the projects we're delivering for right now.
:return:
"""
pass
@abc.abstractmethod
def find_products(self) -> Iterator[SpooledProduct]:
"""
Find products and return an iterator of them
:return: Iterator of SpooledProducts
"""
# Historical note
#
# When this program was first conceived, the plan was to use the CASA "Pipeline Processing Results" format, or
# "piperesults" file (since PPR has another meaning already in "Pipeline Processing Request") to drive delivery.
# This format looks a bit like this:
#
#     <?xml version="1.0" ?>
#     <piperesults name="unknown">
#       <ous name="unknown">
#         <casaversion name="5.1.2-4"/>
#         <pipeline_version name="41154 (Pipeline-CASA51-P2-B)"/>
#         <procedure_name name="Undefined"/>
#         <session name="session_1">
#           <caltables name="unknown.session_1.caltables.tgz"/>
#           <asdm name="18A-426.sb35753229.eb35761423.58425.94538642361.ms">
#             <finalflags name="18A-426.sb35753229.eb35761423.58425.94538642361.ms.flagversions.tgz"/>
#             <applycmds name="18A-426.sb35753229.eb35761423.58425.94538642361.ms.calapply.txt"/>
#           </asdm>
#         </session>
#         <weblog name="weblog.tgz"/>
#         <casa_cmdlog name="casa_commands.log"/>
#         <pipescript name="casa_pipescript.py"/>
#         <restorescript name="casa_piperestorescript.py"/>
#         <image imtype="calibrator" name="oussid.J1820-2528_ph.spw0.mfs.I.pbcor.fits"/>
#         <image imtype="calibrator" name="oussid.J1820-2528_ph.spw0.mfs.I.pb.fits"/>
#         ...
#
# New requirements started coming in and it was starting to feel like this format wouldn't be a great starting
# point, since it would be difficult to interject more conditional processing. So instead we created something
# else, the HeuristicProductFinder, which worked by scanning the filesystem and making inferences based on the
# file names and types.
#
# The HeuristicProductFinder would have worked if most of our products had intrinsic metadata we could use.
# Unfortunately we have a lot of products for which this doesn't work—perhaps they're raw tar files; they could
# be a weblog or they could be calibration tables. And we have other things like FITS files that carry some
# metadata, but perhaps not a sufficient amount of metadata. This never worked completely, so it has been removed.
#
# This is why we now have just the one ProductFinder, which is based on the idea of looking at the "products.json"
# file that is generated by the product fetcher or by the workflow itself.
class JsonProductFinder(ProductFinder):
    """
    Finds products by reading the "products.json" file in a directory.

    The parsed file is treated as a mapping: each key is used as a path relative
    to the directory, and each value is passed along as that product's metadata.
    """

    def __init__(self, dir: pathlib.Path):
        """
        Load "products.json" from the given directory.

        :param dir: directory that contains "products.json"; product paths are
                    resolved relative to it
        """
        self.root = dir
        self.products: dict = json.loads((dir / "products.json").read_bytes())

    @property
    def projects(self) -> List[str]:
        """
        Return a list of all the projects we're delivering for right now.

        :return: the distinct "project_code" values of the loaded products, in
                 no particular order
        """
        # A set comprehension removes duplicates when several products share a project
        return list({p["project_code"] for p in self.products.values()})

    def find_products(self) -> Iterator[SpooledProduct]:
        """
        Yield one ArchiveProduct per entry in "products.json".

        :return: Iterator of SpooledProducts
        """
        for path, product in self.products.items():
            yield ArchiveProduct(self.root / path, product)
class RestoreProductFinder(ProductFinder):
"""Used when there's a single RestoreProduct that's the entirety of `dir`,
whose metadata can be gleaned from the dir's metadata.json.
Can easily be extended to the ALMA restore use-case;
should be extensible to multiple restores per delivery as well.
"""
def __init__(self, dir: pathlib.Path) -> None:
    """
    Remember the delivery directory and load its metadata.

    :param dir: directory holding the restore product; stored as ``self.dir``
    """
    self.dir = dir
    # parse_metadata() reads self.dir, so the directory must be stored first
    parsed = self.parse_metadata()
    self._metadata = parsed
def parse_metadata(self) -> ProductMetadata:
metadata_file = json.loads((self.dir / "metadata.json").read_bytes())
project_metadata = metadata_file["projectMetadata"]
casa_log_path = list(self.dir.glob("./working/casa-*.log"))
assert len(casa_log_path) == 1
casa_log_name = casa_log_path[0].name
start_timestamp_match = re.match(r"^casa-(?P<timestamp>[0-9]+-[0-9]+).log$", casa_log_name)
assert start_timestamp_match
start_timestamp = pendulum.from_format(start_timestamp_match.group("timestamp"), "YYYYMMDD-HHmmss")
project_metadata["telescope"], project_metadata["projectCode"], "restored_cms", None, None, str(start_mjd)