Commit 46e91452 authored by Charlotte Hausman
workspaces directory annihilator

parent d1aeea4a
Merge request !856: workspaces directory annihilator
Pipeline #4663 passed

# Workspaces Annihilator

A command-line utility that removes old, unused directories from the Workspaces lustre areas. The annihilator requires the Capo profile of the environment to be cleaned (dsoc-dev, dsoc-test, or dsoc-prod).

## Usage

```shell
usage: ws_annihilator [-h] -p PROFILE [-a] [-c]

Workspaces Directory Annihilator

optional arguments:
  -h, --help            show this help message and exit
  -p PROFILE, --profile PROFILE
                        Specify which environment to run on. Required. Example: dsoc-dev
  -a, --all             Run annihilation on workspaces spool, staging, and storage areas.
  -c, --cache           Run annihilation on workspaces cache area.
```
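For example, to clean the spool, staging, and storage areas in the development environment (assuming the package's `ws_annihilator` console script is installed and on the path):

```shell
ws_annihilator --profile dsoc-dev --all
```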
setup.py:

```python
from pathlib import Path

from setuptools import find_packages, setup

VERSION = open("ws_annihilator/_version.py").readlines()[-1].split()[-1].strip("\"'")
README = Path("README.md").read_text()

requires = [
    "argparse",
    "pycapo",
    "requests",
]

setup(
    name="ssa-" + Path().absolute().name,
    version=VERSION,
    description="Workspaces Directory Annihilator; Clean up generated products from lustre!",
    long_description=README,
    author="NRAO SSA Team",
    author_email="dms-ssa@nrao.edu",
    url="TBD",
    license="GPL",
    install_requires=requires,
    keywords=[],
    packages=find_packages(),
    classifiers=["Programming Language :: Python :: 3.8"],
    entry_points={"console_scripts": ["ws_annihilator = ws_annihilator.annihilator:main"]},
)
```
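Note that the package name is computed from the working directory at build time; a minimal sketch of what `name` resolves to (the directory name "workspaces" here is hypothetical):

```python
from pathlib import Path

# "ssa-" prefix plus the checkout directory's own name; building from a
# directory named "workspaces" (hypothetical) would yield "ssa-workspaces".
print("ssa-" + Path().absolute().name)
```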
ws_annihilator/_version.py:

```python
___version___ = "4.0.0a1.dev1"
```
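`setup.py` pulls `VERSION` out of the last line of this file; a minimal sketch of that parse, applied to the line above:

```python
# Reproduce setup.py's version extraction: take the last whitespace-separated
# token on the line and strip the surrounding quotes.
line = '___version___ = "4.0.0a1.dev1"'
version = line.split()[-1].strip("\"'")
assert version == "4.0.0a1.dev1"
```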
    """
    Facility for removing old or QAFailed files and directories from the Workspaces lustre areas
    * Download area clean up is handled by the AAT/PPI mr_clean system
    """
    import argparse
    import logging
    import os
    import pathlib
    import shutil
    from enum import Enum
    import requests
    from argparse import ArgumentParser
    from typing import Dict, List
    import sys
    from pycapo import CapoConfig
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.INFO)
    logger.addHandler(logging.StreamHandler(sys.stdout))
    class Areas(Enum):
    CACHE = "cache"
    SPOOL = "spool"
    STAGING = "staging"
    STORAGE = "storage"
    def _get_settings(profile: str) -> Dict[str, str]:
    """
    Get annihilator capo settings
    :return:
    """
    config = CapoConfig(profile=profile)
    annihilator_settings = config.settings("edu.nrao.workspaces.AnnihilatorSettings")
    ingestion_settings = config.settings("edu.nrao.workspaces.IngestionSettings")
    workflow_url = config.settings("edu.nrao.workspaces.WorkflowSettings").serviceUrl
    cache_directory = config.settings("edu.nrao.workspaces.DeliverySettings").cacheWeblogDirectory
    spool_directory = config.settings("edu.nrao.workspaces.ProcessingSettings").rootDirectory
    staging_directory = ingestion_settings.stagingDirectory
    storage_directory = ingestion_settings.storageDirectory
    return {
    "workflow_url": workflow_url,
    "cache": cache_directory,
    "spool": spool_directory,
    "storage": storage_directory,
    "staging": staging_directory,
    "cache_days": annihilator_settings.keepCacheForDays,
    "spool_days": annihilator_settings.keepSpoolForDays,
    "staging_days": annihilator_settings.keepStagingForDays,
    "storage_days": annihilator_settings.keepStorageForDays,
    }
    def _arg_parser() -> ArgumentParser:
    """
    Create parser for annihilator
    :return:
    """
    arg_parser = ArgumentParser(
    description="Workspaces Directory Annihilator",
    formatter_class=argparse.RawTextHelpFormatter,
    )
    arg_parser.add_argument(
    "-p",
    "--profile",
    action="store",
    required=True,
    help="Specify which environment to run on. Required. Example: dsoc-dev",
    )
    arg_parser.add_argument(
    "-a",
    "--all",
    action="store_true",
    required=False,
    help="Run annihilation on workspaces spool, staging, and storage areas.",
    )
    arg_parser.add_argument(
    "-c",
    "--cache",
    action="store_true",
    required=False,
    help="Run annihilation on workspaces cache area.",
    )
    return arg_parser
    class Annihilator:
    """
    Class for removing workspaces directories that are no longer in use and past a given date
    - Retrieves a list of stale request directories to annihilate
    - Checks if directories still exist
    (DAs might manually clean up directories until scheduling system is available)
    - If directory exists in the workspaces' area for the specified profile, annihilates directory
    and sets 'cleaned' flag on the associated request
    - If directory doesn't exist, sets 'cleaned' flag on associated request
    """
    def __init__(self):
    self._args = _arg_parser().parse_args()
    self._settings = _get_settings(self._args.profile)
    def determine_path(self, area_to_clean: str) -> str:
    """
    return path to the location from capo settings
    :param area_to_clean: workspaces lustre area to clean, i.e. "spool"
    :return: path string
    """
    return self._settings.get(area_to_clean.lower())
    def get_days_to_keep(self, area_to_clean: str) -> str:
    """
    Return the number of days to keep directories in a specified area from capo settings
    :param area_to_clean: workspaces lustre area to clean, i.e. "spool"
    :return:
    """
    key = area_to_clean.lower() + "_days"
    return self._settings.get(key)
    def get_stale_directories(self, area_to_clean: str) -> List[str]:
    """
    It is possible for processing to take multiple days to several weeks. While a job is running in HTCondor, it's
    parent workspaces directory in the spool area won't update other than the HTCondor log file(s).
    We want to be sure that only stale directories are annihilated.
    It is also possible for the dev and test systems to pick up production directories due to database copying.
    For this reason we only want to use the name of the directory itself and strip off the rest of the path.
    :param area_to_clean: workspaces lustre area to clean, i.e. "spool"
    :return: A list of directory names to be removed
    """
    annihilation_url = (
    self._settings.get("workflow_url") + "/workflows/requests/stale/" + self.get_days_to_keep(area_to_clean)
    )
    response = requests.get(annihilation_url)
    dir_list = response.json()["stale_dirs"]
    name_list = []
    # catch empty string response
    if dir_list:
    for stale in dir_list.split(","):
    name_list.append(pathlib.Path(str(stale)).stem)
    return name_list
    def annihilate_directory(self, area: str, directory: str):
    """
    Annihilate specified directory
    :param area: area being cleaned
    :param directory: directory to remove
    :return:
    """
    shutil.rmtree(directory)
    # After annihilation set cleaned flag...
    if area == Areas.SPOOL.value:
    logger.info(f"Annihilation complete for {directory}, setting 'cleaned' flag on workflow request...")
    self.set_cleaned(directory)
    else:
    logger.info(f"Annihilation complete for {directory} in {area}.")
    def set_cleaned(self, directory: str):
    """
    Set the 'cleaned' flag on the request of the specified directory
    :param directory: the directory that was annihilated
    :return:
    """
    clean_url = self._settings.get("workflow_url") + "/workflows/requests/cleaned"
    json_payload = {"directory": directory, "update_to": "True"}
    return requests.put(clean_url, json=json_payload)
    def annihilate(self, area_to_clean: str):
    """
    Remove stale directories from targeted workspaces areas
    :return:
    """
    stale = self.get_stale_directories(area_to_clean)
    # If stale directories were found, annihilate them
    if stale:
    path = self.determine_path(area_to_clean)
    # change to area to clean
    os.chdir(path)
    for directory in stale:
    dir_path = path + "/" + directory
    if not pathlib.Path(dir_path).exists():
    if area_to_clean == Areas.SPOOL.value:
    logger.info(f"directory {dir_path} not found, setting 'cleaned' flag on workflow request...")
    self.set_cleaned(dir_path)
    else:
    logger.info(f"directory {dir_path} not found in {area_to_clean}, skipping...")
    else:
    logger.info(f"found directory {directory}, annihilating...")
    self.annihilate_directory(area_to_clean, directory)
    def run(self):
    if self._args.all:
    logger.info(f"Starting cleaning of Workspaces Spool for profile {self._args.profile}")
    self.annihilate(Areas.SPOOL.value)
    logger.info(f"Starting cleaning of Workspaces Staging for profile {self._args.profile}")
    self.annihilate(Areas.STAGING.value)
    logger.info(f"Starting cleaning of Workspaces Storage for profile {self._args.profile}")
    self.annihilate(Areas.STORAGE.value)
    logger.info(
    f"Finished cleaning Workspaces Spool, Staging, and Storage areas for profile {self._args.profile}!"
    )
    if self._args.cache:
    logger.info(f"Starting cleaning of Workspaces Cache for profile {self._args.profile}")
    self.annihilate(Areas.CACHE.value)
    logger.info(f"Finished cleaning Workspaces Cache for profile {self._args.profile}!")
    else:
    logger.info(f"Starting cleaning of Workspaces Spool for profile {self._args.profile}")
    self.annihilate(Areas.SPOOL.value)
    logger.info(f"Finished cleaning Workspaces Spool for profile {self._args.profile}!")
    def main():
    Annihilator().run()
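The path-stripping in `get_stale_directories` relies on `pathlib.Path(...).stem`; a quick illustration with a hypothetical production path:

```python
import pathlib

# Only the directory's own name is kept, so a production results_dir picked up
# through a database copy can't point a dev or test annihilator at a production path.
stale = "/lustre/aoc/cluster/pipeline/dsoc-prod/workspaces/spool/tmpabcd1234"  # hypothetical
print(pathlib.Path(stale).stem)  # -> "tmpabcd1234"
```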
Workflow service REST module (diff):

```diff
@@ -56,7 +56,6 @@ if sentry_key != "local":
         traces_sample_rate=1.0,
     )
-
 logger = logging.getLogger(__name__)
@@ -498,11 +497,35 @@ class WorkflowRequestRestService:
         Pyramid view that returns a list of workflow request processing directories that
         have completed processing and are marked for annihilation

         :return: HTTP Response with list of directories to annihilate
         """
+        dir_list = []
         keep = self.request.matchdict["days"]
-        return self.request.info.lookup_stale_requests(int(keep))
+        req_list = self.request.info.lookup_stale_requests(int(keep))
+
+        for r in req_list:
+            dir_list.append(r.results_dir)
+
+        # return list as comma separated string
+        return Response(json_body={"stale_dirs": str(",".join(dir_list))})
+
+    @view_config(request_method="PUT", route_name="set_request_cleaned")
+    def set_request_cleaned(self):
+        """
+        Pyramid view to set the workflow request cleaned flag upon directory annihilation
+
+        :return: HTTP Response
+        """
+        directory = self.request.json_body["directory"]
+        update_to = eval(self.request.json_body["update_to"])
+
+        req = self.request.info.lookup_workflow_request_by_directory(directory)
+        self.request.info.update_request_cleaned(req, update_to)
+
+        return Response(
+            status_code=http.HTTPStatus.OK,
+            body=f"SUCCESS: Updated workflow request #{req.workflow_request_id} to {update_to}",
+        )

 @view_defaults(route_name="workflow_request_files", renderer="json")
@@ -756,6 +779,11 @@ def main(global_config, **settings):
         "/workflows/requests/stale/{days}",
     )
+    config.add_route(
+        "set_request_cleaned",
+        "/workflows/requests/cleaned",
+    )
+
     # Route for healthcheck when bringing up containers
     config.add_route(
         "get_healthcheck",
```
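Taken together, these views give the annihilator its whole protocol: a GET for stale directories and a PUT to mark them cleaned. A sketch of that round trip, assuming a hypothetical service URL:

```python
import requests

base = "http://localhost:3456"  # hypothetical; comes from the capo WorkflowSettings serviceUrl

# GET the comma-separated list of stale results directories (14-day retention here)
stale_dirs = requests.get(f"{base}/workflows/requests/stale/14").json()["stale_dirs"]

for d in filter(None, stale_dirs.split(",")):
    # ... remove d from disk, then flag the matching workflow request as cleaned
    requests.put(f"{base}/workflows/requests/cleaned", json={"directory": d, "update_to": "True"})
```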
WorkflowRequest model (diff):

```diff
@@ -214,6 +214,9 @@ class WorkflowRequest(JSONSerializable):
     def update_status(self, status: str):
         self.state = status

+    def update_cleaned(self, cleaned: bool):
+        self.cleaned = cleaned
+
     # TODO: create following fields in table
     def set_start_time(self, time: str):
         self.start_time = time
```
WorkflowInfo (diff):

```diff
@@ -54,10 +54,10 @@ class WorkflowInfo(WorkflowInfoIF):
         """
         return self.session.query(WorkflowTemplate).filter_by(workflow_name=workflow_name).all()

-    def lookup_stale_requests(self, keep_days: int) -> List[str]:
+    def lookup_stale_requests(self, keep_days: int) -> List[WorkflowRequest]:
         """
-        Queries the workflow_requests table for all requests that have completed, have not
-        been cleaned up, and are older than the number of days specified by keep_days
+        Queries the workflow_requests table for all requests that have not been
+        cleaned up, and are older than the number of days specified by keep_days

         :param keep_days: the number of days to keep directories
         :return: a list of strings representing processing directories
@@ -66,10 +66,20 @@ class WorkflowInfo(WorkflowInfoIF):
         return (
             self.session.query(WorkflowRequest)
             .filter((WorkflowRequest.updated_at + datetime.timedelta(days=keep_days)) < datetime.datetime.now())
+            .filter_by(htcondor_job_id=None)
             .filter_by(cleaned=False)
             .all()
         )

+    def lookup_workflow_request_by_directory(self, results_dir: str) -> WorkflowRequest:
+        """
+        Queries the workflow_requests table for the request with the matching results directory
+
+        :param results_dir: the results directory of the request
+        :return: WorkflowRequest
+        """
+        return self.session.query(WorkflowRequest).filter_by(results_dir=results_dir).first()
+
     def all_workflows(self) -> List[Workflow]:
         return self.session.query(Workflow).all()
@@ -146,3 +156,14 @@ class WorkflowInfo(WorkflowInfoIF):
         self.session.add(wrf)
         self.session.flush()
         return wrf
+
+    def update_request_cleaned(self, request: WorkflowRequest, update_flag: bool):
+        """
+        Update an existing workflow request when its results directory is annihilated
+
+        :param request: the workflow request to update
+        :param update_flag: the boolean state to set the cleaned flag to
+        :return:
+        """
+        request.update_cleaned(update_flag)
+        self.save_request(request)
```
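The staleness filter above compares `updated_at + keep_days` against the current time; the equivalent plain-Python predicate, for clarity:

```python
import datetime

def is_stale(updated_at: datetime.datetime, keep_days: int) -> bool:
    # Stale means the request's last update is more than keep_days in the past
    # (and, per the added filter, the request no longer has an HTCondor job).
    return updated_at + datetime.timedelta(days=keep_days) < datetime.datetime.now()

# A request last touched 30 days ago is stale under a 14-day retention window:
print(is_stale(datetime.datetime.now() - datetime.timedelta(days=30), 14))  # True
```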