diff --git a/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/annihilator.py b/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/annihilator.py index 06f9fc23d3d75c365324cbc0362cd1c34cd2ebfd..f794ad81ec8b7349b7ef5ffa5e169045beda2b42 100644 --- a/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/annihilator.py +++ b/apps/cli/executables/pexable/ws_annihilator/ws_annihilator/annihilator.py @@ -117,6 +117,22 @@ class Annihilator: """ return self._settings.get(area_to_clean.lower()) + @staticmethod + def path_valid(constructed_path: str, base_path: str) -> bool: + """ + Check if the constructed path is valid for annihilation + + :param constructed_path: path to verify + :param base_path: base path which directory should be annihilated from + :return: True if path is valid for annihilation + """ + + return ( + constructed_path != base_path + and constructed_path != base_path + "/" + and len(constructed_path) > len(base_path) + ) + def get_days_to_keep(self, area_to_clean: str) -> str: """ Return the number of days to keep directories in a specified area from capo settings @@ -150,7 +166,9 @@ class Annihilator: # catch empty string response if dir_list: for stale in dir_list.split(","): - name_list.append(pathlib.Path(str(stale)).stem) + # catch empty string directory or whitespace only directory + if stale and not stale.isspace(): + name_list.append(pathlib.Path(str(stale)).stem) return name_list @@ -168,7 +186,9 @@ class Annihilator: # After annihilation set cleaned flag... if area == Areas.SPOOL.value: logger.info(f"Annihilation complete for {directory}, setting 'cleaned' flag on workflow request...") - self.set_cleaned(directory) + path = self.determine_path(area) + clean_me = path + "/" + directory + self.set_cleaned(clean_me) else: logger.info(f"Annihilation complete for {directory} in {area}.") @@ -201,15 +221,21 @@ class Annihilator: for directory in stale: dir_path = path + "/" + directory - if not pathlib.Path(dir_path).exists(): - if area_to_clean == Areas.SPOOL.value: - logger.info(f"directory {dir_path} not found, setting 'cleaned' flag on workflow request...") - self.set_cleaned(dir_path) + if self.path_valid(dir_path, path): + if not pathlib.Path(dir_path).exists(): + if area_to_clean == Areas.SPOOL.value: + logger.info( + f"directory {dir_path} not found, setting 'cleaned' flag on workflow request..." + ) + self.set_cleaned(dir_path) + else: + logger.info(f"directory {dir_path} not found in {area_to_clean}, skipping...") else: - logger.info(f"directory {dir_path} not found in {area_to_clean}, skipping...") + logger.info(f"found directory {directory}, annihilating...") + self.annihilate_directory(area_to_clean, directory) else: - logger.info(f"found directory {directory}, annihilating...") - self.annihilate_directory(area_to_clean, directory) + logger.error(f"Error: Received invalid directory '{dir_path}' as input for annihilation. Aborting.") + exit(1) def run(self): if self._args.all: