Compare revisions

Target: ssa/workspaces
Commits on Source (2)
@@ -57,6 +57,7 @@ class DeliveryConveyor(ConveyorIF):
# make new cache directory with name matching workspaces parent directory and unzip weblog into it
os.makedirs(weblog_location)
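# 0o2770: rwx for owner and group plus the setgid bit, so files created under the cache directory inherit its group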
os.chmod(weblog_location, 0o2770)
location_path = self.settings["destination_dir"] + "/products"
if pathlib.Path(location_path + "/weblog.tgz").exists():
tar = tarfile.open(location_path + "/weblog.tgz")
@@ -99,6 +100,8 @@ class DeliveryConveyor(ConveyorIF):
if not pathlib.Path(qa_path).exists():
os.makedirs(qa_path)
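# hand the QA area to vlapipe with setgid group permissions so delivered products stay group-accessible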
os.chown(qa_path, vlapipe_id, vlapipe_id)
os.chmod(qa_path, 0o2770)
else:
self.logger.info(f"Delivering to existing {qa_path}:")
self.logger.info(os.listdir(qa_path))
......
@@ -49,7 +49,8 @@ class TestDeliveryConveyor:
@patch("sys.exit")
@patch("shutil.copytree")
@patch("tarfile.open")
def test_cache_weblog(self, mock_tar, mock_shutil, mock_exit, mock_glob):
@patch("os.chmod")
def test_cache_weblog(self, mock_chmod, mock_tar, mock_shutil, mock_exit, mock_glob):
with patch("os.makedirs") as make_dirs:
with patch("conveyor.deliver.DeliveryConveyor._get_pipeline_dir") as pipeline:
with patch("conveyor.deliver.DeliveryConveyor.create_qa_notes") as qa:
@@ -79,14 +80,16 @@ class TestDeliveryConveyor:
@patch("os.symlink")
@patch("os.system")
@patch("shutil.move")
@patch("os.chmod")
@patch("os.chown")
@patch("os.makedirs")
def test_move_subdirectories_to_qa_area(self, mock_dirs, mock_chown, mock_move, mock_system, mock_link):
def test_move_subdirectories_to_qa_area(self, mock_dirs, mock_chown, mock_chmod, mock_move, mock_system, mock_link):
with patch("glob.glob", MagicMock(return_value=["rawdata/", "working/", "products/"])) as contents:
path = "/lustre/aoc/cluster/pipeline/docker/workspaces/qa2/tmpabcd1234"
conveyor.move_subdirectories_to_qa_area(path)
assert mock_dirs.call_count == 1
assert mock_chown.call_count == 1
assert mock_chmod.call_count == 1
assert mock_move.call_count == 3
assert mock_system.call_count == 3
assert mock_link.call_count == 3
......
"""Add remaining VLASS SECI imaging workflow
Revision ID: c5b3f866d904
Revises: 637935ee1583
Create Date: 2022-10-04 13:46:07.781964
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'c5b3f866d904'
down_revision = '637935ee1583'
branch_labels = None
depends_on = None
# Overall workflow DAG
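# DAGMan ordering: FETCH -> ENVOY -> POST; each child starts only after its parent completes successfully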
vlass_seci_dag = """JOB FETCH vlass_seci_fetch.condor
VARS FETCH jobname="$(JOB)"
JOB ENVOY vlass_seci_envoy.condor
VARS ENVOY jobname="$(JOB)"
JOB POST vlass_seci_post.condor
VARS POST jobname="$(JOB)"
PARENT FETCH CHILD ENVOY
PARENT ENVOY CHILD POST
"""
# Pimscache Fetch Step
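# {{...}} tokens are Mustache-style placeholders (e.g. {{vlass_env}}, {{data_location}});
# {{#remote}}/{{^remote}} sections in the envoy templates below toggle remote vs. local cluster settings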
fetch_condor = """executable = vlass_seci_fetch.sh
arguments = {{calibration_name}} {{phase_center}} {{data_location}}/rawdata
output = fetch.out
error = fetch.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
PIMS_PATH = /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin
VLASS_DIR = {{data_location}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(PIMS_PATH)/pimscache
when_to_transfer_output = ON_EXIT
transfer_output_files = .job.ad
output_destination = nraorsync://$(VLASS_DIR)
+WantIOProxy = True
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
request_memory = 2G
requirements = HasLustre == True
queue
"""
fetch_sh = """#!/bin/sh
set -o errexit
export HOME=$TMPDIR
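# $1 = calibration name, $2 = phase center, $3 = rawdata destination (see the fetch_condor arguments line)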
./pimscache ln -c $1 -p $2 $3
"""
# CASA Step
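# old_envoy_condor is the original single-step vlass_seci.condor template, kept only so
# downgrade() can restore it; envoy_condor below differs only in the executable and out/err file names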
old_envoy_condor = """executable = vlass_seci.sh
arguments = metadata.json PPR.xml
output = seci.out
error = seci.err
log = condor.log
VLASS_DIR = {{data_location}}
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products{{#remote}}, nraorsync://$(VLASS_DIR)/{{profile}}.properties{{/remote}}, nraorsync://$(VLASS_DIR)/PPR.xml, nraorsync://$(VLASS_DIR)/metadata.json
when_to_transfer_output = ON_EXIT
transfer_output_files = .job.ad
+nrao_output_files = "working products"
output_destination = nraorsync://$(VLASS_DIR)
+WantIOProxy = True
request_memory = 50G
getenv = True
{{^remote}}
environment = "CAPO_PATH=/home/casa/capo"
requirements = (VLASS == True) && (HasLustre == True)
+partition = "VLASS"
{{/remote}}
{{#remote}}
requirements = (VLASS == True)
+partition = "VLASS"
Rank = (TARGET.VLASS == True) + (TARGET.VLASSTEST =!= True)
{{/remote}}
queue
"""
envoy_condor = """executable = vlass_seci_envoy.sh
arguments = metadata.json PPR.xml
output = envoy.out
error = envoy.err
log = condor.log
VLASS_DIR = {{data_location}}
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/.matplotlib, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/vela, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(VLASS_DIR)/working, nraorsync://$(VLASS_DIR)/rawdata, nraorsync://$(VLASS_DIR)/products{{#remote}}, nraorsync://$(VLASS_DIR)/{{profile}}.properties{{/remote}}, nraorsync://$(VLASS_DIR)/PPR.xml, nraorsync://$(VLASS_DIR)/metadata.json
when_to_transfer_output = ON_EXIT
transfer_output_files = .job.ad
+nrao_output_files = "working products"
output_destination = nraorsync://$(VLASS_DIR)
+WantIOProxy = True
request_memory = 50G
getenv = True
{{^remote}}
environment = "CAPO_PATH=/home/casa/capo"
requirements = (VLASS == True) && (HasLustre == True)
+partition = "VLASS"
{{/remote}}
{{#remote}}
requirements = (VLASS == True)
+partition = "VLASS"
Rank = (TARGET.VLASS == True) + (TARGET.VLASSTEST =!= True)
{{/remote}}
queue
"""
envoy_sh = """#!/bin/sh
export HOME=$TMPDIR
{{#remote}}
export CAPO_PATH=.
export CAPO_PROFILE={{profile}}
{{/remote}}
set -o errexit
./casa_envoy --vlass-img $1 $2
"""
# QA/List Post step
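# POST: run delivery via conveyor, annotate QA from the CASA log, then build the component list in products/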
post_condor = """executable = vlass_seci_post.sh
arguments = metadata.json {{manager_job_id}}
output = post.out
error = post.err
log = condor.log
should_transfer_files = NO
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True
queue
"""
post_sh = """#!/bin/sh
set -o errexit
export CASAVERS=$(capo -q edu.nrao.workspaces.ProcessingSettings.CasaVersion.vlass)
/lustre/aoc/cluster/pipeline/dsoc-{{vlass_env}}/workspaces/sbin/conveyor --vlass $1
source /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin/activate
export CAPO_PROFILE=vlass.{{vlass_env}}
JOB_ID=$2
printf "JOB_ID = %s
" $JOB_ID
cd working/
CASA_LOG_FILE=`ls casa-*.log`
../../../../workspaces/bin/annotateQaSE.py --jobid $JOB_ID $CASA_LOG_FILE
deactivate
cd ../products
/lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin/create-component-list.sh $JOB_ID
"""
def upgrade():
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('vlass_seci.dag', E'{vlass_seci_dag}', 'vlass_seci')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('vlass_seci_fetch.condor', E'{fetch_condor}', 'vlass_seci')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('vlass_seci_fetch.sh', E'{fetch_sh}', 'vlass_seci')
"""
)
op.execute(
f"""
UPDATE workflow_templates SET content=E'{envoy_condor}',
filename='vlass_seci_envoy.condor' WHERE workflow_name='vlass_seci'
AND filename='vlass_seci.condor'
"""
)
op.execute(
f"""
UPDATE workflow_templates SET filename='vlass_seci_envoy.sh'
WHERE workflow_name='vlass_seci' AND filename='vlass_seci.sh'
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('vlass_seci_post.condor', E'{post_condor}', 'vlass_seci')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('vlass_seci_post.sh', E'{post_sh}', 'vlass_seci')
"""
)
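# downgrade() reverses upgrade(): it restores the single-step envoy template and
# deletes the DAG, fetch, and post templates added above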
def downgrade():
op.execute(
f"""
UPDATE workflow_templates SET content=E'{old_envoy_condor}',
filename='vlass_seci.condor' WHERE workflow_name='vlass_seci'
AND filename='vlass_seci_envoy.condor'
"""
)
op.execute(
f"""
UPDATE workflow_templates SET filename='vlass_seci.sh'
WHERE workflow_name='vlass_seci' AND filename='vlass_seci_envoy.sh'
"""
)
op.execute(
f"""
DELETE from workflow_templates WHERE workflow_name='vlass_seci'
AND filename='vlass_seci.dag'
"""
)
op.execute(
f"""
DELETE from workflow_templates WHERE workflow_name='vlass_seci'
AND filename='vlass_seci_fetch.condor'
"""
)
op.execute(
f"""
DELETE from workflow_templates WHERE workflow_name='vlass_seci'
AND filename='vlass_seci_fetch.sh'
"""
)
op.execute(
f"""
DELETE from workflow_templates WHERE workflow_name='vlass_seci'
AND filename='vlass_seci_post.condor'
"""
)
op.execute(
f"""
DELETE from workflow_templates WHERE workflow_name='vlass_seci'
AND filename='vlass_seci_post.sh'
"""
)
@@ -416,6 +416,7 @@ class WorkflowService(WorkflowServiceIF):
os.mkdir(temp_folder_path)
temp_folder = Path(mkdtemp(dir=temp_folder_path))
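# give the temp results directory the same vlapipe ownership and setgid group permissions as the delivery areas above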
os.chown(temp_folder, vlapipe_id, vlapipe_id)
os.chmod(temp_folder, 0o2770)
logger.info("Settled on temp folder %s", temp_folder)
# set new directory as result_dir
request.results_dir = temp_folder.__str__()
......