Compare revisions (target project: ssa/workspaces)
Changes are shown as if the source revision was being merged into the target revision.
Commits on Source (16)
Showing with 862 additions and 20 deletions
......@@ -18,6 +18,7 @@ variables:
POSTGRES_DB: archive
POSTGRES_USER: "archive"
POSTGRES_PASSWORD: "docker"
DL_HOST: https://dl-nrao.aoc.nrao.edu
# CI Postgres Service
services:
......@@ -241,16 +242,18 @@ deploy:
# Docker doesn't allow variable interpolation when declaring Docker Secret names
# This sed command finds and replaces "dsoc_ENV_secrets:" with "dsoc_${DEPLOY_ENV}_secrets:"
- sed -i "s/dsoc_ENV_secrets:/dsoc_${DEPLOY_ENV}_secrets:/g" docker-compose.dev.yml
- ENV=$DEPLOY_ENV TAG=$IMAGE_TAG docker stack deploy --compose-file docker-compose.dev.yml workspaces-${DEPLOY_ENV}
- ENV=$DEPLOY_ENV TAG=$IMAGE_TAG DL_HOST=$DL_HOST docker stack deploy --compose-file docker-compose.dev.yml workspaces-${DEPLOY_ENV}
rules:
- if: $CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH
variables:
IMAGE_TAG: ${CI_DEFAULT_BRANCH}
- if: '$CI_COMMIT_TAG =~ /^end-of-sprint\/[0-9]+/'
- if: '$CI_COMMIT_TAG =~ /^end-of-sprint-[0-9]+/'
variables:
IMAGE_TAG: $CI_COMMIT_TAG
# override DEPLOY_ENV
DEPLOY_ENV: "test"
# override DL_HOST
DL_HOST: https://dl-dsoc-test.nrao.edu
# - if: '$CI_COMMIT_TAG =~ /^test_[0-9]+\.[0-9]+.[0-9]+$/'
# variables:
# IMAGE_TAG: $CI_COMMIT_TAG
......
""" A locations report is produced by the archive service; you give
it a product locator and it returns a dictionary of details on how
to retrieve the product's files from long term storage (NGAS): this
class handles fetching the report from the service or reading it from
a file, and has utilities to manipulate the report.
"""
# pylint: disable=C0301, E0401, E0402, W1203
import copy
import http
import json
import logging
from json import JSONDecodeError
from typing import Dict, List
import requests
from pycapo import CapoConfig
from .errors import (
LocationServiceTimeoutException,
LocationServiceRedirectsException,
LocationServiceErrorException,
NoLocatorException,
MissingSettingsException,
)
from .utilities import Cluster, RetrievalMode, validate_file_spec, get_arg_parser
# pylint: disable=C0103, R0902, R0903, R0914, W0703, W1203
_LOG = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
REQUIRED_SETTINGS = {
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.LOCATORSERVICEURLPREFIX": "locator_service_url",
"EDU.NRAO.ARCHIVE.DATAFETCHER.DATAFETCHERSETTINGS.EXECUTIONSITE": "execution_site",
}
class LocationsReportRefactor:
""" Builds a location report """
def __init__(self, args: List[str]):
namespace = get_arg_parser().parse_args(args)
try:
self.capo_config = CapoConfig(profile=namespace.profile)
except Exception as exc:
raise MissingSettingsException("Capo profile is required") from exc
self.product_locator = namespace.product_locator
self.location_file = namespace.location_file
self.profile = namespace.profile
if not self.product_locator and not self.location_file:
raise NoLocatorException("either product locator or report file must be specified")
self._run()
def _run(self):
self.files_report = self._get_files_report()
self.servers_report = self._get_servers_report()
def _get_files_report(self):
"""Given a product locator or a path to a location file, return a
location report: an object describing the files that make up the product
and where to get them from.
        If neither argument was provided, the constructor has already raised
        NoLocatorException; if both are given (for some reason), the report
        fetched via the product locator takes precedence.
        :return: location report as a dict parsed from JSON
"""
result = dict()
try:
if self.location_file is not None:
result = self._get_location_report_from_file()
if self.product_locator is not None:
result = self._get_location_report_from_service()
return self._add_retrieve_method_field(result)
except JSONDecodeError as js_err:
_LOG.error(f"Unable to parse {self.location_file}")
raise ReportFileParseException from js_err
def _get_location_report_from_file(self) -> Dict[str, str]:
"""
Read a file at a user-provided path to pull in the location report.
:return:
"""
_LOG.debug(f'fetching files from report "{self.location_file}"')
try:
with open(self.location_file) as to_read:
result = json.load(to_read)
return result
except FileNotFoundError as err:
_LOG.error(f"{err}")
raise
except JSONDecodeError as js_err:
_LOG.error(f"Unable to parse {self.location_file}")
raise ReportFileParseException from js_err
except Exception as exc:
_LOG.error(f"Problem getting location report from f{self.location_file}: {exc}")
raise
def _get_location_report_from_service(self):
"""Use 'requests' to fetch the location report from the locator service.
:return: location report (from locator service, in JSON)
"""
url = self.capo_config.get(
"edu.nrao.archive.datafetcher.DataFetcherSettings.locatorServiceUrlPrefix"
)
_LOG.debug(f"fetching report from {url} for {self.product_locator}")
# this is needed to prevent SSL errors when tests are run
# inside a Docker container
requests.packages.urllib3.util.ssl_.DEFAULT_CIPHERS += ":HIGH:!DH:!aNULL"
        # use a session so the mounted adapter is actually applied to the request
        session = requests.Session()
        session.mount(url, requests.adapters.HTTPAdapter())
        response = None
        try:
            response = session.get(url, params={"locator": self.product_locator})
except requests.exceptions.Timeout as exc_to:
raise LocationServiceTimeoutException() from exc_to
except requests.exceptions.TooManyRedirects as exc_re:
raise LocationServiceRedirectsException() from exc_re
except requests.exceptions.RequestException as ex:
raise LocationServiceErrorException(ex) from ex
        except Exception as exc:
            _LOG.error(f"{exc}")
            raise
if response.status_code == http.HTTPStatus.OK:
return response.json()
if response.status_code == http.HTTPStatus.NOT_FOUND:
            raise UnknownLocatorException(f'locator "{self.product_locator}" not found')
raise LocationServiceErrorException(
"locator service returned {}".format(response.status_code)
)
def _add_retrieve_method_field(self, files_report: Dict):
"""This adds a field to the files report about whether we can do
a direct copy or we have to rely on streaming: this is something
the location service itself doesn't know because it depends on
which site data-fetcher is running on, which site has the data and
whether
the NGAS cluster supports direct copy."""
dsoc_cluster = Cluster.DSOC
exec_site = self.capo_config.getstring(
"edu.nrao.archive.datafetcher.DataFetcherSettings.executionSite"
)
for file_spec in files_report["files"]:
validate_file_spec(file_spec, False)
server = file_spec["server"]
location = server["location"]
if server["cluster"] == dsoc_cluster.value and (
location == exec_site or location == str(exec_site)
):
server["retrieve_method"] = RetrievalMode.COPY
else:
server["retrieve_method"] = RetrievalMode.STREAM
return files_report
def _get_servers_report(self) -> Dict:
"""The location report we get back looks like this, for each file:
{"ngas_file_id":"17B-197_2018_02_19_T15_59_16.097.tar",
"subdirectory":"17B-197.sb34812522.eb35115211.58168.58572621528",
"relative_path":"17B-197_2018_02_19_T15_59_16.097.tar",
"checksum":"-1848259250",
"checksum_type":"ngamsGenCrc32",
"version":1,
"size":108677120,
"server":{"server":"nmngas01.aoc.nrao.edu:7777",
"location":"DSOC",
"cluster":"DSOC"
}}
Re-organize it to group files under servers so it is more useful.
"""
result = {}
for file_spec in self.files_report["files"]:
validate_file_spec(file_spec, True)
new_f = copy.deepcopy(file_spec)
del new_f["server"]
server = file_spec["server"]
server_host = server["server"]
if server_host not in result:
result[server_host] = dict()
result[server_host]["location"] = server["location"]
result[server_host]["cluster"] = server["cluster"]
result[server_host]["retrieve_method"] = server["retrieve_method"]
result[server_host]["files"] = list()
result[server_host]["files"].append(new_f)
return result
class ReportFileParseException(Exception):
"""Throw this when we're unable to parse a .json """
class UnknownLocatorException(Exception):
"""Throw this when we get a product locator we don't know how to handle"""
......@@ -7,6 +7,7 @@ import sys
from argparse import Namespace
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from pprint import pprint
from typing import Dict
from datafetcher.errors import NGASServiceErrorException
......@@ -132,26 +133,34 @@ class ParallelFetcher(BaseFetcher):
def fetch_bucket(self, bucket):
""" Grab the files in this bucket """
file_sizes = [file["size"] for file in bucket["files"]]
pprint(f"retrieving files {file_sizes} from {bucket['server']}")
self._LOG.debug(
f"{bucket['retrieve_method']} "
f"{len(bucket['files'])} files from "
f"{bucket['server']}...."
)
self.retrieve_files(bucket["server"], bucket["retrieve_method"], bucket["files"])
num_files = self.retrieve_files(
bucket["server"], bucket["retrieve_method"], bucket["files"]
)
print(f"done retrieving files {file_sizes} from {bucket['server']}")
return num_files
def run(self):
""" Fetch all the files for the product locator """
print(f"running {self.__class__.__name__}")
if self.args.dry_run:
self._LOG.debug("This is a dry run; files will not be fetched")
return 0
with ThreadPoolExecutor() as executor:
results = executor.map(self.fetch_bucket, self.bucketized_files)
print(f"results: {results}")
try:
futures = as_completed(results)
for future in futures:
self.num_files_retrieved += future.result()
for result in results:
print(f"result has arrived: {result}")
self.num_files_retrieved += result
if self.num_files_retrieved != self.num_files_expected:
self._LOG.error(
f"{self.num_files_expected} files expected, "
......@@ -160,6 +169,7 @@ class ParallelFetcher(BaseFetcher):
self._exit_with_error(ReturnCode.NGAS_FETCH_ERROR)
# successful retrieval
print("returning")
return 0
except (FileExistsError, NGASServiceErrorException) as exc:
self._LOG.error(f"{exc}")
......
""" Let's not actually hit the archive service """
import logging
import sys
from pathlib import Path
from datafetcher.locations_report_refactor import LocationsReportRefactor, UnknownLocatorException
# pylint: disable=C0301, R0903
_LOG = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
PROFILE = "docker"
REPORT_FILES = {
"vla_eb": "17A-109_fg_18468.json",
"calibration": "CALIBRATION.json",
"image": "IMG.json",
"vlba": "VLBA_EB.json",
"gbt": "AGBT17B_044_02.json",
# "alma": "A001_X1296_Xa93_RAW.json",
"empty": "EMPTY.json",
"not_json": "NOT_JSON.json",
"vla_bad_server": "VLA_BAD_SERVER.json",
"vla_eb_busted": "VLA_SMALL_EB_BUSTED.json",
}
class FakeArchiveService:
""" Stand-in for archive service """
def __init__(self, product_locator: str):
"""
        Set up the fake service to return a locations report for the given locator.
:param product_locator:
"""
self.product_locator = product_locator
self.data_dir = get_test_data_dir()
def get_locations_report(self) -> LocationsReportRefactor:
"""
Depending on the product locator, return locations report for a VLA EB, an image, a VLBA product, etc.
:return: the location report
"""
if "vlba" in self.product_locator:
return self._vlba_locations_report()
elif "calibration" in self.product_locator:
return self._calibration_locations_report()
elif "image" in self.product_locator:
return self._image_locations_report()
elif "alma" in self.product_locator:
return self._alma_locations_report()
elif "gbt" in self.product_locator:
return self._gbt_locations_report()
elif "evla/execblock" in self.product_locator:
return self._vla_eb_locations_report()
raise UnknownLocatorException
def _vla_eb_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLA EB locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["vla_eb"]
return self._get_location_report_from_file(file)
def _calibration_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLA calibrations locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["calibration"]
return self._get_location_report_from_file(file)
def _image_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLASS image locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["image"]
return self._get_location_report_from_file(file)
def _vlba_locations_report(self) -> LocationsReportRefactor:
"""
Read in a VLBA locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["vlba"]
return self._get_location_report_from_file(file)
def _gbt_locations_report(self) -> LocationsReportRefactor:
"""
Read in a GBT locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["gbt"]
return self._get_location_report_from_file(file)
def _alma_locations_report(self) -> LocationsReportRefactor:
"""
Read in an ALMA EB locations report from a .json file.
:return:
"""
file = self.data_dir / REPORT_FILES["alma"]
return self._get_location_report_from_file(file)
@staticmethod
def _get_location_report_from_file(location_file: Path) -> LocationsReportRefactor:
"""
Read a .json file into a LocationsReport.
:param location_file: Path to file
:return: the locations report
"""
args = ["--location-file", str(location_file), "--profile", PROFILE]
return LocationsReportRefactor(args)
def get_test_data_dir() -> Path:
"""
Find the .json locations report files we use for testing.
:return:
"""
top_level_subdirs = sys.path
shared_ws_src = None
for pathname in top_level_subdirs:
# test data will be a few levels under shared_wksp
if "shared/workspaces" in pathname:
shared_ws_src = pathname
break
shared_wksp = Path(shared_ws_src).parent
for item in shared_wksp.rglob("location_files"):
assert item.is_dir()
return item
return None
......@@ -183,7 +183,7 @@ def test_no_overwrite_without_force(make_tempdir, settings):
assert fake_size == 38
@pytest.mark.skipif(not RUN_ALL, reason="debug")
@pytest.mark.skip("verbose mode goes away in WS-179 and isn't in use now")
def test_more_output_when_verbose(make_tempdir, settings):
top_level = Path(make_tempdir)
location_file = get_mini_locations_file(top_level / _LOCATION_FILENAME)
......
""" Tests to confirm our fake archive service functions as intended """
# pylint: disable=E0402, W0511
from datafetcher.locations_report_refactor import LocationsReportRefactor
from .fake_archive_service import FakeArchiveService
# TODO: move this to appropriate DF test module
# def test_gets_df_settings():
# capo = CapoConfig(profile=PROFILE)
# key = "edu.nrao.archive.datafetcher.DataFetcherSettings"
# field = "locatorServiceUrlPrefix"
# to_get = key + "." + field
# try:
# url = capo.get(to_get)
# except KeyError as k_err:
# raise MissingSettingsException(f'missing required setting "{field}"') from k_err
# assert "locator" in url
#
#
def test_service_returns_vla_eb_locations():
"""
    Does FakeArchiveService return the VLA execution block locations report in
    test_data?
:return:
"""
found = False
report = FakeArchiveService("uid://evla/execblock/123a-456-b-789z").get_locations_report()
assert len(report.servers_report) == 2
assert len(report.files_report["files"]) == 79
expected_ngas_id = "17A-109_2017_02_11_T18_49_09.756.tar"
rel_path = None
for file_spec in report.files_report["files"]:
if file_spec["ngas_file_id"] == expected_ngas_id:
found = True
rel_path = file_spec["relative_path"]
break
assert found
assert rel_path == expected_ngas_id
def test_service_returns_vlba_locations():
"""
    Does FakeArchiveService return the VLBA locations report in
    test_data?
:return:
"""
report = FakeArchiveService("uid://evla/vlba/uuid-du-jour31").get_locations_report()
assert len(report.servers_report) == 1
assert len(report.files_report["files"]) == 16
expected_agg_size = 2140560000
actual_agg_size = report.files_report["aggregate_size"]
assert actual_agg_size == expected_agg_size
def test_service_returns_cal_locations():
"""
Does FakeArchiveService return the calibrations locations report in
test_data?
:return:
"""
found = False
report = FakeArchiveService("uid://evla/calibration/different-uuid").get_locations_report()
assert len(report.servers_report) == 1
assert len(report.files_report) == 2
expected_subdir = "18B-265_2019_12_10_T00_00_59.203.tar"
for file_spec in report.files_report["files"]:
if file_spec["subdirectory"] == expected_subdir:
found = True
break
assert found
def test_service_returns_image_locations():
"""
Does FakeArchiveService return the image locations report in test_data?
:return:
"""
report = FakeArchiveService("uid://evla/image/some-uuid").get_locations_report()
assert isinstance(report, LocationsReportRefactor)
assert len(report.servers_report) == 2
assert len(report.files_report) == 2
expected_file_id = "uid____evla_image_56a10be7-f1c2-4788-8651-6ecc5bfbc2f1.fits"
found = False
for file_spec in report.files_report["files"]:
if file_spec["ngas_file_id"] == expected_file_id:
found = True
break
assert found
""" Unit tests for LocationsReportRefactor """
# pylint: disable=W0511, E0401, E0402
import pytest
from datafetcher.locations_report_refactor import (
LocationsReportRefactor,
ReportFileParseException,
UnknownLocatorException,
)
from .fake_archive_service import REPORT_FILES, get_test_data_dir
PROFILE = "docker"
def test_service_returns_expected_report():
"""
Does FakeArchiveService return a valid locations report?
:return:
"""
product_locator = "uid://evla/image/7a546de2-ab1f-4915-a8dc-94f572f9215c"
args = ["--product-locator", product_locator, "--profile", PROFILE]
report = LocationsReportRefactor(args)
assert report.files_report is not None
assert len(report.servers_report) == 2
assert len(report.files_report) == 2
def test_empty_file_fails_as_expected():
"""
When we attempt to create a locations report from a blank
.json file, do we get the expected error?
:return:
"""
file = get_test_data_dir() / REPORT_FILES["empty"]
args = ["--location-file", str(file), "--profile", PROFILE]
with pytest.raises(ReportFileParseException):
LocationsReportRefactor(args)
def test_bad_json_fails_as_expected():
"""
When we attempt to create a locations report from a badly-formed
.json file, do we get the expected error?
:return:
"""
file = get_test_data_dir() / REPORT_FILES["not_json"]
args = ["--location-file", str(file), "--profile", PROFILE]
with pytest.raises(ReportFileParseException):
LocationsReportRefactor(args)
def test_bad_locator_fails_as_expected():
"""
An invalid locator should throw UnknownLocatorException
:return:
"""
product_locator = "uid://this/is/b00000gus"
args = ["--product-locator", product_locator, "--profile", PROFILE]
with pytest.raises(UnknownLocatorException):
LocationsReportRefactor(args)
def test_missing_file_fails_as_expected():
"""
If the specified location report file isn't there,
    the locations report should raise FileNotFoundError
:return:
"""
file = get_test_data_dir() / "nonexistent.json"
args = ["--location-file", str(file), "--profile", PROFILE]
with pytest.raises(FileNotFoundError):
LocationsReportRefactor(args)
def test_file_returns_expected_report():
"""
Can we get a valid locations report from a report file?
:return:
"""
file = get_test_data_dir() / REPORT_FILES["vla_eb"]
args = ["--location-file", str(file), "--profile", PROFILE]
report = LocationsReportRefactor(args)
assert len(report.servers_report) == 2
assert len(report.files_report) == 2
# TODO: other location report tests from test_df_function, test_df_return_codes
......@@ -94,7 +94,7 @@ class DeliveryContext(DeliveryContextIF):
else:
# FIXME: this is gross
settings = CapoConfig().settings("edu.nrao.archive.workflow.config.DeliverySettings")
builder.local(pathlib.Path(settings.nraoDownloadDirectory) / self.token)
builder.local(pathlib.Path(settings.nraoDownloadDirectory) / "anonymous" / self.token)
# then handle the tar argument
if self.tar:
......
......@@ -48,7 +48,7 @@ class ExecutionBlock(SpooledProduct):
def deliver_to(self, destination: Destination):
eb_name = self.path.absolute().name
# let's use our directory name as the relative path
destination.add_directory((self.path / "rawdata" / eb_name), eb_name)
destination.add_directory(self.path, eb_name)
# Future types of product that might be needed:
......
......@@ -20,7 +20,7 @@ def test_local_rawdata_no_tar(tmpdir_factory):
main(["-r", "-l", temp_directory, test_data_path])
# compare the source and destination
compare_dirs = filecmp.dircmp(
temp_directory + "/" + eb_name, (test_data_path + eb_name + "/rawdata/" + eb_name)
temp_directory + "/" + eb_name, (test_data_path + eb_name)
)
# did the comparison report they are the same
assert (
......@@ -48,7 +48,7 @@ def test_local_rawdata_with_tar(tmpdir_factory):
assert os.path.exists(temp_directory + "/extracted/" + eb_name)
# compare the extracted results with the source
compare_dirs = filecmp.dircmp(
temp_directory + "/extracted/" + eb_name, (test_data_path + eb_name + "/rawdata/" + eb_name)
temp_directory + "/extracted/" + eb_name, (test_data_path + eb_name)
)
# is the source and extracted the same
assert (
......@@ -73,7 +73,7 @@ def test_web_rawdata_no_tar(tmpdir_factory):
destination_path = Delivery().deliver(test_context)
# compare the source and destination
compare_dirs = filecmp.dircmp(
destination_path / eb_name, f"{test_data_path}{eb_name}/rawdata/{eb_name}"
destination_path / eb_name, f"{test_data_path}{eb_name}"
)
# did the comparison report they are the same
assert (
......@@ -105,7 +105,7 @@ def test_web_rawdata_with_tar(tmpdir_factory):
assert os.path.exists(temp_directory + "/extracted/" + eb_name)
# compare the extracted results with the source
compare_dirs = filecmp.dircmp(
temp_directory + "/extracted/" + eb_name, (test_data_path + eb_name + "/rawdata/" + eb_name)
temp_directory + "/extracted/" + eb_name, (test_data_path + eb_name)
)
# is the source and extracted the same
assert (
......
......@@ -254,7 +254,7 @@ class WorkflowMonitor:
r"(?P<eventnum>[0-9]{3}) \((?P<jobnum>[0-9]{4})\.[0-9]{3}\.[0-9]{3}\) "
r"(?P<timestamp>[0-9]{2}/[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) "
r"(?P<desc>[a-zA-Z0-9 ?&:=<>\-._]+)\n"
r"(?P<body>[\t ]+[^\.]*\n)*"
r"(?P<body>[\t ]+[A-z0-9 ?_.(),: - \s]*\n)*"
r"..."
)
match = re.match(r_condor_event, event_str)
......
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pathlib import Path
from setuptools import setup
VERSION = open("ws_metrics/_version.py").readlines()[-1].split()[-1].strip("\"'")
README = Path("README.md").read_text()
requires = [
"psycopg2",
"pycapo"
]
setup(
name="ssa-" + Path().absolute().name,
version=VERSION,
description="Workspaces metrics reporter for users outside of SSA.",
long_description=README,
author="NRAO SSA Team",
author_email="dms-ssa@nrao.edu",
url="TBD",
license="GPL",
install_requires=requires,
keywords=[],
packages=["ws_metrics"],
classifiers=["Programming Language :: Python :: 3.8"],
entry_points={"console_scripts": ["ws_metrics = ws_metrics.deep_thought:main"]},
)
"""
Testing suite for ws_metrics
"""
import argparse
import logging
import sys
from unittest.mock import patch
import pytest
from ws_metrics.deep_thought import LifeUniverseEverything
logger = logging.getLogger("test_ws_metrics")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
args_b = argparse.Namespace(between=['null', '2021-03-30T00:00:00', '2021-03-30T23:59:59'], capability=None)
result_b = 4
args_c = argparse.Namespace(between=None, capability='null')
result_c = 10
args_rd = argparse.Namespace(requestdatasize=['1'], datasize=None)
result_rd = 10
args_d = argparse.Namespace(requestdatasize=None, datasize=['2021-03-30T00:00:00', '2021-03-30T23:59:59'])
result_d = 100
def mock_leu(args: argparse.Namespace) -> LifeUniverseEverything:
with patch("psycopg2.connect") as mock_connect:
return LifeUniverseEverything(connection=mock_connect, args=args)
mock_leu_b = mock_leu(args_b)
mock_leu_c = mock_leu(args_c)
mock_leu_rd = mock_leu(args_rd)
mock_leu_d = mock_leu(args_d)
class TestWSMetrics:
def test_get_total_cap_executions(self):
mock_leu_c.conn.cursor.return_value.fetchone.return_value = result_c
assert args_c.capability == 'null'
value = mock_leu_c.get_total_cap_executions(args_c.capability)
assert value == result_c
def test_get_total_executions_in_range(self):
mock_leu_b.conn.cursor.return_value.fetchone.return_value = result_b
assert args_b.between[0] == 'null'
assert args_b.between[1] == '2021-03-30T00:00:00'
assert args_b.between[2] == '2021-03-30T23:59:59'
value = mock_leu_b.get_total_executions_in_range(args_b.between[0],
args_b.between[1],
args_b.between[2])
assert value == result_b
def test_get_total_data_volume(self):
mock_leu_d.conn.cursor.return_value.fetchone.return_value = result_d
assert args_d.datasize[0] == '2021-03-30T00:00:00'
assert args_d.datasize[1] == '2021-03-30T23:59:59'
value = mock_leu_d.get_total_data_volume(args_d.datasize[0],
args_d.datasize[1])
assert value == result_d
def test_get_request_data_volume(self):
mock_leu_rd.conn.cursor.return_value.fetchone.return_value = result_rd
assert args_rd.requestdatasize[0] == '1'
value = mock_leu_rd.get_request_data_volume('1')
assert value == result_rd
""" Version information for this package, don't put anything else here. """
___version___ = '4.0.0a1.dev1'
# Metrics make the world go round.
# Yes, this is a reference to Hitchhikers Guide to the Galaxy.
"""
scripted access to the metrics database views for data analysts
"""
import argparse
import datetime
import logging
import sys

import psycopg2
import psycopg2.extras

from pycapo import CapoConfig
logger = logging.getLogger("ws_metrics")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
class LifeUniverseEverything:
def __init__(self, connection, args: argparse.Namespace):
self.conn = connection
self.args = args
def get_total_cap_executions(self, capability_name: str):
sql = "SELECT count(*) From capability_execution_metrics WHERE cap_name = \'" + capability_name[0] + "\'"
cursor = self.conn.cursor()
cursor.execute(sql)
total_number = cursor.fetchone()
print("SQL query used: " + sql)
return total_number
def get_total_executions_in_range(self,
capability_name: str,
beginning_date: datetime,
end_date: datetime):
sql = "SELECT count(*) FROM capability_execution_metrics WHERE cap_name = \'" + capability_name +\
"\' AND execution_created between %(beginning)s and %(end)s"
cursor = self.conn.cursor()
cursor.execute(sql, {'beginning': beginning_date, 'end': end_date})
total_number = cursor.fetchone()
print("SQL query used: " + sql)
return total_number
def get_request_data_volume(self, request_id: str):
sql = "SELECT sum(datasize) FROM download_metrics WHERE request_id = " + request_id +\
" AND state = \'Complete\' GROUP BY request_id"
cursor = self.conn.cursor()
cursor.execute(sql)
data_volume = cursor.fetchone()
print("SQL query used: " + sql)
return data_volume
def get_total_data_volume(self,
beginning_date: datetime,
end_date: datetime):
sql = "SELECT sum(datasize) FROM download_metrics WHERE state = \'Complete\' " \
"AND execution_created between %(beginning)s and %(end)s"
cursor = self.conn.cursor()
cursor.execute(sql, {'beginning': beginning_date, 'end': end_date})
data_volume = cursor.fetchone()
print("SQL query used: " + sql)
return data_volume
def run_metrics(self, args: argparse.Namespace):
if args.capability:
result = self.get_total_cap_executions(args.capability)
print("\nTotal number of " + args.capability[0] + " capability executions: " + str(result[0]) + "\n")
if args.between:
result = self.get_total_executions_in_range(args.between[0], args.between[1], args.between[2])
print("\nTotal number of " + args.between[0] +
" executions between " + args.between[1] +
" and " + args.between[2] + ": " + str(result[0]) + "\n")
if args.requestdatasize:
result = self.get_request_data_volume(args.requestdatasize[0])
print(result)
if result is not None:
print("\nDownload data size for request #" + args.requestdatasize[0] + " : " + str(result[0]))
else:
print("Request #" + args.requestdatasize[0] + " not found!")
if args.datasize:
result = self.get_total_data_volume(args.datasize[0], args.datasize[1])
print(result)
if result[0] is not None:
print("\nTotal volume of downloaded data for range " + args.datasize[0] + " to " +
args.datasize[1] + ": " + str(result[0]))
else:
print("Error: No download capability requests were found!")
def parser() -> argparse.ArgumentParser:
"""
Initialize argument parser for command-line arguments
:return: Argument parser
"""
parser = argparse.ArgumentParser(
description="Workspaces metrics reporter",
formatter_class=argparse.RawTextHelpFormatter
)
parser.add_argument(
"-c",
"--capability",
nargs=1,
action="store",
required=False,
help="report number of times a capability has executed"
)
parser.add_argument(
"-b",
"--between",
nargs=3,
action="store",
required=False,
help="report the number of times a capability has executed between two user provided timestamps."
)
parser.add_argument(
"-rd",
"--requestdatasize",
nargs=1,
action="store",
required=False,
help="reports the volume of data downloaded for this request in kilobytes."
)
parser.add_argument(
"-d",
"--datasize",
nargs=2,
action="store",
required=False,
help="reports the total volume of data downloaded via the workspaces system over a specified period of time."
)
return parser
def _get_capo_settings():
return CapoConfig().settings("metadataDatabase")
def main():
print("**********************************")
print("* WELCOME TO WORKSPACES METRICS! *")
print("**********************************")
args = parser().parse_args()
settings = _get_capo_settings()
host, dbname = settings['jdbcUrl'].split(':')[2][2:].split('/')
try:
conn = psycopg2.connect(
host=host,
database=dbname,
user=settings.jdbcUsername,
password=settings.jdbcPassword
)
    except psycopg2.Error:
        print("Unable to connect to database.")
        sys.exit(1)
LifeUniverseEverything(conn, args).run_metrics(args)
conn.close()
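A minimal sketch of exercising the reporter without a real database, mirroring the mocked-connection pattern used in the test suite above; the capability name and the count returned by the mocked cursor are made-up values.

# illustrative only: mirrors the test suite's mocked-connection pattern
import argparse
from unittest.mock import patch

from ws_metrics.deep_thought import LifeUniverseEverything

args = argparse.Namespace(capability=["download"], between=None,
                          requestdatasize=None, datasize=None)
with patch("psycopg2.connect") as mock_connect:
    mock_connect.cursor.return_value.fetchone.return_value = (7,)
    leu = LifeUniverseEverything(connection=mock_connect, args=args)
    leu.run_metrics(args)  # prints the SQL used and a total of 7 download executions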
......@@ -23,7 +23,7 @@ RUN ./node_modules/.bin/ng build --configuration=dev --output-path=dist
FROM nginx:1.19.7-alpine
# Copy WS nginx config from base-build stage
COPY --from=base-build /app/ws-nginx.conf /etc/nginx/conf.d/
COPY --from=base-build /app/ws-nginx.conf.template /etc/nginx/templates/
# Remove default nginx config
RUN rm /etc/nginx/conf.d/default.conf
......@@ -33,6 +33,3 @@ COPY --from=base-build /app/dist /usr/share/nginx/html
# Expose port
EXPOSE 4444
# Run nginx with daemon off to run as foreground process
CMD nginx -g "daemon off;"
\ No newline at end of file
......@@ -12,7 +12,7 @@ exports.config = {
'browserName': 'chrome'
},
directConnect: true,
baseUrl: 'http://localhost:4200/',
baseUrl: 'http://localhost:4444/',
framework: 'jasmine',
jasmineNodeOpts: {
showColors: true,
......@@ -25,4 +25,4 @@ exports.config = {
});
jasmine.getEnv().addReporter(new SpecReporter({ spec: { displayStacktrace: true } }));
}
};
\ No newline at end of file
};