From 12711b75457418e383c69b7cb10150aa5e9b4113 Mon Sep 17 00:00:00 2001 From: Charlotte Hausman <chausman@nrao.edu> Date: Mon, 19 Jul 2021 14:39:38 +0000 Subject: [PATCH] Post Server Apocalypse Round 2 --- .../pexable/casa_envoy/casa_envoy/auditor.py | 10 +-- .../casa_envoy/casa_envoy/launchers.py | 8 +- .../pexable/casa_envoy/test/test_auditor.py | 4 +- .../utilities/aat_wrest/aat_wrest/wrest.py | 1 - .../d3abd475fbb4_correcting_restore_condor.py | 77 +++++++++++++++++++ .../workflow/services/workflow_service.py | 8 +- 6 files changed, 94 insertions(+), 14 deletions(-) create mode 100644 schema/versions/d3abd475fbb4_correcting_restore_condor.py diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/auditor.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/auditor.py index 85feb6899..f182017ba 100644 --- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/auditor.py +++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/auditor.py @@ -30,8 +30,8 @@ def get_fields_for(product_type: str, filename: str) -> list: "creationTime", "productLocator", "destinationDirectory", - "cms_path", - "sdmId", + "calibrationSourceDirectory", + "cmsName", ] restore_metadata_list = [ @@ -85,7 +85,7 @@ class AuditFiles(AuditorIF): if ".json" in file.filename: for tag in fields: - if tag not in content or len(json.loads(content)[tag]) == 0: + if tag not in content: missing.append(tag) if len(missing) > 0: print(f"Missing fields: {missing}") @@ -123,7 +123,7 @@ class AuditFiles(AuditorIF): invalid_files.append(file.filename) if len(invalid_files) != 0: - self.logger.info(f"INVALID FILE FOUND: {invalid_files}") + self.logger.error(f"INVALID FILE FOUND: {invalid_files}") return False else: return True @@ -142,7 +142,7 @@ class AuditDirectories(AuditorIF): current = os.getcwd() needed = self.rootDirectory + "/" + self.relative_path if needed != current: - self.logger.info("DIRECTORY ERROR: not in correct directory for processing.") + self.logger.error("DIRECTORY ERROR: not in correct directory for processing.") return False else: working = Path(current + "/working").is_dir() diff --git a/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py b/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py index a98f38f38..bd33001c2 100644 --- a/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py +++ b/apps/cli/executables/pexable/casa_envoy/casa_envoy/launchers.py @@ -50,7 +50,7 @@ class CasaLauncher: self.logger.info(f"{var}: {env}") if "None" in result_list: - self.logger.info("Environment setup Failed!") + self.logger.error("Environment setup Failed!") sys.exit(1) else: self.logger.info("Environment ready for processing") @@ -189,7 +189,7 @@ class ImagingLauncher(LauncherIF): if cms_name is not None and cms_path is not None and cms_name[-3:] == ".ms": return True else: - self.logger.info("CMS ERROR: Imaging requires a valid CMS name and location!") + self.logger.error("CMS ERROR: Imaging requires a valid CMS name and location!") return False def check_cal_and_imageable(self) -> bool: @@ -219,12 +219,12 @@ class ImagingLauncher(LauncherIF): if dir_audit: self.logger.info("Directory audit successful!") else: - self.logger.info("FAILURE: directory structure audit was unsuccessful!") + self.logger.error("FAILURE: directory structure audit was unsuccessful!") sys.exit(1) audit = AuditFiles([self.ppr, self.metadata], parameters).audit() if audit: self.logger.info("File audit successful!") else: - self.logger.info("FAILURE: file audit was unsuccessful!") + self.logger.error("FAILURE: file audit was unsuccessful!") sys.exit(1) diff --git a/apps/cli/executables/pexable/casa_envoy/test/test_auditor.py b/apps/cli/executables/pexable/casa_envoy/test/test_auditor.py index f5ed148b1..c55e1682f 100644 --- a/apps/cli/executables/pexable/casa_envoy/test/test_auditor.py +++ b/apps/cli/executables/pexable/casa_envoy/test/test_auditor.py @@ -72,8 +72,8 @@ def test_get_fields_for(): "creationTime", "productLocator", "destinationDirectory", - "cms_path", - "sdmId", + "calibrationSourceDirectory", + "cmsName", ] result = get_fields_for(product_type=product_type2, filename=test_img_metadata.filename) assert result == img_fields diff --git a/apps/cli/utilities/aat_wrest/aat_wrest/wrest.py b/apps/cli/utilities/aat_wrest/aat_wrest/wrest.py index c85b8f35e..598e75ffe 100644 --- a/apps/cli/utilities/aat_wrest/aat_wrest/wrest.py +++ b/apps/cli/utilities/aat_wrest/aat_wrest/wrest.py @@ -61,7 +61,6 @@ def determine_wrester(connection: MDDBConnector, args: argparse.Namespace): elif args.stdimg: data = WrestWorkflowMetadata(connection, sdm_id=args.stdimg[0]).wrest_standard_image_info() elif args.restore: - print(args) data = WrestWorkflowMetadata(connection, spl=args.restore).wrest_restore_info() elif args.observation: data = WrestObservationMetadata( diff --git a/schema/versions/d3abd475fbb4_correcting_restore_condor.py b/schema/versions/d3abd475fbb4_correcting_restore_condor.py new file mode 100644 index 000000000..2b047f906 --- /dev/null +++ b/schema/versions/d3abd475fbb4_correcting_restore_condor.py @@ -0,0 +1,77 @@ +"""correnting restore condor + +Revision ID: d3abd475fbb4 +Revises: 6508afd4da68 +Create Date: 2021-07-16 16:51:30.483952 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "d3abd475fbb4" +down_revision = "6508afd4da68" +branch_labels = None +depends_on = None + + +condor_content = """executable = std_restore_imaging.sh +arguments = {{product_locator}} {{cal_locator}} {{request_id}} metadata.json PPR.xml + +output = restore.out +error = restore.err +log = condor.log + +SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin +should_transfer_files = yes +transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json +transfer_output_files = working, rawdata, products + +request_memory = {{ramInGb}} +getenv = True +environment = "CAPO_PATH=/home/casa/capo" + +queue + + +""" + +old_content = """executable = std_restore_imaging.sh +arguments = {{product_locator}} {{cal_product_locator}} {{request_id}} metadata.json PPR.xml + +output = restore.out +error = restore.err +log = condor.log + +SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin +should_transfer_files = yes +transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json +transfer_output_files = working, rawdata, products + +request_memory = {{ramInGb}} +getenv = True +environment = "CAPO_PATH=/home/casa/capo" + +queue + + +""" + + +def upgrade(): + op.execute( + f""" + UPDATE workflow_templates + SET content = E'{condor_content}' where filename = 'restore_cms.condor' + """ + ) + + +def downgrade(): + op.execute( + f""" + UPDATE workflow_templates + SET content = E'{old_content}' where filename = 'restore_cms.condor' + """ + ) diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 95e2d1983..38f58f0b9 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -235,22 +235,26 @@ class WorkflowService(WorkflowServiceIF): if "calibration" in name: wrest_type = "-sc" argument = wf_request.argument["product_locator"] + argument2 = [] elif "restore" in name: wrest_type = "-r" - argument = [wf_request.argument["product_locator"], wf_request.argument["cal_locator"]] + argument = wf_request.argument["product_locator"] + argument2 = [wf_request.argument["cal_locator"]] elif "imaging" in name: wrest_type = "-si" argument = wf_request.argument["sdmId"] + argument2 = [] else: logger.info(f"No wrester found for workflow {name}. Does it actually require metadata?") return wf_request logger.info(f" workflow {name} has wrest option: {wrest_type}") with subprocess.Popen( - ["aat_wrest", wrest_type, argument], + ["aat_wrest", wrest_type, argument] + argument2, stdout=subprocess.PIPE, ) as wf_metadata: wf_json = wf_metadata.communicate()[0] + logger.info(wf_json.decode()) if "ERROR" not in wf_json.decode(): merged_args = { **json.loads(wf_json), -- GitLab