From b143d6ed91e62eba412aba1d89a37970fffe10f4 Mon Sep 17 00:00:00 2001 From: Sam Kagan <skagan@nrao.edu> Date: Fri, 10 May 2024 16:59:30 -0600 Subject: [PATCH] Drafted change to use piperestorescript if it exists --- .../casa_envoy/casa_envoy/configure.py | 61 ++++++++----------- .../casa_envoy/casa_envoy/foundation.py | 9 +-- .../casa_envoy/casa_envoy/interfaces.py | 27 +++++++- .../casa_envoy/casa_envoy/launchers.py | 27 ++++++-- 4 files changed, 77 insertions(+), 47 deletions(-) diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/configure.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/configure.py index 2751fcae7..cec881720 100644 --- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/configure.py +++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/configure.py @@ -17,7 +17,6 @@ # along with Workspaces. If not, see <https://www.gnu.org/licenses/>. import logging import os -import subprocess import sys from casa_envoy.interfaces import RunnerIF @@ -87,32 +86,30 @@ class StandardRunner(RunnerIF): Class for launch of standard (non-parallel) CASA """ - def __init__(self, parameters: dict): - self.logger = logging.getLogger("casa_envoy") - self.parameters = parameters - - def call_casa(self): - """ - Launch subprocess for standard CASA - - :return: - """ - build_command = ( + @property + def build_command(self): + return ( "PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" " "${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c " "${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true" ) - self.logger.info(f"RUNNING CASA with command '{build_command}'") - os.chdir("./working") - result = subprocess.Popen( - build_command, - shell=True, - executable="/bin/bash", - stdout=sys.stdout, - stderr=sys.stderr, + +class PiperestorescriptRunner(RunnerIF): + """ + Class for launch of non-parallel CASA, driven by a piperestorescript.py + Only intended for CMS restores + """ + + @property + def build_command(self): + if not self.parameters.get("piperestorescript_path"): + raise ValueError(f"Exected path to a piperestorescript.py to be specified, found none") + return ( + "PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" " + "${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c " + f"{self.parameters['piperestorescript_path']} || true" ) - return result.communicate() class ParallelRunner(RunnerIF): @@ -120,31 +117,21 @@ class ParallelRunner(RunnerIF): Class for launch of parallel CASA processing using mpicasa """ - def __init__(self, parameters: dict): - self.logger = logging.getLogger("casa_envoy") - self.parameters = parameters + @property + def log_message(self): + return f"RUNNING PARALLEL CASA with command '{self.build_command}'" - def call_casa(self): + @property + def build_command(self): # CASA wants a supervisor process, in addition to the workers. # So increment the parallelization request by one to account for that integer_processes = int(self.parameters["requested_parallel"]) + 1 # The --oversubscribe argument to mpicasa lets the N+1 processes share N CPUs happily - build_command = ( + return ( "PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\"" " ${CASA_HOME}/bin/mpicasa --oversubscribe -n " + str(integer_processes) + " ${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c " "${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true" ) - - self.logger.info(f"RUNNING PARALLEL CASA with command '{build_command}'") - os.chdir("./working") - result = subprocess.Popen( - build_command, - shell=True, - executable="/bin/bash", - stdout=sys.stdout, - stderr=sys.stderr, - ) - return result.communicate() diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/foundation.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/foundation.py index 446295438..f5fa68864 100644 --- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/foundation.py +++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/foundation.py @@ -90,7 +90,7 @@ class RestoreFoundation(FoundationIF): self.parameters = parameters self.parent_path = parameters["parent_path"] self.content = json.loads(metadata.content) - self._path_to_piperestorescript: Optional[Path] = None + self._piperestorescript_path: Optional[Path] = None def data_foundation(self): """ @@ -114,13 +114,14 @@ class RestoreFoundation(FoundationIF): if (path / CASAPIPERESTORESCRIPT).is_file(): dest = Path(f"working/{CASAPIPERESTORESCRIPT}") shutil.copyfile(path / CASAPIPERESTORESCRIPT, dest) - self._path_to_piperestorescript = dest + self._piperestorescript_path = dest.resolve() self.logger.info(f"Found {CASAPIPERESTORESCRIPT}, copied it to {dest}") self.logger.info("Restore data foundation complete!") @property - def path_to_piperestorescript(self) -> Optional[Path]: - return self._path_to_piperestorescript + def piperestorescript_path(self) -> Optional[Path]: + """Absolute path to the casapiperestorescript.py for this restore, or None if one doesn't exist""" + return self._piperestorescript_path def extract_cal(self): self.logger.info("Extracting calibration tar file to products directory...") diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/interfaces.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/interfaces.py index 4e8c62076..ac48c8d8a 100644 --- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/interfaces.py +++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/interfaces.py @@ -19,6 +19,10 @@ Interfaces for casa_envoy """ import abc +import logging +import os +import subprocess +import sys from abc import ABC @@ -38,10 +42,31 @@ class LauncherIF(ABC): class RunnerIF(ABC): + def __init__(self, parameters: dict): + self.logger = logging.getLogger("casa_envoy") + self.parameters = parameters + + @property @abc.abstractmethod - def call_casa(self): + def build_command(self) -> str: pass + @property + def log_message(self): + return f"RUNNING CASA with command '{self.build_command}'" + + def call_casa(self) -> tuple[bytes, bytes]: + self.logger.info(self.log_message) + os.chdir("./working") + result = subprocess.Popen( + self.build_command, + shell=True, + executable="/bin/bash", + stdout=sys.stdout, + stderr=sys.stderr, + ) + return result.communicate() + class AuditorIF(ABC): """ diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py index 9597796e7..a8de6db33 100644 --- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py +++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py @@ -26,10 +26,15 @@ import sys from typing import Dict, Union from casa_envoy.auditor import AuditDirectories, AuditFiles -from casa_envoy.configure import CasaConfigure, ParallelRunner, StandardRunner +from casa_envoy.configure import ( + CasaConfigure, + ParallelRunner, + PiperestorescriptRunner, + StandardRunner, +) from casa_envoy.enums import ProductType from casa_envoy.foundation import CubeFoundation, GeneralFoundation, RestoreFoundation -from casa_envoy.interfaces import LauncherIF +from casa_envoy.interfaces import LauncherIF, RunnerIF from casa_envoy.schema import AbstractTextFile @@ -56,7 +61,7 @@ def run_audit(logger: logging.Logger, ppr: AbstractTextFile, metadata: AbstractT sys.exit(1) -def determine_runner_type(parameters: dict) -> Union[StandardRunner, ParallelRunner]: +def determine_runner_type(parameters: dict) -> RunnerIF: """ determine the type of CASA runner needed: Standard or Parallel @@ -65,12 +70,18 @@ def determine_runner_type(parameters: dict) -> Union[StandardRunner, ParallelRun """ if "requested_parallel" in parameters: return ParallelRunner(parameters) + elif "piperestorescript_path" in parameters: + # This function is currently called when the Launcher is initialized, + # i.e. before the RestoreFoundation would know if there's a piperestorescript or not, + # so its results will be overriden if the RestoreFoundation finds a piperestorescript. + # This case is here only in case that order of operations changes. + return PiperestorescriptRunner(parameters) else: return StandardRunner(parameters) class CasaLauncher: - def __init__(self, runner: Union[StandardRunner, ParallelRunner]): + def __init__(self, runner: RunnerIF): self.logger = logging.getLogger("casa_envoy") self.runner = runner self.parameters = runner.parameters @@ -154,7 +165,13 @@ class CalibrationLauncher(LauncherIF): # Ensure all data is in the required locations for CASA processing (This is not always rawdata!) GeneralFoundation(self.parameters, self.metadata).data_foundation() if self.parameters["product_type"] == ProductType.RESTORE.value: - RestoreFoundation(self.parameters, self.metadata).data_foundation() + restore_foundation = RestoreFoundation(self.parameters, self.metadata) + restore_foundation.data_foundation() + if restore_foundation.piperestorescript_path: + # Need to make this available to the Runner if it exists + self.parameters["piperestorescript_path"] = str(restore_foundation.piperestorescript_path) + # Since determine_runner_type() is called before this, need to override it if a piperestorescript is found + self.runner = PiperestorescriptRunner(self.parameters) return def check_calibratable(self) -> bool: -- GitLab