Skip to content
Snippets Groups Projects
Commit 2719264c authored by Brittany Faciane's avatar Brittany Faciane
Browse files

WS-1392: Data annotation tools for DAs.

parent 4ab0b3a2
No related branches found
Tags 2.8.0
1 merge request!1470WS-1392: Data annotation tools for DAs.
......@@ -84,6 +84,13 @@ build conveyor:
PACKAGE_PATH: "apps/cli/executables/pexable/conveyor"
extends: .build-package
build data annotator:
interruptible: true
stage: build-packages
variables:
PACKAGE_PATH: "apps/cli/executables/pexable/data_annotator"
extends: .build-package
build deliver:
interruptible: true
stage: build-packages
......@@ -308,6 +315,13 @@ push conveyor:
PIP_NAME: "conveyor"
extends: .push-packages
push data-annotator:
stage: push-packages
variables:
PACKAGE_NAME: "data_annotator"
PIP_NAME: "data-annotator"
extends: .push-packages
push deliver:
stage: push-packages
variables:
......
This diff is collapsed.
#
# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
"""
Workspaces metrics reporter for users outside of SSA.
"""
__version__ = "2.8.2rc1"
#
# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
import argparse
from .queries import retrieve_problem_severities, retrieve_problem_types, get_exec_blocks, \
get_all_spls_from_exec_blocks, create_comments, delete_comments, modify_comments
from .connections import MDDBConnector
def parser() -> argparse.ArgumentParser:
"""
Initialize argument parser for command-line arguments
:return: Argument parser
"""
parser = argparse.ArgumentParser(
description="Workspaces data annotator", formatter_class=argparse.RawTextHelpFormatter
)
# Create a group to capture the action to be done; require that users select exactly one of this group
action_group = parser.add_mutually_exclusive_group(required=True)
action_group.add_argument(
"-c",
"--create",
action='store_true',
help="Create a comment for one or more datasets. Cannot be used with -d/--delete or -m/--modify.",
)
action_group.add_argument(
"-m",
"--modify",
action='store_true',
help="Modify a comment for one or more datasets. Cannot be used with -d/--delete or -c/--create.",
)
action_group.add_argument(
"-d",
"--delete",
action='store_true',
help="Delete a comment for one or more datasets. Cannot be used with -c/--create or -m/--modify.",
)
# Create a group to capture the data that will be searched by when querying the database; require at least one
search_group = parser.add_argument_group()
search_group.add_argument(
"--project_code",
action="store",
help="A string representing the project code to search by.",
)
search_group.add_argument(
"--date-range",
nargs=2,
action="store",
help="Two strings representing beginning and end date to search by.",
)
search_group.add_argument(
"--fsid",
action="store",
help="A string representing the FSID to search by",
)
search_group.add_argument(
"--configuration",
action="store",
help="A string representing the configuration to search by.",
)
search_group.add_argument(
"--band",
action="store",
help="A string representing the band to search by.",
)
# Create a group to capture other data that is not required at the start of the program
optional_group = parser.add_argument_group()
optional_group.add_argument(
"--comment",
action="store",
help="A string representing the comment to add.",
)
optional_group.add_argument(
"--old-comment",
action="store",
help="A string representing the old comment to search by. Only used with -m/--modify.",
)
optional_group.add_argument(
"--problem-severity",
action="store",
help="Two strings representing beginning and end date to search by.",
)
optional_group.add_argument(
"--problem-type",
action="store",
help="Two strings representing beginning and end date to search by.",
)
return parser
def prompt_for_input_from_db_list(db_list, prompt):
"""
Accepts a list of id and description pairs and a sentence prompt to give to a user, and returns the user's choice
from this list.
:param db_list: a list of id and description tuples
:param prompt: a string representing a question to ask the user
:return: the user's choice from the passed iin options
"""
output_string = prompt + "\n"
for item in db_list:
output_string += str(item[0]) + '. ' + item[1] + '\n'
return input(output_string)
def main():
print("**********************************")
print("* WELCOME TO THE DATA ANNOTATOR! *")
print("**********************************")
args = parser().parse_args()
try:
conn = MDDBConnector()
except Exception as e:
print(e)
# Prompt for any missing data
if not args.problem_severity:
problem_severities = retrieve_problem_severities(conn)
prompt = "A problem severity is required. Input the number for the value you wish to assign and hit enter:"
args.problem_severity = prompt_for_input_from_db_list(problem_severities, prompt)
if not args.problem_type:
problem_types = retrieve_problem_types(conn)
prompt = "A problem type is required. Input the number for the value you wish to assign and hit enter:"
args.problem_type = prompt_for_input_from_db_list(problem_types, prompt)
if not args.comment:
prompt = "Please type your comment and hit enter:\n"
args.comment = input(prompt)
# Find the initial execution blocks, use them to find all relevant science product locators and proceed
ebs = get_exec_blocks(args.project_code, args.date_range, args.fsid, args.band, args.configuration, conn)
all_spls = get_all_spls_from_exec_blocks(ebs, conn)
if args.create:
create_comments(all_spls, args.comment, args.problem_severity, args.problem_type, conn)
elif args.delete:
delete_comments(all_spls, args.comment, args.problem_severity, args.problem_type, conn)
elif args.modify:
if not args.old_comment:
prompt = "Please type the old comment you wish to modify:\n"
args.old_comment = input(prompt)
modify_comments(all_spls, args.comment, args.old_comment, args.problem_severity, args.problem_type, conn)
conn.commit()
conn.close()
if __name__ == '__main__':
main()
\ No newline at end of file
import logging
import sys
import psycopg2 as pg
from pycapo import CapoConfig
from sqlalchemy.engine import Connection
logger = logging.getLogger("data_annotator")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
class MDDBConnector:
"""Use this connection to interrogate the archive"""
def __init__(self):
self.connection = self._connect_to_db()
@staticmethod
def _connect_to_db() -> Connection:
"""
Establish a DB connection
:return:
"""
settings = CapoConfig().settings("metadataDatabase")
host_slash_db = settings["jdbcUrl"][len("jdbc:postgresql://") :]
host, dbname = host_slash_db.split("/")
port = 5432
if ":" in host:
host, port = host.split(":")
try:
conn = pg.connect(
host=host,
port=port,
database=dbname,
user=settings.jdbcUsername,
password=settings.jdbcPassword,
)
return conn
except Exception as exc:
logger.error(f"Unable to connect to database: {exc}")
raise exc
def cursor(self):
return self.connection.cursor()
def close(self):
self.connection.close()
def commit(self):
self.connection.commit()
def retrieve_problem_types(connection):
"""
Retrieves all valid problem types from the database.
:param connection: a MDDBConnector object to interface with the database
:return: a list of tuples representing problem type descriptions and their database ids
"""
cur = connection.cursor()
cur.execute("SELECT * FROM problem_types")
rows = cur.fetchall()
return rows
def retrieve_problem_severities(connection):
"""
Retrieves all valid problem severities from the database.
:param connection: a MDDBConnector object to interface with the database
:return: a list of tuples representing problem severity descriptions and their database ids
"""
cur = connection.cursor()
cur.execute("SELECT * FROM problem_severity_list")
rows = cur.fetchall()
return rows
def get_exec_blocks(project_code, date_range, fsid, band, configuration, connection):
"""
Finds all execution blocks associated with a user's query by building and executing a SQL query
:param project_code: a string corresponding to the project_code field in the database
:param date_range: a tuple corresponding to the starttime and endtime fields in the database
:param fsid: a string corresponding to the external name field in the database
:param band: a string corresponding to the band field in the database
:param configuration: a string corresponding to the configuration field in the database
:param connection: a MDDBConnector object to interface with the database
:return: a list of spls for all execution blocks, images, and calibrations related to the initial query
"""
# Format WHERE clauses with provided search pieces
project_code = f"project_code='{project_code}' AND " if project_code else ''
date_range = f"starttime>={date_range[0]} AND endtime<={date_range[1]} AND " if date_range else ''
fsid = f"external_name='{fsid}' AND " if fsid else ''
configuration = f"configuration='{configuration}' AND " if configuration else ''
band = f"band='{band}'" if band else ''
search_command = f"SELECT execution_block_id, execution_blocks.science_product_locator FROM execution_blocks \
INNER JOIN science_products ON execution_blocks.science_product_locator=science_products.science_product_locator \
WHERE {project_code}{configuration}{date_range}{fsid}{band}"
# Remove trailing ANDs, if necessary
if search_command[-4:] == 'AND ':
search_command = search_command[:-4]
cur = connection.cursor()
cur.execute(search_command)
return cur.fetchall()
def get_all_spls_from_exec_blocks(ebs, connection):
"""
Finds all science product locators associated with a list of execution blocks
:param ebs: a list tuples representing execution blocks
:return: a list of spls for all execution blocks, images, and calibrations related to the initial query
"""
ebs_to_find = []
spls_to_return = []
for eb in ebs:
ebs_to_find.append(str(eb[0]))
spls_to_return.append(str(eb[1]))
print("Finding all exec blocks, cals, and images fitting the passed in search terms...")
spl_query = f"""
with recursive
input_outputs as (select sppg2.science_product_locator as input_spl, sppg1.science_product_locator as output_spl
from science_products_product_groups sppg1
join product_groups pg on pg.product_group_id = sppg1.product_group_id
join product_groups ppg on ppg.product_group_id = pg.parent_product_group_id
join science_products_product_groups sppg2
on sppg2.product_group_id = ppg.product_group_id),
all_inputs_outputs as (select *
from input_outputs
union all
select input_outputs.input_spl, all_inputs_outputs.output_spl
from input_outputs
join all_inputs_outputs on input_outputs.output_spl = all_inputs_outputs.input_spl)
select distinct all_inputs_outputs.output_spl
from all_inputs_outputs, execution_blocks eb
where
eb.science_product_locator = all_inputs_outputs.input_spl AND
eb.execution_block_id in ({",".join(ebs_to_find)})
"""
cur = connection.cursor()
cur.execute(spl_query)
rows = cur.fetchall()
for row in rows:
spls_to_return.append(row[0])
return spls_to_return
def create_comments(ebs, comment, problem_severity, problem_type, connection):
"""
Creates a list of data comments and inserts them into the database.
:param ebs: a list execution block ids
:param comment: a string representing the comment to create
:param problem_severity: an integer representing the problem severity id
:param problem_type: an integer representing the problem type id
:param connection: a MDDBConnector object to interface with the database
"""
print("Creating comments...")
comments_to_add = []
for eb in ebs:
comments_to_add.append((problem_type, eb, problem_severity, comment))
cur = connection.cursor()
cur.executemany("INSERT INTO science_product_comments VALUES(DEFAULT,%s,%s,%s,%s)", comments_to_add)
print(f"Created comments for {cur.rowcount} total rows...")
def delete_comments(ebs, comment, problem_severity, problem_type, connection):
"""
Finds a list of comments in the database and deletes them.
:param ebs: a list execution block ids
:param comment: a string representing the comment to delete
:param problem_severity: an integer representing the problem severity id
:param problem_type: an integer representing the problem type id
:param connection: a MDDBConnector object to interface with the database
"""
print("Deleting comments...")
comments_to_add = []
for eb in ebs:
comments_to_add.append((problem_type, eb, problem_severity, comment))
cur = connection.cursor()
cur.executemany(f"DELETE FROM science_product_comments WHERE problem_type_id=%s AND science_product_locator=%s AND severity=%s AND comment=%s", comments_to_add)
print(f"Deleted comments for {cur.rowcount} total rows...")
def modify_comments(ebs, new_comment, old_comment, problem_severity, problem_type, connection):
"""
Finds a list of comments in the database and modifies the values.
:param ebs: a list execution block ids
:param new_comment: a string representing the comment to be inserted
:param old_comment: a string representing the comment to be replaced
:param problem_severity: an integer representing the problem severity id
:param problem_type: an integer representing the problem type id
:param connection: a MDDBConnector object to interface with the database
"""
print("Modifying comments...")
comments_to_add = []
for eb in ebs:
comments_to_add.append((new_comment, problem_type, eb, problem_severity, old_comment))
cur = connection.cursor()
cur.executemany(f"UPDATE science_product_comments SET comment=%s WHERE problem_type_id=%s AND science_product_locator=%s AND severity=%s AND comment=%s", comments_to_add)
print(f"Modified comments for {cur.rowcount} total rows...")
\ No newline at end of file
[tool.poetry]
name = "data_annotator"
version = "2.8.2rc1"
description = "Workspaces CLI tool for creating and deleting comments associated with data quality."
authors = ["DMS SSA <dms-ssa@nrao.edu>"]
license = "GPL3+"
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.10"
pycapo = "^0.3.1"
psycopg2-binary = "^2.9.6"
sqlalchemy = "1.4.47"
[tool.poetry.group.test.dependencies]
pytest = "^7.3.1"
[tool.poetry.scripts]
data_annotator = "data_annotator.annotator:main"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment