Skip to content
Snippets Groups Projects
Commit b143d6ed authored by Sam Kagan's avatar Sam Kagan
Browse files

Drafted change to use piperestorescript if it exists

parent 9b387513
No related branches found
No related tags found
2 merge requests!1706merge 2.8.4 to main,!1657Got EVLA CMS restores working via casa_envoy, using casa_restorepipescript.py when it exists
......@@ -17,7 +17,6 @@
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
import logging
import os
import subprocess
import sys
from casa_envoy.interfaces import RunnerIF
......@@ -87,32 +86,30 @@ class StandardRunner(RunnerIF):
Class for launch of standard (non-parallel) CASA
"""
def __init__(self, parameters: dict):
self.logger = logging.getLogger("casa_envoy")
self.parameters = parameters
def call_casa(self):
"""
Launch subprocess for standard CASA
:return:
"""
build_command = (
@property
def build_command(self):
return (
"PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" "
"${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c "
"${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true"
)
self.logger.info(f"RUNNING CASA with command '{build_command}'")
os.chdir("./working")
result = subprocess.Popen(
build_command,
shell=True,
executable="/bin/bash",
stdout=sys.stdout,
stderr=sys.stderr,
class PiperestorescriptRunner(RunnerIF):
"""
Class for launch of non-parallel CASA, driven by a piperestorescript.py
Only intended for CMS restores
"""
@property
def build_command(self):
if not self.parameters.get("piperestorescript_path"):
raise ValueError(f"Exected path to a piperestorescript.py to be specified, found none")
return (
"PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\" "
"${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c "
f"{self.parameters['piperestorescript_path']} || true"
)
return result.communicate()
class ParallelRunner(RunnerIF):
......@@ -120,31 +117,21 @@ class ParallelRunner(RunnerIF):
Class for launch of parallel CASA processing using mpicasa
"""
def __init__(self, parameters: dict):
self.logger = logging.getLogger("casa_envoy")
self.parameters = parameters
@property
def log_message(self):
return f"RUNNING PARALLEL CASA with command '{self.build_command}'"
def call_casa(self):
@property
def build_command(self):
# CASA wants a supervisor process, in addition to the workers.
# So increment the parallelization request by one to account for that
integer_processes = int(self.parameters["requested_parallel"]) + 1
# The --oversubscribe argument to mpicasa lets the N+1 processes share N CPUs happily
build_command = (
return (
"PYTHONPATH='' xvfb-run -e ${PWD}/xvfb-run.err.txt -d -s \"-screen 0 800x600x16\""
" ${CASA_HOME}/bin/mpicasa --oversubscribe -n "
+ str(integer_processes)
+ " ${CASA_HOME}/bin/casa --pipeline --nogui --nologger -c "
"${CASA_HOME}/pipeline/pipeline/runvlapipeline.py ${PPR_FILENAME} || true"
)
self.logger.info(f"RUNNING PARALLEL CASA with command '{build_command}'")
os.chdir("./working")
result = subprocess.Popen(
build_command,
shell=True,
executable="/bin/bash",
stdout=sys.stdout,
stderr=sys.stderr,
)
return result.communicate()
......@@ -90,7 +90,7 @@ class RestoreFoundation(FoundationIF):
self.parameters = parameters
self.parent_path = parameters["parent_path"]
self.content = json.loads(metadata.content)
self._path_to_piperestorescript: Optional[Path] = None
self._piperestorescript_path: Optional[Path] = None
def data_foundation(self):
"""
......@@ -114,13 +114,14 @@ class RestoreFoundation(FoundationIF):
if (path / CASAPIPERESTORESCRIPT).is_file():
dest = Path(f"working/{CASAPIPERESTORESCRIPT}")
shutil.copyfile(path / CASAPIPERESTORESCRIPT, dest)
self._path_to_piperestorescript = dest
self._piperestorescript_path = dest.resolve()
self.logger.info(f"Found {CASAPIPERESTORESCRIPT}, copied it to {dest}")
self.logger.info("Restore data foundation complete!")
@property
def path_to_piperestorescript(self) -> Optional[Path]:
return self._path_to_piperestorescript
def piperestorescript_path(self) -> Optional[Path]:
"""Absolute path to the casapiperestorescript.py for this restore, or None if one doesn't exist"""
return self._piperestorescript_path
def extract_cal(self):
self.logger.info("Extracting calibration tar file to products directory...")
......
......@@ -19,6 +19,10 @@
Interfaces for casa_envoy
"""
import abc
import logging
import os
import subprocess
import sys
from abc import ABC
......@@ -38,10 +42,31 @@ class LauncherIF(ABC):
class RunnerIF(ABC):
def __init__(self, parameters: dict):
self.logger = logging.getLogger("casa_envoy")
self.parameters = parameters
@property
@abc.abstractmethod
def call_casa(self):
def build_command(self) -> str:
pass
@property
def log_message(self):
return f"RUNNING CASA with command '{self.build_command}'"
def call_casa(self) -> tuple[bytes, bytes]:
self.logger.info(self.log_message)
os.chdir("./working")
result = subprocess.Popen(
self.build_command,
shell=True,
executable="/bin/bash",
stdout=sys.stdout,
stderr=sys.stderr,
)
return result.communicate()
class AuditorIF(ABC):
"""
......
......@@ -26,10 +26,15 @@ import sys
from typing import Dict, Union
from casa_envoy.auditor import AuditDirectories, AuditFiles
from casa_envoy.configure import CasaConfigure, ParallelRunner, StandardRunner
from casa_envoy.configure import (
CasaConfigure,
ParallelRunner,
PiperestorescriptRunner,
StandardRunner,
)
from casa_envoy.enums import ProductType
from casa_envoy.foundation import CubeFoundation, GeneralFoundation, RestoreFoundation
from casa_envoy.interfaces import LauncherIF
from casa_envoy.interfaces import LauncherIF, RunnerIF
from casa_envoy.schema import AbstractTextFile
......@@ -56,7 +61,7 @@ def run_audit(logger: logging.Logger, ppr: AbstractTextFile, metadata: AbstractT
sys.exit(1)
def determine_runner_type(parameters: dict) -> Union[StandardRunner, ParallelRunner]:
def determine_runner_type(parameters: dict) -> RunnerIF:
"""
determine the type of CASA runner needed: Standard or Parallel
......@@ -65,12 +70,18 @@ def determine_runner_type(parameters: dict) -> Union[StandardRunner, ParallelRun
"""
if "requested_parallel" in parameters:
return ParallelRunner(parameters)
elif "piperestorescript_path" in parameters:
# This function is currently called when the Launcher is initialized,
# i.e. before the RestoreFoundation would know if there's a piperestorescript or not,
# so its results will be overriden if the RestoreFoundation finds a piperestorescript.
# This case is here only in case that order of operations changes.
return PiperestorescriptRunner(parameters)
else:
return StandardRunner(parameters)
class CasaLauncher:
def __init__(self, runner: Union[StandardRunner, ParallelRunner]):
def __init__(self, runner: RunnerIF):
self.logger = logging.getLogger("casa_envoy")
self.runner = runner
self.parameters = runner.parameters
......@@ -154,7 +165,13 @@ class CalibrationLauncher(LauncherIF):
# Ensure all data is in the required locations for CASA processing (This is not always rawdata!)
GeneralFoundation(self.parameters, self.metadata).data_foundation()
if self.parameters["product_type"] == ProductType.RESTORE.value:
RestoreFoundation(self.parameters, self.metadata).data_foundation()
restore_foundation = RestoreFoundation(self.parameters, self.metadata)
restore_foundation.data_foundation()
if restore_foundation.piperestorescript_path:
# Need to make this available to the Runner if it exists
self.parameters["piperestorescript_path"] = str(restore_foundation.piperestorescript_path)
# Since determine_runner_type() is called before this, need to override it if a piperestorescript is found
self.runner = PiperestorescriptRunner(self.parameters)
return
def check_calibratable(self) -> bool:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment