Skip to content
Snippets Groups Projects

refactor aat-wrest for sanity

Merged Charlotte Hausman requested to merge cherry_pick_aat_wrest into main
9 files
+ 258
318
Compare changes
  • Side-by-side
  • Inline
Files
9
"""
Extracts workflow-relevant metadata from the NRAO archive based on a Science Product Locator.
"""
import argparse
import json
import logging
import sys
import pendulum
from typing import List
from aat_wrest.observation_wrester import ObservationWrester
from aat_wrest.utilities import PENDULUM_FORMAT, TIME_ZONE, MDDBConnector
# Module-level logger named "aat_wrest"; its level is set to INFO
logger = logging.getLogger("aat_wrest")
logger.setLevel(logging.INFO)
# Attach a stream handler so log records are written to stdout
logger.addHandler(logging.StreamHandler(sys.stdout))
class WrestWorkflowMetadata:
"""
@@ -26,19 +19,12 @@ class WrestWorkflowMetadata:
self,
connection: MDDBConnector,
spl: List[str] = None,
fileset_id: str = None,
sdm_id: str = None,
):
self.logger = logging.getLogger("aat_wrest")
self.conn = connection
if fileset_id is not None:
self.fileset_id = fileset_id
if sdm_id is not None:
self.sdm_id = sdm_id
if not spl and fileset_id:
self.spl = self.wrest_obs_metadata_from_fileset_id(fileset_id)["spl"]
else:
self.spl = spl
self.sdm_id = sdm_id
self.spl = spl
def wrest_standard_cal_info(self) -> json:
"""
@@ -89,8 +75,8 @@ class WrestWorkflowMetadata:
def wrest_standard_image_info(self) -> json:
"""
Given an execution block science product locator, returns the required metadata to run
the standard calibration workflow
Given an execution block SDM ID, returns the required metadata to run
the standard imaging workflow
:return:
"""
@@ -140,8 +126,6 @@ class WrestWorkflowMetadata:
the required metadata to run the restore CMS workflow
:return:
"""
print(self.spl)
eb_sql = f"""
SELECT ngas_fileset_id as filesetId,
e.project_code as projectCode,
@@ -190,94 +174,3 @@ class WrestWorkflowMetadata:
finally:
self.conn.close()
return make_json
def wrest_obs_metadata_from_fileset_id(self, fileset_id: str) -> dict:
    """
    Given a fileset_id, query the Metadata DB and return observation metadata for the
    matching execution block.

    :param fileset_id: value matched against execution_blocks.ngas_fileset_id
    :return: dict with the keys "spl", "bands", "array_config", "obs_start_time" and
        "obs_end_time", filled from the first matching row; every value stays None when
        no row matches
    """
    metadata = {
        "spl": None,
        "bands": None,
        "array_config": None,
        "obs_start_time": None,
        "obs_end_time": None,
    }

    # The fileset id is bound through the %(fileset_id)s placeholder when the query is
    # executed, so this is a plain string rather than an f-string
    sql = """
    SELECT science_product_locator, band_code, configuration, starttime, endtime
    FROM execution_blocks
    WHERE ngas_fileset_id = %(fileset_id)s
    """

    with self.conn.cursor() as cursor:
        cursor.execute(sql, {"fileset_id": fileset_id})
        rows = cursor.fetchall()

    # An unknown fileset id yields no rows; hand back the None defaults instead of
    # failing with an IndexError on rows[0]
    if not rows:
        return metadata

    # Only the first matching row is used; columns are read in SELECT order
    row = rows[0]
    metadata["spl"] = row[0]
    metadata["bands"] = row[1]
    metadata["array_config"] = row[2]
    metadata["obs_start_time"] = row[3]
    metadata["obs_end_time"] = row[4]
    return metadata
def parser() -> argparse.ArgumentParser:
    """
    Build the command line parser for the metadata wrester.

    :return: parser accepting the optional -sc/--stdcals, -si/--stdimg, -r/--restore
        and -obs/--observation arguments
    """
    cli = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter,
        description="Workspaces-to-Archive Metadata Exchange",
    )

    # Settings shared by the options that take exactly one value
    single_value = {"nargs": 1, "action": "store", "required": False}

    cli.add_argument(
        "-sc",
        "--stdcals",
        help="Find workflow metadata for standard calibrations with provided product locator",
        **single_value,
    )
    cli.add_argument(
        "-si",
        "--stdimg",
        help="Find workflow metadata for standard CMS imaging with provided SDM id",
        **single_value,
    )
    # Restore accepts one or more values and defaults to an empty list
    cli.add_argument(
        "-r",
        "--restore",
        nargs="+",
        default=[],
        required=False,
        help="Find workflow metadata for Restores with provided EB product locator and Cal product locator",
    )
    cli.add_argument(
        "-obs",
        "--observation",
        help="Find display metadata for observations by product locator",
        **single_value,
    )
    return cli
def determine_wrester(connection: MDDBConnector, args: argparse.Namespace):
if args.stdcals:
data = WrestWorkflowMetadata(connection, spl=args.stdcals[0]).wrest_standard_cal_info()
if args.stdimg:
data = WrestWorkflowMetadata(connection, sdm_id=args.stdimg[0]).wrest_standard_image_info()
if args.restore:
print(args)
data = WrestWorkflowMetadata(connection, spl=args.restore).wrest_restore_info()
if args.observation:
data = ObservationWrester(connection, spl=args.observation[0]).wrest_observation_info()
print(data)
def main():
    """
    Command line entry point: parse the arguments, open a metadata database connection
    and dispatch to the selected wrester.
    """
    # Arguments are parsed before the connector is created, as in the original order
    cli_args = parser().parse_args()
    determine_wrester(MDDBConnector(), cli_args)
Loading