Skip to content
Snippets Groups Projects

refactor aat-wrest for sanity

Merged Charlotte Hausman requested to merge cherry_pick_aat_wrest into main
9 files
+ 313
301
Compare changes
  • Side-by-side
  • Inline
Files
9
"""
Extracts workflow-relevant metadata from the NRAO archive based on a Science Product Locator.
"""
import argparse
import json
import logging
import sys
import pendulum
import psycopg2 as pg
from aat_wrest.observation_wrester import ObservationWrester
from aat_wrest.utilities import PENDULUM_FORMAT, TIME_ZONE, MDDBConnector
from pycapo import CapoConfig
from typing import List
logger = logging.getLogger("aat_wrest")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
from aat_wrest.utilities import PENDULUM_FORMAT, TIME_ZONE, MDDBConnector
class WrestWorkflowMetadata:
@@ -23,18 +16,15 @@ class WrestWorkflowMetadata:
"""
def __init__(
self, connection: MDDBConnector, spl: str = None, fileset_id: str = None, sdm_id: str = None
self,
connection: MDDBConnector,
spl: List[str] = None,
sdm_id: str = None,
):
self.logger = logging.getLogger("aat_wrest")
self.conn = connection
if fileset_id is not None:
self.fileset_id = fileset_id
if sdm_id is not None:
self.sdm_id = sdm_id
if not spl and fileset_id:
self.spl = self.wrest_obs_metadata_from_fileset_id(fileset_id)["spl"]
else:
self.spl = spl
self.sdm_id = sdm_id
self.spl = spl
def wrest_standard_cal_info(self) -> json:
"""
@@ -85,8 +75,8 @@ class WrestWorkflowMetadata:
def wrest_standard_image_info(self) -> json:
"""
Given an execution block science product locator, returns the required metadata to run
the standard calibration workflow
Given an execution block SDM ID, returns the required metadata to run
the standard imaging workflow
:return:
"""
@@ -130,82 +120,57 @@ class WrestWorkflowMetadata:
self.conn.close()
return make_json
def wrest_obs_metadata_from_fileset_id(self, fileset_id: str) -> str:
def wrest_restore_info(self) -> json:
"""
Given a fileset_id, query the Metadata DB and return the corresponding science_product_locator
:param fileset_id:
:return science_product_locator:
Given an execution block science product locator and calibration science product locator, returns
the required metadata to run the restore CMS workflow
:return:
"""
metadata = {
"spl": None,
"bands": None,
"array_config": None,
"obs_start_time": None,
"obs_end_time": None,
}
sql = f"""
SELECT science_product_locator, band_code, configuration, starttime, endtime
FROM execution_blocks
WHERE ngas_fileset_id = %(fileset_id)s
eb_sql = f"""
SELECT ngas_fileset_id as filesetId,
e.project_code as projectCode,
p.title as title,
e.starttime as startTime,
(a.firstname || ' ' || a.lastname) as observer,
telescope as telescope
FROM execution_blocks e
JOIN projects p on e.project_code = p.project_code
JOIN authors a on p.project_code = a.project_code
WHERE science_product_locator = %(spl)s AND a.is_pi = true
"""
with self.conn.cursor() as cursor:
cursor.execute(sql, {"fileset_id": fileset_id})
data = cursor.fetchall()
metadata["spl"] = data[0][0]
metadata["bands"] = data[0][1]
metadata["array_config"] = data[0][2]
metadata["obs_start_time"] = data[0][3]
metadata["obs_end_time"] = data[0][4]
return metadata
def parser() -> argparse.ArgumentParser:
    """
    Build the command-line parser for the metadata wrester.

    Three optional flags are registered; each one accepts exactly one value,
    which argparse stores as a single-element list.

    :return: the configured argument parser
    """
    # (short flag, long flag, help text) -- registration order matches the help output order
    option_table = (
        (
            "-sc",
            "--stdcals",
            "Find workflow metadata for standard calibrations with provided product locator",
        ),
        (
            "-si",
            "--stdimg",
            "Find workflow metadata for standard CMS imaging with provided SDM id",
        ),
        (
            "-obs",
            "--observation",
            "Find display metadata for observations by product locator",
        ),
    )

    cli = argparse.ArgumentParser(
        description="Workspaces-to-Archive Metadata Exchange",
        formatter_class=argparse.RawTextHelpFormatter,
    )
    for short_flag, long_flag, help_text in option_table:
        cli.add_argument(
            short_flag,
            long_flag,
            nargs=1,
            action="store",
            required=False,
            help=help_text,
        )
    return cli
def determine_wrester(connection: "MDDBConnector", args: argparse.Namespace):
    """
    Run the wrester selected on the command line and print its result.

    :param connection: metadata database connection handed to the selected wrester
    :param args: parsed command-line arguments; the ``stdcals``, ``stdimg`` and
        ``observation`` attributes are read
    :return: None; the wrested metadata is printed to stdout
    """
    if not (args.stdcals or args.stdimg or args.observation):
        # Previously ``data`` was never bound in this case and the final print
        # raised UnboundLocalError; report the problem explicitly instead.
        logging.getLogger("aat_wrest").error(
            "ERROR: aat-wrest was not given a metadata request!"
            " Provide one of --stdcals, --stdimg or --observation."
        )
        return

    data = None
    # Each flag is checked independently, so when several are supplied the
    # result of the last matching branch is the one that gets printed.
    if args.stdcals:
        data = WrestWorkflowMetadata(connection, spl=args.stdcals[0]).wrest_standard_cal_info()
    if args.stdimg:
        data = WrestWorkflowMetadata(connection, sdm_id=args.stdimg[0]).wrest_standard_image_info()
    if args.observation:
        data = ObservationWrester(connection, spl=args.observation[0]).wrest_observation_info()

    print(data)
def main():
args = parser().parse_args()
connection = MDDBConnector()
cal_sql = f"""
SELECT external_name as calSdmId
from science_products WHERE science_product_locator = %(cal_spl)s
"""
determine_wrester(connection, args)
make_json = {}
try:
cursor = self.conn.cursor()
cursor.execute(eb_sql, {"spl": self.spl[0]})
data = cursor.fetchall()
cursor.execute(cal_sql, {"cal_spl": self.spl[1]})
cal_data = cursor.fetchall()
if data and cal_data:
make_json = json.dumps(
{
"sdmId": data[0][0],
"calSdmId": cal_data[0][0],
"projectCode": data[0][1],
"title": data[0][2],
"startTime": data[0][3],
"observer": data[0][4],
"telescope": data[0][5],
"created_at": str(
pendulum.now().in_timezone(TIME_ZONE).format(PENDULUM_FORMAT)
),
}
)
else:
self.logger.error(
f"ERROR: aat-wrest query returned no results!"
f" The database appears to be missing information for either SPL {self.spl[0]} or Cal SPL {self.spl[1]}!"
)
finally:
self.conn.close()
return make_json
Loading