Commit 12711b75 authored by Charlotte Hausman

Post Server Apocalypse Round 2

parent 392cea06
Merge request !355: Post Server Apocalypse Round 2
Pipeline #2231 passed
@@ -30,8 +30,8 @@ def get_fields_for(product_type: str, filename: str) -> list:
         "creationTime",
         "productLocator",
         "destinationDirectory",
-        "cms_path",
-        "sdmId",
+        "calibrationSourceDirectory",
+        "cmsName",
     ]

     restore_metadata_list = [
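Note: the rename swaps the old `cms_path` / `sdmId` pair for `calibrationSourceDirectory` and `cmsName` in the imaging-metadata field list. A minimal sketch of a metadata.json fragment that would satisfy the new list; every value below is an invented placeholder, only the key names come from this diff:

    import json

    # Placeholder metadata; only the key names come from this commit.
    content = json.dumps({
        "creationTime": "2021-07-16T16:51:30",
        "productLocator": "uid://evla/execblock/placeholder",
        "destinationDirectory": "/lustre/placeholder/spool",
        "calibrationSourceDirectory": "/lustre/placeholder/cal",
        "cmsName": "placeholder.ms",
    })
    fields = ["creationTime", "productLocator", "destinationDirectory",
              "calibrationSourceDirectory", "cmsName"]
    # Mirrors the audit's substring check against the raw file content.
    missing = [tag for tag in fields if tag not in content]
    assert not missing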
@@ -85,7 +85,7 @@ class AuditFiles(AuditorIF):
         if ".json" in file.filename:
             for tag in fields:
-                if tag not in content or len(json.loads(content)[tag]) == 0:
+                if tag not in content:
                     missing.append(tag)
             if len(missing) > 0:
                 print(f"Missing fields: {missing}")
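Note: this relaxes the audit from "tag present and non-empty" to "tag present". The dropped clause re-parsed the entire JSON document once per tag and called `len()` on the value, which raises TypeError for numeric fields. If the emptiness guard were still wanted, a sketch that parses once and tolerates any value type (illustrative only, not the commit's code):

    import json

    def find_missing(content: str, fields: list) -> list:
        parsed = json.loads(content)
        # Flag tags that are absent, or present but falsy
        # (empty string/list/dict, None, 0).
        return [tag for tag in fields if tag not in parsed or not parsed[tag]]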
@@ -123,7 +123,7 @@ class AuditFiles(AuditorIF):
                 invalid_files.append(file.filename)
         if len(invalid_files) != 0:
-            self.logger.info(f"INVALID FILE FOUND: {invalid_files}")
+            self.logger.error(f"INVALID FILE FOUND: {invalid_files}")
             return False
         else:
             return True
@@ -142,7 +142,7 @@ class AuditDirectories(AuditorIF):
         current = os.getcwd()
         needed = self.rootDirectory + "/" + self.relative_path
         if needed != current:
-            self.logger.info("DIRECTORY ERROR: not in correct directory for processing.")
+            self.logger.error("DIRECTORY ERROR: not in correct directory for processing.")
             return False
         else:
             working = Path(current + "/working").is_dir()
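Note: the directory audit compares the current working directory against `rootDirectory/relative_path` and then probes for a `working` subdirectory. A condensed sketch of the same pattern with pathlib; the extra `rawdata` and `products` names are assumptions borrowed from the condor template's `transfer_output_files` further down, not necessarily what the audited code checks:

    from pathlib import Path

    def audit_directories(root: str, relative_path: str) -> bool:
        base = Path(root) / relative_path
        # Processing must run from the expected spool directory.
        if Path.cwd() != base:
            return False
        return all((base / sub).is_dir() for sub in ("working", "rawdata", "products"))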
@@ -50,7 +50,7 @@ class CasaLauncher:
             self.logger.info(f"{var}: {env}")
         if "None" in result_list:
-            self.logger.info("Environment setup Failed!")
+            self.logger.error("Environment setup Failed!")
             sys.exit(1)
         else:
             self.logger.info("Environment ready for processing")
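Note: `CasaLauncher` stringifies each resolved variable and aborts if any came back as `None`. A sketch of that check, assuming the values are read with `os.environ.get`; the variable names here are illustrative:

    import os

    env_vars = ["CAPO_PROFILE", "CAPO_PATH"]  # illustrative names
    # str(None) == "None", which is what the membership test catches.
    result_list = [str(os.environ.get(var)) for var in env_vars]
    ready = "None" not in result_list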
@@ -189,7 +189,7 @@ class ImagingLauncher(LauncherIF):
         if cms_name is not None and cms_path is not None and cms_name[-3:] == ".ms":
             return True
         else:
-            self.logger.info("CMS ERROR: Imaging requires a valid CMS name and location!")
+            self.logger.error("CMS ERROR: Imaging requires a valid CMS name and location!")
             return False

     def check_cal_and_imageable(self) -> bool:
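Note: the CMS check slices the last three characters to test for the `.ms` extension. An equivalent, slightly more idiomatic form, shown for illustration only; this is not a change the commit makes:

    def is_valid_cms(cms_name, cms_path) -> bool:
        # endswith() expresses the extension check without a magic slice length.
        return cms_name is not None and cms_path is not None and cms_name.endswith(".ms")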
@@ -219,12 +219,12 @@ class ImagingLauncher(LauncherIF):
         if dir_audit:
             self.logger.info("Directory audit successful!")
         else:
-            self.logger.info("FAILURE: directory structure audit was unsuccessful!")
+            self.logger.error("FAILURE: directory structure audit was unsuccessful!")
             sys.exit(1)

         audit = AuditFiles([self.ppr, self.metadata], parameters).audit()
         if audit:
             self.logger.info("File audit successful!")
         else:
-            self.logger.info("FAILURE: file audit was unsuccessful!")
+            self.logger.error("FAILURE: file audit was unsuccessful!")
             sys.exit(1)
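Note: the recurring change in this commit is promoting failure-path messages from `logger.info` to `logger.error`, so failures surface at the right severity before `sys.exit(1)`. The shared pattern, as a minimal sketch with an assumed logger name:

    import logging
    import sys

    logger = logging.getLogger("casa_envoy")  # logger name assumed

    def require(passed: bool, ok_msg: str, fail_msg: str) -> None:
        # Successes log at INFO; failures log at ERROR and abort.
        if passed:
            logger.info(ok_msg)
        else:
            logger.error(fail_msg)
            sys.exit(1)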
@@ -72,8 +72,8 @@ def test_get_fields_for():
         "creationTime",
         "productLocator",
         "destinationDirectory",
-        "cms_path",
-        "sdmId",
+        "calibrationSourceDirectory",
+        "cmsName",
     ]
     result = get_fields_for(product_type=product_type2, filename=test_img_metadata.filename)
     assert result == img_fields
@@ -61,7 +61,6 @@ def determine_wrester(connection: MDDBConnector, args: argparse.Namespace):
     elif args.stdimg:
         data = WrestWorkflowMetadata(connection, sdm_id=args.stdimg[0]).wrest_standard_image_info()
     elif args.restore:
-        print(args)
         data = WrestWorkflowMetadata(connection, spl=args.restore).wrest_restore_info()
     elif args.observation:
         data = WrestObservationMetadata(
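Note: the deleted `print(args)` was leftover debug output in the restore branch. For context, a hedged sketch of how the `--restore` flag might be declared so that `args.restore` carries both locators; the option spelling and `nargs` are assumptions, only the dispatch on `args.restore` comes from the diff:

    import argparse

    parser = argparse.ArgumentParser(prog="aat_wrest")
    # nargs=2 assumed: the workflow service now sends product and cal locators
    # as two separate argv entries (see the WorkflowService hunk below).
    parser.add_argument("-r", "--restore", nargs=2, default=None,
                        help="product locator and calibration locator")
    args = parser.parse_args(["-r", "uid://ex/product", "uid://ex/cal"])
    assert args.restore == ["uid://ex/product", "uid://ex/cal"]

The commit also ships the Alembic migration below, correcting the restore condor template.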
"""correnting restore condor
Revision ID: d3abd475fbb4
Revises: 6508afd4da68
Create Date: 2021-07-16 16:51:30.483952
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "d3abd475fbb4"
down_revision = "6508afd4da68"
branch_labels = None
depends_on = None
condor_content = """executable = std_restore_imaging.sh
arguments = {{product_locator}} {{cal_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
old_content = """executable = std_restore_imaging.sh
arguments = {{product_locator}} {{cal_product_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""

def upgrade():
    op.execute(
        f"""
        UPDATE workflow_templates
        SET content = E'{condor_content}' where filename = 'restore_cms.condor'
        """
    )


def downgrade():
    op.execute(
        f"""
        UPDATE workflow_templates
        SET content = E'{old_content}' where filename = 'restore_cms.condor'
        """
    )
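Note: functionally, the migration renames one placeholder in the stored template: `{{cal_product_locator}}` becomes `{{cal_locator}}`, matching the argument name the restore workflow now supplies. The `E'{...}'` form interpolates the template body into the SQL via an f-string and relies on PostgreSQL escape-string syntax. A hedged sketch of the same UPDATE via a bound parameter, which avoids manual escaping; this is an alternative, not the commit's code:

    import sqlalchemy as sa
    from alembic import op

    def upgrade_with_bind():
        # condor_content is the template body defined in the migration above.
        op.execute(
            sa.text(
                "UPDATE workflow_templates SET content = :content "
                "WHERE filename = 'restore_cms.condor'"
            ).bindparams(content=condor_content)
        )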
@@ -235,22 +235,26 @@ class WorkflowService(WorkflowServiceIF):
         if "calibration" in name:
             wrest_type = "-sc"
             argument = wf_request.argument["product_locator"]
+            argument2 = []
         elif "restore" in name:
             wrest_type = "-r"
-            argument = [wf_request.argument["product_locator"], wf_request.argument["cal_locator"]]
+            argument = wf_request.argument["product_locator"]
+            argument2 = [wf_request.argument["cal_locator"]]
         elif "imaging" in name:
             wrest_type = "-si"
             argument = wf_request.argument["sdmId"]
+            argument2 = []
         else:
             logger.info(f"No wrester found for workflow {name}. Does it actually require metadata?")
             return wf_request

         logger.info(f" workflow {name} has wrest option: {wrest_type}")
         with subprocess.Popen(
-            ["aat_wrest", wrest_type, argument],
+            ["aat_wrest", wrest_type, argument] + argument2,
             stdout=subprocess.PIPE,
         ) as wf_metadata:
             wf_json = wf_metadata.communicate()[0]
+            logger.info(wf_json.decode())
             if "ERROR" not in wf_json.decode():
                 merged_args = {
                     **json.loads(wf_json),
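Note: the `argument2` split exists because `subprocess.Popen` requires a flat argv of strings. The old restore branch set `argument` to a nested list, so the command became `["aat_wrest", "-r", [spl, cal]]`, which is not a valid argv. With the change the restore command flattens correctly; the locator values below are placeholders:

    argument = "uid://ex/product"
    argument2 = ["uid://ex/cal"]
    cmd = ["aat_wrest", "-r", argument] + argument2
    assert cmd == ["aat_wrest", "-r", "uid://ex/product", "uid://ex/cal"]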