diff --git a/apps/cli/executables/pexable/ws_annihilator/README.md b/apps/cli/executables/pexable/ws_annihilator/README.md
new file mode 100644
index 0000000000000000000000000000000000000000..fc666fa01bc93c7ee26b418e1ddf278775b386c3
--- /dev/null
+++ b/apps/cli/executables/pexable/ws_annihilator/README.md
@@ -0,0 +1,20 @@
+# Workspaces Annihilator
+
+A command-line utility that removes old, unused directories from the Workspaces lustre areas.
+
+Annihilator requires the capo profile for the environment to be cleaned (dsoc-dev, dsoc-test, or dsoc-prod).
+
+
+## Usage
+```shell
+usage: ws_annihilator [-h] -p PROFILE [-a] [-c]
+
+Workspaces Directory Annihilator
+
+optional arguments:
+  -h, --help            show this help message and exit
+  -p PROFILE, --profile PROFILE
+                        Specify which environment to run on. Required. Example: dsoc-dev
+  -a, --all             Run annihilation on workspaces spool, staging, and storage areas.
+  -c, --cache           Run annihilation on workspaces cache area.
+```
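+
+## Example
+
+A hypothetical invocation that cleans the spool, staging, and storage areas
+against the dsoc-dev environment (substitute any configured profile):
+
+```shell
+ws_annihilator --profile dsoc-dev --all
+```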
diff --git a/apps/cli/executables/pexable/ws_annihilator/setup.py b/apps/cli/executables/pexable/ws_annihilator/setup.py
new file mode 100644
index 0000000000000000000000000000000000000000..df8aac398cd5dbd0312df7e04d232ad9b0b1c426
--- /dev/null
+++ b/apps/cli/executables/pexable/ws_annihilator/setup.py
@@ -0,0 +1,28 @@
+from pathlib import Path
+
+from setuptools import find_packages, setup
+
+VERSION = open("ws_annihilator/_version.py").readlines()[-1].split()[-1].strip("\"'")
+README = Path("README.md").read_text()
+
+requires = [
+    "argparse",
+    "pycapo",
+    "requests",
+]
+
+setup(
+    name="ssa-" + Path().absolute().name,
+    version=VERSION,
+    description="Workspaces Directory Annihilator; Clean up generated products from lustre!",
+    long_description=README,
+    author="NRAO SSA Team",
+    author_email="dms-ssa@nrao.edu",
+    url="TBD",
+    license="GPL",
+    install_requires=requires,
+    keywords=[],
+    packages=find_packages(),
+    classifiers=["Programming Language :: Python :: 3.8"],
+    entry_points={"console_scripts": ["ws_annihilator = ws_annihilator.annihilator:main"]},
+)
diff --git a/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/__init__.py b/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/__init__.py
new file mode 100644
index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391
diff --git a/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/_version.py b/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/_version.py
new file mode 100644
index 0000000000000000000000000000000000000000..427549ea2fb1722c4a0b559aead85a25493b41f6
--- /dev/null
+++ b/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/_version.py
@@ -0,0 +1 @@
+___version___ = "4.0.0a1.dev1"
diff --git a/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/annihilator.py b/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/annihilator.py
new file mode 100644
index 0000000000000000000000000000000000000000..06f9fc23d3d75c365324cbc0362cd1c34cd2ebfd
--- /dev/null
+++ b/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/annihilator.py
@@ -0,0 +1,238 @@
+"""
+Facility for removing old or QAFailed files and directories from the Workspaces lustre areas
+* Download area clean up is handled by the AAT/PPI mr_clean system
+"""
+import argparse
+import logging
+import os
+import pathlib
+import shutil
+import sys
+from argparse import ArgumentParser
+from enum import Enum
+from typing import Dict, List
+
+import requests
+from pycapo import CapoConfig
+
+logger = logging.getLogger(__name__)
+logger.setLevel(logging.INFO)
+logger.addHandler(logging.StreamHandler(sys.stdout))
+
+
+class Areas(Enum):
+    CACHE = "cache"
+    SPOOL = "spool"
+    STAGING = "staging"
+    STORAGE = "storage"
+
+
+def _get_settings(profile: str) -> Dict[str, str]:
+    """
+    Get annihilator capo settings
+
+    :param profile: the capo profile to read settings from
+    :return: a dictionary of area paths, retention periods, and the workflow service URL
+    """
+    config = CapoConfig(profile=profile)
+
+    annihilator_settings = config.settings("edu.nrao.workspaces.AnnihilatorSettings")
+    ingestion_settings = config.settings("edu.nrao.workspaces.IngestionSettings")
+
+    workflow_url = config.settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl
+    cache_directory = config.settings("edu.nrao.workspaces.DeliverySettings").cacheWeblogDirectory
+    spool_directory = config.settings("edu.nrao.workspaces.ProcessingSettings").rootDirectory
+    staging_directory = ingestion_settings.stagingDirectory
+    storage_directory = ingestion_settings.storageDirectory
+
+    return {
+        "workflow_url": workflow_url,
+        "cache": cache_directory,
+        "spool": spool_directory,
+        "storage": storage_directory,
+        "staging": staging_directory,
+        "cache_days": annihilator_settings.keepCacheForDays,
+        "spool_days": annihilator_settings.keepSpoolForDays,
+        "staging_days": annihilator_settings.keepStagingForDays,
+        "storage_days": annihilator_settings.keepStorageForDays,
+    }
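+
+
+# For reference, the capo properties consumed above look like the following.
+# The property names come from the settings lookups in _get_settings; the
+# values shown here are hypothetical:
+#
+#   edu.nrao.workspaces.WorkflowSettings.serviceUrl = http://localhost:3456
+#   edu.nrao.workspaces.ProcessingSettings.rootDirectory = /lustre/aoc/cluster/pipeline/dsoc-dev/workspaces/spool
+#   edu.nrao.workspaces.AnnihilatorSettings.keepSpoolForDays = 60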
+
+
+def _arg_parser() -> ArgumentParser:
+    """
+    Create parser for annihilator
+
+    :return: the configured argument parser
+    """
+    arg_parser = ArgumentParser(
+        description="Workspaces Directory Annihilator",
+        formatter_class=argparse.RawTextHelpFormatter,
+    )
+    arg_parser.add_argument(
+        "-p",
+        "--profile",
+        action="store",
+        required=True,
+        help="Specify which environment to run on. Required. Example: dsoc-dev",
+    )
+    arg_parser.add_argument(
+        "-a",
+        "--all",
+        action="store_true",
+        required=False,
+        help="Run annihilation on workspaces spool, staging, and storage areas.",
+    )
+    arg_parser.add_argument(
+        "-c",
+        "--cache",
+        action="store_true",
+        required=False,
+        help="Run annihilation on workspaces cache area.",
+    )
+    return arg_parser
+
+
+class Annihilator:
+    """
+    Class for removing workspaces directories that are no longer in use and are past a given date
+
+    - Retrieves a list of stale request directories to annihilate
+    - Checks whether the directories still exist
+      (DAs might manually clean up directories until a scheduling system is available)
+    - If a directory exists in the workspaces area for the specified profile, annihilates the directory
+      and sets the 'cleaned' flag on the associated request
+    - If a directory doesn't exist, sets the 'cleaned' flag on the associated request
+    """
+
+    def __init__(self):
+        self._args = _arg_parser().parse_args()
+        self._settings = _get_settings(self._args.profile)
+
+    def determine_path(self, area_to_clean: str) -> str:
+        """
+        Return the path to the specified area from capo settings
+
+        :param area_to_clean: workspaces lustre area to clean, i.e. "spool"
+        :return: path string
+        """
+        return self._settings.get(area_to_clean.lower())
+
+    def get_days_to_keep(self, area_to_clean: str) -> str:
+        """
+        Return the number of days to keep directories in a specified area from capo settings
+
+        :param area_to_clean: workspaces lustre area to clean, i.e. "spool"
+        :return: the number of days to keep, as a string
+        """
+        key = area_to_clean.lower() + "_days"
+        return self._settings.get(key)
+
+    def get_stale_directories(self, area_to_clean: str) -> List[str]:
+        """
+        It is possible for processing to take multiple days to several weeks. While a job is running in HTCondor, its
+        parent workspaces directory in the spool area won't update, other than the HTCondor log file(s).
+        We want to be sure that only stale directories are annihilated.
+
+        It is also possible for the dev and test systems to pick up production directories due to database copying.
+        For this reason we only want to use the name of the directory itself and strip off the rest of the path.
+
+        :param area_to_clean: workspaces lustre area to clean, i.e. "spool"
+        :return: A list of directory names to be removed
+        """
+
+        annihilation_url = (
+            self._settings.get("workflow_url") + "/workflows/requests/stale/" + self.get_days_to_keep(area_to_clean)
+        )
+        response = requests.get(annihilation_url)
+        dir_list = response.json()["stale_dirs"]
+
+        name_list = []
+        # catch empty string response
+        if dir_list:
+            for stale in dir_list.split(","):
+                name_list.append(pathlib.Path(str(stale)).stem)
+
+        return name_list
+
+    def annihilate_directory(self, area: str, directory: str):
+        """
+        Annihilate the specified directory
+
+        :param area: area being cleaned
+        :param directory: directory to remove
+        :return:
+        """
+
+        shutil.rmtree(directory)
+
+        # After annihilation set cleaned flag...
+        if area == Areas.SPOOL.value:
+            logger.info(f"Annihilation complete for {directory}, setting 'cleaned' flag on workflow request...")
+            self.set_cleaned(directory)
+        else:
+            logger.info(f"Annihilation complete for {directory} in {area}.")
+
+    def set_cleaned(self, directory: str):
+        """
+        Set the 'cleaned' flag on the request of the specified directory
+
+        :param directory: the directory that was annihilated
+        :return: the HTTP response from the workflow service
+        """
+        clean_url = self._settings.get("workflow_url") + "/workflows/requests/cleaned"
+        json_payload = {"directory": directory, "update_to": "True"}
+
+        return requests.put(clean_url, json=json_payload)
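+
+    # For illustration, the request issued by set_cleaned looks roughly like
+    # this (the directory value is hypothetical):
+    #
+    #   PUT {workflow_url}/workflows/requests/cleaned
+    #   {"directory": "/lustre/aoc/cluster/pipeline/dsoc-dev/workspaces/spool/tmpabcd1234", "update_to": "True"}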
+
+    def annihilate(self, area_to_clean: str):
+        """
+        Remove stale directories from targeted workspaces areas
+
+        :param area_to_clean: workspaces lustre area to clean, i.e. "spool"
+        :return:
+        """
+        stale = self.get_stale_directories(area_to_clean)
+
+        # If stale directories were found, annihilate them
+        if stale:
+            path = self.determine_path(area_to_clean)
+
+            # change to area to clean
+            os.chdir(path)
+
+            for directory in stale:
+                dir_path = path + "/" + directory
+                if not pathlib.Path(dir_path).exists():
+                    if area_to_clean == Areas.SPOOL.value:
+                        logger.info(f"directory {dir_path} not found, setting 'cleaned' flag on workflow request...")
+                        self.set_cleaned(dir_path)
+                    else:
+                        logger.info(f"directory {dir_path} not found in {area_to_clean}, skipping...")
+                else:
+                    logger.info(f"found directory {directory}, annihilating...")
+                    # pass the full path so set_cleaned matches the request's results directory
+                    self.annihilate_directory(area_to_clean, dir_path)
+
+    def run(self):
+        if self._args.all:
+            logger.info(f"Starting cleaning of Workspaces Spool for profile {self._args.profile}")
+            self.annihilate(Areas.SPOOL.value)
+
+            logger.info(f"Starting cleaning of Workspaces Staging for profile {self._args.profile}")
+            self.annihilate(Areas.STAGING.value)
+
+            logger.info(f"Starting cleaning of Workspaces Storage for profile {self._args.profile}")
+            self.annihilate(Areas.STORAGE.value)
+            logger.info(
+                f"Finished cleaning Workspaces Spool, Staging, and Storage areas for profile {self._args.profile}!"
+            )
+        if self._args.cache:
+            logger.info(f"Starting cleaning of Workspaces Cache for profile {self._args.profile}")
+            self.annihilate(Areas.CACHE.value)
+            logger.info(f"Finished cleaning Workspaces Cache for profile {self._args.profile}!")
+        if not self._args.all and not self._args.cache:
+            # no area flags given: clean the spool area only
+            logger.info(f"Starting cleaning of Workspaces Spool for profile {self._args.profile}")
+            self.annihilate(Areas.SPOOL.value)
+            logger.info(f"Finished cleaning Workspaces Spool for profile {self._args.profile}!")
+
+
+def main():
+    Annihilator().run()
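+
+
+if __name__ == "__main__":
+    # Convenience guard for running the module directly during development;
+    # the installed ws_annihilator console script (see setup.py) is the
+    # primary entry point.
+    main()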
diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py
index 70ff1af823a91d2294973696759a0870a0c518dd..898418ae6f47849350021da916a0956d0c2c4d88
--- a/services/workflow/workflow/server.py
+++ b/services/workflow/workflow/server.py
@@ -56,7 +56,6 @@ if sentry_key != "local":
         traces_sample_rate=1.0,
     )
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -498,11 +497,35 @@ class WorkflowRequestRestService:
         Pyramid view that returns a list of workflow request processing directories
         that have completed processing and are marked for annihilation
 
-        :return: HTTP Response with list of directories to annihilate
+        :return: HTTP Response with list of directories to annihilate
         """
+        dir_list = []
         keep = self.request.matchdict["days"]
-        return self.request.info.lookup_stale_requests(int(keep))
+        req_list = self.request.info.lookup_stale_requests(int(keep))
+
+        for r in req_list:
+            dir_list.append(r.results_dir)
+
+        # return the list as a comma-separated string
+        return Response(json_body={"stale_dirs": ",".join(dir_list)})
+
+    @view_config(request_method="PUT", route_name="set_request_cleaned")
+    def set_request_cleaned(self):
+        """
+        Pyramid view to set the workflow request cleaned flag upon directory annihilation
+
+        :return: HTTP Response
+        """
+
+        directory = self.request.json_body["directory"]
+        # parse the flag from its string form rather than calling eval() on request data
+        update_to = self.request.json_body["update_to"] == "True"
+        req = self.request.info.lookup_workflow_request_by_directory(directory)
+        self.request.info.update_request_cleaned(req, update_to)
+        return Response(
+            status_code=http.HTTPStatus.OK,
+            body=f"SUCCESS: Updated workflow request #{req.workflow_request_id} to {update_to}",
+        )
 
 
 @view_defaults(route_name="workflow_request_files", renderer="json")
@@ -756,6 +779,11 @@ def main(global_config, **settings):
         "/workflows/requests/stale/{days}",
     )
 
+    config.add_route(
+        "set_request_cleaned",
+        "/workflows/requests/cleaned",
+    )
+
     # Route for healthcheck when bringing up containers
     config.add_route(
         "get_healthcheck",
diff --git a/shared/workspaces/workspaces/workflow/schema.py b/shared/workspaces/workspaces/workflow/schema.py
index df8dc65fe90c05b106243449960a6d2623c8426a..194730098ec5094b3fdcc9281f709bd1c97d0bf0
--- a/shared/workspaces/workspaces/workflow/schema.py
+++ b/shared/workspaces/workspaces/workflow/schema.py
@@ -214,6 +214,9 @@ class WorkflowRequest(JSONSerializable):
     def update_status(self, status: str):
         self.state = status
 
+    def update_cleaned(self, cleaned: bool):
+        self.cleaned = cleaned
+
     # TODO: create following fields in table
     def set_start_time(self, time: str):
         self.start_time = time
diff --git a/shared/workspaces/workspaces/workflow/services/workflow_info.py b/shared/workspaces/workspaces/workflow/services/workflow_info.py
index 71875fe6828a8489876e67c40b2aaf6acc6d5335..c1077f99755060fffa23481dfa14a10ca808886a
--- a/shared/workspaces/workspaces/workflow/services/workflow_info.py
+++ b/shared/workspaces/workspaces/workflow/services/workflow_info.py
@@ -54,10 +54,10 @@
         """
         return self.session.query(WorkflowTemplate).filter_by(workflow_name=workflow_name).all()
 
-    def lookup_stale_requests(self, keep_days: int) -> List[str]:
+    def lookup_stale_requests(self, keep_days: int) -> List[WorkflowRequest]:
         """
-        Queries the workflow_requests table for all requests that have completed, have not
-        been cleaned up, and are older that the number of days specified by keep_days
+        Queries the workflow_requests table for all requests that have not been
+        cleaned up and are older than the number of days specified by keep_days
 
         :param keep_days: the number of days to keep directories
-        :return: a list of strings representing processing directories
+        :return: a list of stale WorkflowRequests
@@ -66,10 +66,20 @@ class WorkflowInfo(WorkflowInfoIF):
         return (
             self.session.query(WorkflowRequest)
             .filter((WorkflowRequest.updated_at + datetime.timedelta(days=keep_days)) < datetime.datetime.now())
+            .filter_by(htcondor_job_id=None)
             .filter_by(cleaned=False)
             .all()
         )
 
+    def lookup_workflow_request_by_directory(self, results_dir: str) -> WorkflowRequest:
+        """
+        Queries the workflow_requests table for the request with the matching results directory
+
+        :param results_dir: the results directory of the request to look up
+        :return: WorkflowRequest
+        """
+        return self.session.query(WorkflowRequest).filter_by(results_dir=results_dir).first()
+
     def all_workflows(self) -> List[Workflow]:
         return self.session.query(Workflow).all()
 
@@ -146,3 +156,14 @@
         self.session.add(wrf)
         self.session.flush()
         return wrf
+
+    def update_request_cleaned(self, request: WorkflowRequest, update_flag: bool):
+        """
+        Update an existing workflow request when its results directory is annihilated
+
+        :param request: The workflow request to update
+        :param update_flag: the boolean state to set the cleaned flag to
+        :return:
+        """
+        request.update_cleaned(update_flag)
+        self.save_request(request)
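+
+    # Hypothetical composition of the two methods above, mirroring the
+    # set_request_cleaned view in services/workflow/workflow/server.py
+    # (assumes `info` is a WorkflowInfo instance; the path is illustrative):
+    #
+    #   request = info.lookup_workflow_request_by_directory(
+    #       "/lustre/aoc/cluster/pipeline/dsoc-dev/workspaces/spool/tmpabcd1234"
+    #   )
+    #   info.update_request_cleaned(request, True)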