Compare revisions
Target project: ssa/workspaces
Commits on Source (16)
Showing with 645 additions and 65 deletions
@@ -95,7 +95,7 @@ class DeliveryConveyor(ConveyorIF):
     def move_subdirectories_to_qa_area(self, qa_path: str):
         # ignore non-casa related directories
-        transfer_list = ["rawdata/", "working/", "products/"]
+        transfer_list = ["working/", "products/"]
         if not pathlib.Path(qa_path).exists():
             os.makedirs(qa_path)
......
@@ -84,7 +84,7 @@ class TestDeliveryConveyor:
     @patch("os.chown")
     @patch("os.makedirs")
     def test_move_subdirectories_to_qa_area(self, mock_dirs, mock_chown, mock_chmod, mock_move, mock_system, mock_link):
-        with patch("glob.glob", MagicMock(return_value=["rawdata/", "working/", "products/"])) as contents:
+        with patch("glob.glob", MagicMock(return_value=["working/", "products/"])) as contents:
             path = "/lustre/aoc/cluster/pipeline/docker/workspaces/qa2/tmpabcd1234"
             conveyor.move_subdirectories_to_qa_area(path)
             assert mock_dirs.call_count == 1
......
@@ -36,7 +36,6 @@ settings = {
 action = "Calibration Retrieval"
 PRODUCTS = "products/"
-RAWDATA = "rawdata/"
 WORKING = "working/"
 conveyor = RetrievalConveyor(settings=settings, action=action)
@@ -71,7 +70,7 @@ class TestRetrievalConveyor:
     @patch("pathlib.Path.unlink")
     def test_break_symlinks(self, mock_unlink, mock_sym):
         path = fake_determine_spool()
-        dir_list = [WORKING, RAWDATA, PRODUCTS]
+        dir_list = [WORKING, PRODUCTS]
         conveyor.break_symlinks(path, dir_list)
         assert mock_sym.call_count == 3
         assert mock_unlink.call_count == 3
@@ -80,7 +79,7 @@ class TestRetrievalConveyor:
     def test_move_subdirectories_to_spool(self, mock_move):
         qa_path = fake_determine_qa()
         spool_path = fake_determine_spool()
-        dir_list = [WORKING, RAWDATA, PRODUCTS]
+        dir_list = [WORKING, PRODUCTS]
         conveyor.move_subdirectories_to_spool(qa_path, spool_path, dir_list)
         assert mock_move.call_count == 3
@@ -92,11 +91,11 @@ class TestRetrievalConveyor:
         path = conveyor.determine_spool_directory()
         assert isinstance(path, Path)

-    @patch("glob.glob", return_value=[WORKING, RAWDATA, PRODUCTS])
+    @patch("glob.glob", return_value=[WORKING, PRODUCTS])
     @patch("os.chdir")
     def test_check_spool_contents(self, mock_chdir, mock_glob):
         spool = fake_determine_spool()
-        expected_list = [WORKING, RAWDATA, PRODUCTS]
+        expected_list = [WORKING, PRODUCTS]
         check = conveyor.check_spool_contents(spool, expected_list)
         assert check is True
......
@@ -403,6 +403,25 @@ urllib3 = ">=1.21.1,<3"
 socks = ["PySocks (>=1.5.6,!=1.5.7)"]
 use-chardet-on-py3 = ["chardet (>=3.0.2,<6)"]

+[[package]]
+name = "requests-mock"
+version = "1.11.0"
+description = "Mock out responses from the requests package"
+optional = false
+python-versions = "*"
+files = [
+    {file = "requests-mock-1.11.0.tar.gz", hash = "sha256:ef10b572b489a5f28e09b708697208c4a3b2b89ef80a9f01584340ea357ec3c4"},
+    {file = "requests_mock-1.11.0-py2.py3-none-any.whl", hash = "sha256:f7fae383f228633f6bececebdab236c478ace2284d6292c6e7e2867b9ab74d15"},
+]
+
+[package.dependencies]
+requests = ">=2.3,<3"
+six = "*"
+
+[package.extras]
+fixture = ["fixtures"]
+test = ["fixtures", "mock", "purl", "pytest", "requests-futures", "sphinx", "testtools"]
+
 [[package]]
 name = "six"
 version = "1.16.0"
@@ -526,4 +545,4 @@ zstd = ["zstandard (>=0.18.0)"]
 [metadata]
 lock-version = "2.0"
 python-versions = "~3.10"
-content-hash = "edc013fdb92304e119efddb9816b50ee2a878e201501d390ac85965898a0f94d"
+content-hash = "1192eb11c1a6de88d0faa51b3558a9a7fd3f660b82dd4906868f09b9f544834a"
@@ -16,6 +16,7 @@ sqlalchemy = "1.4.49"
 [tool.poetry.group.test.dependencies]
 pytest = "7.4.2"
+requests-mock = "1.11.0"

 [tool.poetry.scripts]
 ws_annihilator = "ws_annihilator.annihilator:main"
......
# Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
#
from itertools import chain
from pathlib import Path
from typing import Callable, List

import pytest
from requests_mock import Mocker

from ws_annihilator.annihilator import (
    FAILURE_NOTIFICATION_TEMPLATE_NAME,
    Annihilator,
    CacheAnnihilator,
    Settings,
    SpoolAnnihilator,
    _arg_parser,
    run_annihilator,
)
MOCKED_SERVER_URL = "http://nonexistent-host"
MOCKED_NOTIFICATION_PORT = 3458
MOCKED_WORKFLOW_PORT = 3456
MOCKED_WORKFLOW_URL = f"{MOCKED_SERVER_URL}:{MOCKED_WORKFLOW_PORT}"
MOCKED_NOTIFICATION_URL = f"{MOCKED_SERVER_URL}:{MOCKED_NOTIFICATION_PORT}"
def make_mocked_rmtree(dirs_with_bad_permissions: List[Path]) -> Callable[[str], None]:
    """Make a replacement for shutil.rmtree that just raises a PermissionError when called on certain directories

    :param dirs_with_bad_permissions: Absolute paths of the directories to raise a PermissionError on
    :return: The mocked replacement for rmtree
    """
    dir_names_with_bad_permissions = [dir_path.name for dir_path in dirs_with_bad_permissions]

    def mocked_rmtree(path: str):
        if path in dir_names_with_bad_permissions:
            raise PermissionError()

    return mocked_rmtree
def requests_mock_notify_good(requests_mock: Mocker) -> Mocker:
    """Mock requests assuming notifications get sent successfully"""
    url = f"{MOCKED_NOTIFICATION_URL}/notify/ws_annihilator_failure/send"
    requests_mock.post(
        url,
        status_code=200,
        json={"message": "Email sent", "errors": {}},
        headers={
            "Content-Length": "39",
            "Content-Type": "application/json",
            "Date": "Mon, 06 Feb 2023 18:06:10 GMT",
            "Server": "waitress",
        },
    )
    return requests_mock
def requests_mock_spool_annihilator(
    requests_mock: Mocker, annihilator: SpoolAnnihilator, stale_dirs: List[str]
) -> Mocker:
    """Mock the two endpoints that a SpoolAnnihilator needs to access"""
    requests_mock.get(
        annihilator.stale_directory_url,
        status_code=200,
        json={"stale_dirs": ",".join(stale_dirs)},
    )
    requests_mock.put(annihilator.clean_url, status_code=200)
    return requests_mock
def setup_dir_with_file(tmp_path: Path, dir_name: str) -> Path:
    dir_with_permissions_issue = tmp_path / dir_name
    dir_with_permissions_issue.mkdir()
    failed_file = dir_with_permissions_issue / "failure.txt"
    failed_file.touch(exist_ok=False)
    return dir_with_permissions_issue
def make_settings_for_dir(dir: Path) -> Settings:
    days = str(10)
    return {
        "notification_url": MOCKED_NOTIFICATION_URL,
        "failure_notification_email": "nonexistent@email.address",
        "workflow_url": MOCKED_WORKFLOW_URL,
        "cache": str(dir),
        "spool": str(dir),
        "storage": str(dir),
        "staging": str(dir),
        "spool_days": days,
        "cache_days": days,
        "staging_days": days,
        "storage_days": days,
        "calibration_days": days,
        "image_days": days,
    }
def test_annihilate_directory_failure(tmp_path: Path, requests_mock: Mocker, monkeypatch: pytest.MonkeyPatch):
    args = _arg_parser().parse_args(["--profile", "nonexistent", "--all"])
    dir_to_annihilate = setup_dir_with_file(tmp_path, "tmp12345")
    annihilator = SpoolAnnihilator(args, make_settings_for_dir(dir_to_annihilate))
    requests_mock = requests_mock_spool_annihilator(requests_mock, annihilator, [str(dir_to_annihilate)])

    # Mock rmtree to force a PermissionError, causing annihilate_directory() to fail
    monkeypatch.setattr("shutil.rmtree", make_mocked_rmtree([dir_to_annihilate]))

    did_fail = SpoolAnnihilator(args, make_settings_for_dir(dir_to_annihilate)).annihilate_directory(
        str(dir_to_annihilate)
    )
    assert did_fail is True
    # The filesystem won't have changed since rmtree is mocked, so no need to assert about it
def test_annihilate_directory_success(tmp_path: Path, requests_mock: Mocker):
    args = _arg_parser().parse_args(["--profile", "nonexistent", "--all"])
    dir_to_annihilate = setup_dir_with_file(tmp_path, "tmp12345")
    assert dir_to_annihilate.exists()

    annihilator = SpoolAnnihilator(args, make_settings_for_dir(dir_to_annihilate))
    requests_mock = requests_mock_spool_annihilator(requests_mock, annihilator, [str(dir_to_annihilate)])

    did_succeed = annihilator.annihilate_directory(str(dir_to_annihilate))
    assert did_succeed is True
    assert not dir_to_annihilate.exists()
def test_multiple_failures(tmp_path: Path, requests_mock: Mocker, monkeypatch: pytest.MonkeyPatch):
    expected_failed_dirs = [setup_dir_with_file(tmp_path, f"failure-{i}") for i in range(3)]
    expected_successful_dir = setup_dir_with_file(tmp_path, "success")
    all_dirs = list(chain(expected_failed_dirs, [expected_successful_dir]))
    assert all(dir.exists() for dir in all_dirs)

    # Need to mock rmtree to get it to raise a PermissionError, triggering the notification
    monkeypatch.setattr("shutil.rmtree", make_mocked_rmtree(expected_failed_dirs))

    args = _arg_parser().parse_args(["--profile", "nonexistent"])  # Set up for spool annihilation
    settings = make_settings_for_dir(tmp_path)
    requests_mock = requests_mock_notify_good(requests_mock)
    # Instantiate an annihilator so the requests mocking can register the endpoints
    # that any instance with the same args and settings will access
    annihilator = SpoolAnnihilator(args, settings)
    requests_mock = requests_mock_spool_annihilator(
        requests_mock, annihilator, [str(dir_path) for dir_path in all_dirs]
    )

    run_annihilator(args, settings)

    assert requests_mock.call_count > 1
    assert requests_mock.last_request
    notification_request = requests_mock.last_request
    assert notification_request.port == MOCKED_NOTIFICATION_PORT
    assert FAILURE_NOTIFICATION_TEMPLATE_NAME in notification_request.path
    assert notification_request.json().get("failed_directories") is not None
    assert set(notification_request.json()["failed_directories"]) == set(
        str(dir_path) for dir_path in expected_failed_dirs
    )
    # The filesystem won't have changed since rmtree is mocked, so no need to assert about it
# Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
#
"""
Facility for removing old or QAFailed files and directories from the Workspaces lustre areas
* Download area clean up is handled by the AAT/PPI mr_clean system
@@ -7,22 +24,22 @@ import argparse
 import datetime
 import logging
 import os
-import requests
 import shutil
 import sys
-from argparse import ArgumentParser
-from dateutil.relativedelta import relativedelta
+from argparse import ArgumentParser, Namespace
 from enum import Enum
 from pathlib import Path
-from typing import Dict, List
+from typing import Dict, List, Optional, Set, TypedDict
+
+import requests
+from dateutil.relativedelta import relativedelta
 from pycapo import CapoConfig

-from ws_annihilator.connections import MDDBConnector
+from ws_annihilator.connections import MDDBConnector, send_notification

 logger = logging.getLogger(__name__)
 logger.setLevel(logging.INFO)
 logger.addHandler(logging.StreamHandler(sys.stdout))

+FAILURE_NOTIFICATION_TEMPLATE_NAME = "ws_annihilator_failure"
class Areas(Enum):
@@ -41,7 +58,25 @@ class TypedAnnihilators(Enum):
     RESTORE_IMG = "std_restore_imaging"

-def _get_settings(profile: str) -> Dict[str, str]:
+Settings = TypedDict(
+    "Settings",
+    notification_url=str,
+    failure_notification_email=str,
+    workflow_url=str,
+    cache=str,
+    spool=str,
+    storage=str,
+    staging=str,
+    cache_days=str,
+    spool_days=str,
+    staging_days=str,
+    storage_days=str,
+    calibration_days=str,
+    image_days=str,
+)
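For illustration, a Settings value is just a dict of strings keyed by the fields above; a minimal sketch with invented placeholder values (none of these URLs, paths, or day counts come from this MR):

example_settings: Settings = {
    "notification_url": "http://localhost:3458",
    "failure_notification_email": "analyst@example.org",
    "workflow_url": "http://localhost:3456",
    "cache": "/lustre/cache",
    "spool": "/lustre/spool",
    "storage": "/lustre/storage",
    "staging": "/lustre/staging",
    "cache_days": "30",
    "spool_days": "30",
    "staging_days": "30",
    "storage_days": "30",
    "calibration_days": "14",
    "image_days": "14",
}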
+def _get_settings(profile: str) -> Settings:
"""
Get annihilator capo settings
@@ -51,8 +86,11 @@ def _get_settings(profile: str) -> Dict[str, str]:
     annihilator_settings = config.settings("edu.nrao.workspaces.AnnihilatorSettings")
     ingestion_settings = config.settings("edu.nrao.workspaces.IngestionSettings")
     notification_settings = config.settings("edu.nrao.workspaces.NotificationSettings")
     workflow_url = config.settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl
     notification_url = notification_settings.serviceUrl
+    failure_notification_email = notification_settings.analystEmail
     cache_directory = config.settings("edu.nrao.workspaces.DeliverySettings").cacheWeblogDirectory
     spool_directory = config.settings("edu.nrao.workspaces.ProcessingSettings").rootDirectory
     staging_directory = ingestion_settings.stagingDirectory
@@ -60,6 +98,8 @@
     return {
         "workflow_url": workflow_url,
+        "notification_url": notification_url,
+        "failure_notification_email": failure_notification_email,
         "cache": cache_directory,
         "spool": spool_directory,
         "storage": storage_directory,
@@ -144,11 +184,12 @@ class Annihilator:
     - If directory doesn't exist, sets 'cleaned' flag on associated request
     """

-    def __init__(self):
-        self._args = _arg_parser().parse_args()
-        self._settings = _get_settings(self._args.profile)
+    def __init__(self, args: Namespace, settings: Settings):
+        self._args = args
+        self._settings = settings
         self._area_to_clean = None
         self._flag_cleaned = False
+        self.clean_url = self._settings.get("workflow_url") + "/workflows/requests/cleaned"

     def determine_path(self) -> str:
         """
@@ -229,12 +270,12 @@
         return name_list

-    def annihilate_directory(self, directory: str):
+    def annihilate_directory(self, directory: str) -> bool:
         """
         Annihilate specified directory

         :param directory: directory to remove
-        :return:
+        :return: True if annihilation succeeded, False otherwise
         """

         path = self.determine_path()
@@ -251,8 +292,10 @@
                 f"Please check directory for non-vlapipe owned additions. %s",
                 exc_info=pe,
             )
+            return False
         except Exception as e:
             logger.error(f"ERROR: {clean_me} encountered the following problem: %s", exc_info=e)
+            return False

         # After annihilation set cleaned flag...
         if self._flag_cleaned and not self._args.dry_run:
@@ -260,6 +303,7 @@
             self.set_cleaned(clean_me)
         else:
             logger.info(f"Annihilation complete for {directory} in {self._area_to_clean}.")
+        return True

     def set_cleaned(self, directory: str):
         """
@@ -268,17 +312,17 @@
         :param directory: the directory that was annihilated
         :return:
         """
-        clean_url = self._settings.get("workflow_url") + "/workflows/requests/cleaned"
         json_payload = {"directory": directory, "update_to": "True"}
-        return requests.put(clean_url, json=json_payload)
+        return requests.put(self.clean_url, json=json_payload)

-    def annihilate(self):
+    def annihilate(self) -> Set[str]:
         """
         Remove stale directories from targeted workspaces areas

         :return:
         """
+        failed_directories = set()
         stale = self.get_stale_directories()

         # If stale directories were found, annihilate them
@@ -301,21 +345,33 @@
                     logger.info(f"directory {dir_path} not found in {self._area_to_clean}, skipping...")
                 else:
                     logger.info(f"found directory {directory}, annihilating...")
-                    self.annihilate_directory(directory)
+                    did_succeed = self.annihilate_directory(directory)
+                    if did_succeed is False:
+                        failed_directories.add(dir_path)
             else:
                 logger.error(f"Error: Received invalid directory '{dir_path}' as input for annihilation. Aborting.")
                 exit(1)

+        return failed_directories
+

 class SpoolAnnihilator(Annihilator):
     """
     Annihilates stale spool directories from stale workflow requests.
     """

-    def __init__(self):
-        super().__init__()
+    def __init__(self, args: Namespace, settings: Settings):
+        super().__init__(args, settings)
         self._area_to_clean = Areas.SPOOL
         self._flag_cleaned = True

+    @property
+    def stale_directory_url(self) -> str:
+        return (
+            self._settings.get("workflow_url")
+            + "/workflows/requests/stale/"
+            + self.get_days_to_keep(self._area_to_clean.value)
+        )
+
     def get_stale_directories(self) -> List[str]:
         """
         It is possible for processing to take multiple days to several weeks. While a job is running in HTCondor, it's
@@ -328,12 +384,7 @@ class SpoolAnnihilator(Annihilator):
         :return: A list of directory names to be removed
         """
-        annihilation_url = (
-            self._settings.get("workflow_url") + "/workflows/requests/stale/"
-            + self.get_days_to_keep(self._area_to_clean.value)
-        )
-
-        response = requests.get(annihilation_url)
+        response = requests.get(self.stale_directory_url)
         dir_list = response.json()["stale_dirs"]

         name_list = []
@@ -351,8 +402,9 @@ class CacheAnnihilator(SpoolAnnihilator):
     """
     Annihilates stale cache directories from stale workflow requests (uses the same stales as the SpoolAnnihilator).
     """

-    def __init__(self):
-        super().__init__()
+    def __init__(self, args: Namespace, settings: Settings):
+        super().__init__(args, settings)
         self._area_to_clean = Areas.CACHE
         self._flag_cleaned = False
@@ -361,8 +413,9 @@ class StorageAnnihilator(SpoolAnnihilator):
     """
     Annihilates stale storage directories from stale workflow requests (uses the same stales as the SpoolAnnihilator).
     """

-    def __init__(self):
-        super().__init__()
+    def __init__(self, args: Namespace, settings: Settings):
+        super().__init__(args, settings)
         self._area_to_clean = Areas.STORAGE
         self._flag_cleaned = False
@@ -371,8 +424,9 @@ class StagingAnnihilator(Annihilator):
     """
     Annihilates stale staging directories.
     """

-    def __init__(self):
-        super().__init__()
+    def __init__(self, args: Namespace, settings: Settings):
+        super().__init__(args, settings)
         self._area_to_clean = Areas.STAGING

     def get_stale_directories(self) -> List[str]:
@@ -398,8 +452,9 @@ class WorkflowAnnihilator(SpoolAnnihilator):
     """
     Annihilates stale spool directories from stale workflow requests for a single workflow.
     """

-    def __init__(self, workflow: str):
-        super().__init__()
+    def __init__(self, workflow: str, args: Namespace, settings: Settings):
+        super().__init__(args, settings)
         self._workflow = workflow

     def get_stale_directories(self) -> List[str]:
@@ -423,6 +478,7 @@ class CalibrationAnnihilator(WorkflowAnnihilator):
     """
     Annihilates stale spool directories from stale calibration requests.
     """
+
     def get_stale_directories(self) -> List[str]:
         """
         A stale calibration request is defined as one that hasn't been flagged as SRDP within self.get_days_to_keep().
@@ -444,7 +500,7 @@ class CalibrationAnnihilator(WorkflowAnnihilator):
         params = {
             "wf_name": self._workflow,
-            "time_to_keep": (datetime.datetime.now() - relativedelta(days=int(self.get_days_to_keep('calibration'))))
+            "time_to_keep": (datetime.datetime.now() - relativedelta(days=int(self.get_days_to_keep("calibration")))),
         }

         return self.get_stale_directories_from_query(query, params)
@@ -454,6 +510,7 @@ class ImageAnnihilator(WorkflowAnnihilator):
     """
     Annihilates stale spool directories from completed image requests and their associated calibrations.
     """
+
     def get_stale_directories(self) -> List[str]:
         """
         A completed image request is defined as one that is in a completed state and ingested after
@@ -476,7 +533,7 @@ class ImageAnnihilator(WorkflowAnnihilator):
         params = {
             "wf_name": self._workflow,
-            "time_to_keep": (datetime.datetime.now() - relativedelta(days=int(self.get_days_to_keep('image'))))
+            "time_to_keep": (datetime.datetime.now() - relativedelta(days=int(self.get_days_to_keep("image")))),
         }

         stales = self.get_stale_directories_from_query(query, params)
@@ -510,7 +567,7 @@ class ImageAnnihilator(WorkflowAnnihilator):
         params = {
             "wf_name": self._workflow,
-            "time_to_keep": (datetime.datetime.now() - relativedelta(days=int(self.get_days_to_keep('image'))))
+            "time_to_keep": (datetime.datetime.now() - relativedelta(days=int(self.get_days_to_keep("image")))),
         }

         stales.extend(self.get_stale_directories_from_query(query, params))
@@ -518,54 +575,75 @@ class ImageAnnihilator(WorkflowAnnihilator):
     return stales

-def main():
-    """
-    run the annihilator based on provided args
-
-    :return:
-    """
-    args = _arg_parser().parse_args()
+def run_annihilator(args: Namespace, settings: Settings):
+    failed_directories: Set[str] = set()

     if args.all:
         logger.info(f"Starting cleaning of Workspaces Staging for profile {args.profile}")
-        StagingAnnihilator().annihilate()
+        current_failures = StagingAnnihilator(args, settings).annihilate()
+        failed_directories.update(current_failures)

         logger.info(f"Starting cleaning of Workspaces Storage for profile {args.profile}")
-        StorageAnnihilator().annihilate()
+        current_failures = StorageAnnihilator(args, settings).annihilate()
+        failed_directories.update(current_failures)

         # Annihilating spool last because it flags the workflow as being cleaned
         logger.info(f"Starting cleaning of Workspaces Spool for profile {args.profile}")
-        SpoolAnnihilator().annihilate()
+        current_failures = SpoolAnnihilator(args, settings).annihilate()
+        failed_directories.update(current_failures)

-        logger.info(
-            f"Finished cleaning Workspaces Spool, Staging, and Storage areas for profile {args.profile}!"
-        )
+        logger.info(f"Finished cleaning Workspaces Spool, Staging, and Storage areas for profile {args.profile}!")

         # Clean completed workflows sooner than the stale time limit
         logger.info(f"Starting cleaning of non-SRDP std_calibration workflows for profile {args.profile}")
-        CalibrationAnnihilator(TypedAnnihilators.CAL.value).annihilate()
+        current_failures = CalibrationAnnihilator(TypedAnnihilators.CAL.value, args, settings).annihilate()
+        failed_directories.update(current_failures)

         logger.info(f"Starting cleaning of completed imaging workflows for profile {args.profile}")
-        ImageAnnihilator(TypedAnnihilators.CMS.value).annihilate()
+        current_failures = ImageAnnihilator(TypedAnnihilators.CMS.value, args, settings).annihilate()
+        failed_directories.update(current_failures)

         logger.info(f"Finished cleaning spool areas for completed calibrations and images for profile {args.profile}!")
     elif args.cache:
         logger.info(f"Starting cleaning of Workspaces Cache for profile {args.profile}")
-        CacheAnnihilator().annihilate()
+        failed_directories = CacheAnnihilator(args, settings).annihilate()
         logger.info(f"Finished cleaning Workspaces Cache for profile {args.profile}!")
     elif args.curator:
         logger.info(f"Starting cleaning of completed Curator workflows for profile {args.profile}")
-        WorkflowAnnihilator(TypedAnnihilators.CURATOR.value).annihilate()
+        failed_directories = WorkflowAnnihilator(TypedAnnihilators.CURATOR.value, args, settings).annihilate()
         logger.info(f"Finished cleaning completed Curator workflows for profile {args.profile}!")
     elif args.calibration:
         logger.info(f"Starting cleaning of non-SRDP std_calibration workflows for profile {args.profile}")
-        CalibrationAnnihilator(TypedAnnihilators.CAL.value).annihilate()
+        failed_directories = CalibrationAnnihilator(TypedAnnihilators.CAL.value, args, settings).annihilate()
         logger.info(f"Finished cleaning completed non-SRDP std_calibration workflows for profile {args.profile}!")
     elif args.image:
         logger.info(f"Starting cleaning of completed imaging workflows for profile {args.profile}")
-        ImageAnnihilator(TypedAnnihilators.CMS.value).annihilate()
+        failed_directories = ImageAnnihilator(TypedAnnihilators.CMS.value, args, settings).annihilate()
         logger.info(f"Finished cleaning completed imaging workflows for profile {args.profile}!")
     else:
         logger.info(f"Starting cleaning of Workspaces Spool for profile {args.profile}")
-        SpoolAnnihilator().annihilate()
+        failed_directories = SpoolAnnihilator(args, settings).annihilate()
         logger.info(f"Finished cleaning Workspaces Spool for profile {args.profile}!")

+    if failed_directories:
+        logger.info(
+            f"One or more directories weren't annihilated successfully, sending email notification to {settings['failure_notification_email']}"
+        )
+        send_notification(
+            notification_url=settings["notification_url"],
+            template=FAILURE_NOTIFICATION_TEMPLATE_NAME,
+            parameters={
+                "failed_directories": list(failed_directories),
+                "destination_email": settings["failure_notification_email"],
+            },
+        )
+
+
+def main():
+    """
+    Run the annihilator based on provided args
+
+    :return:
+    """
+    args = _arg_parser().parse_args()
+    settings = _get_settings(args.profile)
+
+    run_annihilator(args, settings)
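For reference, main() above is just this wiring; an equivalent programmatic sketch (the "dev" profile name is illustrative, --profile and --all are the flags exercised by the tests in this MR):

from ws_annihilator.annihilator import _arg_parser, _get_settings, run_annihilator

args = _arg_parser().parse_args(["--profile", "dev", "--all"])
settings = _get_settings(args.profile)  # resolves capo settings for the profile
run_annihilator(args, settings)  # annihilates each area, then emails if anything failed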
# Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
#
import logging
from time import sleep
from typing import Optional
import psycopg2 as pg
import requests
from pycapo import CapoConfig
from sqlalchemy.engine import Connection
@@ -7,7 +28,7 @@ logger = logging.getLogger("ws_annihilator")

 class MDDBConnector:
-    """Use this connection to interrogate the archive"""
+    """Use this connection to question the archive"""

     def __init__(self):
         self.connection = self._connect_to_db()
@@ -47,3 +68,32 @@ class MDDBConnector:
     def commit(self):
         self.connection.commit()
+
+def send_notification(notification_url: str, template: str, parameters: Optional[dict] = None) -> None:
+    """Send an email notification through the notification service
+
+    :param notification_url: URL of the notification service to use
+    :param template: Name of mustache template for email, must correspond to entry in the notification_templates table
+    :param parameters: Parameters for given template
+    :raises RuntimeError: When the notification service fails to send the email
+    """
+    if parameters is None:
+        parameters = dict()
+
+    url = f"{notification_url}/notify/{template}/send"
+    response = requests.post(url, json=parameters)
+    if response.status_code != 200:
+        # Wait 5 seconds and try again: there is an issue with message collisions in the
+        # notification system; this retry should be unnecessary once it is resolved.
+        sleep(5)
+        response = requests.post(url, json=parameters)
+        if response.status_code != 200:
+            raise RuntimeError(
+                f"Notification failure: Expected 200 (OK) response, got {response.status_code} response instead: {response.text}"
+            )
+
+    try:
+        json = response.json()
+    except requests.JSONDecodeError:
+        raise RuntimeError(f"Notification failure: Expected JSON response, instead got: {response.text}")
+    if json["errors"] or json["message"] != "Email sent":
+        raise RuntimeError(f"Notification failure: Expected response with no errors, instead got: {json}")
# Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
#
"""ws_ann failure notification template
Revision ID: 7abf54ae3fe5
Revises: 7ec92e7da566
Create Date: 2024-03-28 09:12:39.453333
"""
from pathlib import Path
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "7abf54ae3fe5"
down_revision = "7ec92e7da566"
branch_labels = None
depends_on = None
content = (Path.cwd() / "versions" / "templates" / "emails" / "ws_annihilator_failure_2.8.3.txt").read_text()
template_name = "ws_annihilator_failure"
def upgrade():
    op.execute(
        f"""INSERT INTO notification_templates (name, description, template) VALUES ('{template_name}', 'Sent when ws_annihilator fails to remove one or more directories', E'{content}')"""
    )
def downgrade():
    op.execute(f"""DELETE FROM notification_templates WHERE name = '{template_name}'""")
"""move raw data
Revision ID: 82f7b57c8ad7
Revises: 7ec92e7da566
Create Date: 2024-03-25 11:53:42.954521
"""
from alembic import op
from pathlib import Path
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '82f7b57c8ad7'
down_revision = '7ec92e7da566'
branch_labels = None
depends_on = None
old_fetch = "fetch_sh_2.8.2.txt"
new_fetch = "fetch_sh_2.8.3.txt"
def set_wf_content(wf_name: str, filename: str) -> bytes:
    return (Path.cwd() / "versions" / "templates" / wf_name / filename).read_text().encode()
def upgrade():
    conn = op.get_bind()

    # Allow fetch templates to reuse rawdata
    conn.execute(
        """
        UPDATE workflow_templates
        SET content = %s
        WHERE filename = 'std_calibration_fetch.sh'
        """,
        set_wf_content("std_calibration", new_fetch),
    )
    conn.execute(
        """
        UPDATE workflow_templates
        SET content = %s
        WHERE filename = 'restore_cms_fetch.sh'
        """,
        set_wf_content("restore_cms", new_fetch),
    )
    conn.execute(
        """
        UPDATE workflow_templates
        SET content = %s
        WHERE filename = 'std_restore_imaging_fetch.sh'
        """,
        set_wf_content("std_restore_imaging", new_fetch),
    )
def downgrade():
    conn = op.get_bind()

    conn.execute(
        """
        UPDATE workflow_templates
        SET content = %s
        WHERE filename = 'std_calibration_fetch.sh'
        """,
        set_wf_content("std_calibration", old_fetch),
    )
    conn.execute(
        """
        UPDATE workflow_templates
        SET content = %s
        WHERE filename = 'restore_cms_fetch.sh'
        """,
        set_wf_content("restore_cms", old_fetch),
    )
    conn.execute(
        """
        UPDATE workflow_templates
        SET content = %s
        WHERE filename = 'std_restore_imaging_fetch.sh'
        """,
        set_wf_content("std_restore_imaging", old_fetch),
    )
Subject: Workspaces failed to remove some directories

Hello,

The Workspaces automated cleanup tool, ws_annihilator, failed to remove the following directories:

{{#failed_directories}}
* {{.}}
{{/failed_directories}}
{{^failed_directories}}Sorry, this email was sent in error, all directories were removed correctly.{{/failed_directories}}
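For reference, the {{#failed_directories}} section repeats once per list item, and the {{^failed_directories}} inverted section renders only when the list is empty. A quick render sketch using the chevron mustache library (an assumption; this MR does not add a renderer) with two invented paths:

import chevron  # assumed third-party mustache implementation

template = "{{#failed_directories}}* {{.}}\n{{/failed_directories}}"
print(chevron.render(template, {"failed_directories": ["/lustre/spool/a", "/lustre/spool/b"]}))
# * /lustre/spool/a
# * /lustre/spool/b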
#!/bin/sh
set -o errexit
export HOME={{spool_dir}}
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
cd {{spool_dir}}
$SBIN_PATH/framework.sh -d .
chmod 770 .
cd rawdata/
$SBIN_PATH/productfetcher --product-locator $1 --product-locator $2
cd ../
#!/bin/sh
set -o errexit
export HOME={{spool_dir}}
export PREV_SPOOL={{prev_spool_dir}}
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
${SBIN_PATH}/update_stage FETCH
cd {{spool_dir}}
$SBIN_PATH/framework.sh -d .
chmod 770 .
if [ ! -z "$PREV_SPOOL" ] && [ -d "$PREV_SPOOL/rawdata/" ] && [ ! -z "$(ls -A "$PREV_SPOOL/rawdata/")" ]; then
    # Reuse the raw data from the previous version
    mv "$PREV_SPOOL/rawdata/" .
else
    # Run the product fetcher
    cd rawdata/
    $SBIN_PATH/productfetcher --product-locator $1
    cd ../
fi
${SBIN_PATH}/update_stage FETCH --complete
#!/bin/sh
set -o errexit
export HOME={{spool_dir}}
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
${SBIN_PATH}/update_stage FETCH
cd {{spool_dir}}
$SBIN_PATH/framework.sh -d .
chmod 770 .
cd rawdata/
$SBIN_PATH/productfetcher --product-locator $1
cd ../
${SBIN_PATH}/update_stage FETCH --complete
#!/bin/sh
set -o errexit
export HOME={{spool_dir}}
export PREV_SPOOL={{prev_spool_dir}}
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
${SBIN_PATH}/update_stage FETCH
cd {{spool_dir}}
$SBIN_PATH/framework.sh -d .
chmod 770 .
if [ ! -z "$PREV_SPOOL" ] && [ -d "$PREV_SPOOL/rawdata/" ] && [ ! -z "$(ls -A "$PREV_SPOOL/rawdata/")" ]; then
    # Reuse the raw data from the previous version
    mv "$PREV_SPOOL/rawdata/" .
else
    # Run the product fetcher
    cd rawdata/
    $SBIN_PATH/productfetcher --product-locator $1
    cd ../
fi
${SBIN_PATH}/update_stage FETCH --complete
#!/bin/sh
set -o errexit
export HOME={{spool_dir}}
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
cd {{spool_dir}}
$SBIN_PATH/framework.sh -d .
chmod 770 .
cd rawdata/
$SBIN_PATH/productfetcher --product-locator $1 --product-locator $2
cd ../
#!/bin/sh
set -o errexit
export HOME={{spool_dir}}
export PREV_SPOOL={{prev_spool_dir}}
SBIN_PATH=/lustre/aoc/cluster/pipeline/$CAPO_PROFILE/workspaces/sbin
${SBIN_PATH}/update_stage FETCH
cd {{spool_dir}}
$SBIN_PATH/framework.sh -d .
chmod 770 .
if [ ! -z "$PREV_SPOOL" ] && [ -d "$PREV_SPOOL/rawdata/" ] && [ ! -z "$(ls -A "$PREV_SPOOL/rawdata/")" ]; then
    # Reuse the raw data from the previous version
    mv "$PREV_SPOOL/rawdata/" .
else
    # Run the product fetcher
    cd rawdata/
    $SBIN_PATH/productfetcher --product-locator $1
    cd ../
fi
${SBIN_PATH}/update_stage FETCH --complete
@@ -784,6 +784,10 @@ class RestrictedInfo(CapabilityInfo):
         current_version_files = request.current_version.files

+        # Store the previous execution's spool directory for reusing bits of it
+        if request.current_version and request.current_version.workflow_metadata:
+            parameters["prev_spool_dir"] = request.current_version.workflow_metadata["processing_dir"]
+
         version = CapabilityVersion(
             capability_request_id=request.id,
             version_number=len(request.versions) + 1,
......