Skip to content
GitLab
Explore
Sign in
Register
Primary navigation
Search or go to…
Project
workspaces
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Wiki
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Build
Pipelines
Jobs
Pipeline schedules
Artifacts
Deploy
Releases
Package Registry
Model registry
Operate
Environments
Terraform modules
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
CI/CD analytics
Repository analytics
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Terms and privacy
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
ssa
workspaces
Merge requests
!1076
WS-1013: Create the pims_split Workspaces workflow
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
WS-1013: Create the pims_split Workspaces workflow
pims-split-foo
into
main
Overview
8
Commits
5
Pipelines
1
Changes
1
Merged
Nathan Bockisch
requested to merge
pims-split-foo
into
main
2 years ago
Overview
8
Commits
5
Pipelines
1
Changes
1
Expand
A first draft of the new pims_split workflow, based on input from our in-person working group
Edited
2 years ago
by
Nathan Bockisch
0
0
Merge request reports
Compare
main
version 4
3fdd34b0
2 years ago
version 3
8900acde
2 years ago
version 2
809dc3af
2 years ago
version 1
6584ad64
2 years ago
main (base)
and
version 1
latest version
ed5eb5a4
5 commits,
2 years ago
version 4
3fdd34b0
5 commits,
2 years ago
version 3
8900acde
4 commits,
2 years ago
version 2
809dc3af
3 commits,
2 years ago
version 1
6584ad64
2 commits,
2 years ago
1 file
+
218
−
0
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
shared/workspaces/alembic/versions/e8e6d54d8444_add_pimscache_split_workflow.py
0 → 100644
+
218
−
0
Options
"""
add pimscache split workflow
Revision ID: e8e6d54d8444
Revises: 55e5b37d1ccf
Create Date: 2022-09-21 11:19:03.245980
"""
from
alembic
import
op
import
sqlalchemy
as
sa
# revision identifiers, used by Alembic.
revision
=
'
e8e6d54d8444
'
down_revision
=
'
55e5b37d1ccf
'
branch_labels
=
None
depends_on
=
None
# Main DAG for the workflow.  Rendered with mustache: the {#splits}...{/splits}
# section repeats the JOB/VARS pair once per entry in the "splits" list, so one
# split_workflow.condor job is generated per split directory.
# NOTE(review): the PARENT/CHILD edges name a node "SPLIT", but no JOB line
# defines a node by that name (split jobs are named after each {{.}} value) --
# confirm DAGMan resolves this, or whether the edges should reference the
# generated job names instead.
pims_split_dag = """JOB RESTORE restore_cms.condor
{#splits}
JOB {{.}} split_workflow.condor
VARS {{.}} jobname="$(JOB)" split_dir="$({{.}})"
{/splits}
JOB FINISH write_finished_file.condor
PARENT RESTORE CHILD SPLIT
PARENT SPLIT CHILD FINISH
"""
# Restore job: HTCondor submit description for the RESTORE node.  Fetches the
# science and calibration products and restores the calibrated measurement
# set via casa_envoy.  {{...}} placeholders are filled in by the workflow
# service's mustache rendering before submission.
restore_cms_condor = """executable = restore_cms.sh
arguments = {{product_locator}} {{cal_locator}} metadata.json PPR.xml
output = restore_cms.out
error = restore_cms.err
log = condor.log

SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(SBIN_PATH)/productfetcher, nraorsync://$(SBIN_PATH)/casa_envoy
transfer_output_files = .job.ad
+nrao_output_files = "rawdata"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True

getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True

queue
"""
# Shell entry point for the restore job.  Positional args (from the condor
# `arguments` line above): $1 science product locator, $2 calibration product
# locator, $3 metadata file, $4 PPR file.  The fetch+restore section is
# skipped entirely when the mustache "existingRestore" value is set
# ({{^existingRestore}} is an inverted section).
restore_cms_sh = """#!/bin/sh
set -o errexit

cd {{data_location}}
{{^existingRestore}}
cd rawdata/
../productfetcher --product-locator $1 --product-locator $2
cd ../
./casa_envoy --restore -c $3 $4
{{/existingRestore}}
"""
# Workflow run on each split
split_workflow_condor
=
"""
executable = split_workflow.sh
arguments =
"
$(split_dir)
"
SPLIT_DIR =
"
$(split_dir)
"
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
VLASS_DIR = {{data_location}}
SPOOL_DIR = {{spool_dir}}
PIMS_PATH = /lustre/aoc/cluster/pipeline/vlass_{{vlass_env}}/workspaces/bin
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/casa_envoy, nraorsync://$(SBIN_PATH)/pycapo, nraorsync://$(PIMS_PATH)/pimscache, nraosync://$(VLASS_DIR)/metadata.json, nraosync://$(VLASS_DIR)/working nraosync://$(VLASS_DIR)/tiles/$(SPLIT_DIR)
transfer_output_files = .job.ad
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True
getenv = True
environment =
"
CAPO_PATH=/home/casa/capo
"
requirements = HasLustre == True
queue
"""
# Shell entry point for one split job.  $1 is the split directory in the form
# "TILE/PHCENTER"; the script links the restored measurement set into the
# split's rawdata area, runs CASA on the split, then populates the PIMS cache.
# (The first three in-script comments are the author's open questions from the
# draft and are preserved as-is.)
split_workflow_sh = """#!/bin/sh
set -o errexit

# Something to link measurement set to rawdata directory
# CASA envoy call?
# Populate the cache area
TILE=$(echo $1 | cut -d '/' -f 1)
PHCENTER=$(echo $1 | cut -d '/' -f 2)

# Get the measurement set path
{{^existingRestore}}
MS={{data_location}}/*.ms
{{/existingRestore}}
{{#existingRestore}}
MS={{existingRestore}}
{{/existingRestore}}

# Link it in the split's rawdata
ln -s $MS $1/rawdata/

# Run CASA
./casa_envoy --split metadata.json $1/PPR.xml

# Populate cache
./pimscache cp -c {{vlass_product}} -t $TILE -p $PHCENTER $MS
"""
# Finish file creation: HTCondor submit description for the FINISH node.
# Fix vs. draft: the first line was a bare "write_finished_file.sh" with no
# "executable = " key -- every sibling submit description in this file sets
# it, and condor_submit requires it.
write_finished_file_condor = """executable = write_finished_file.sh
output = write_finished.out
error = write_finished.err
log = condor.log

SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo
transfer_output_files = .job.ad
+nrao_output_files = "finished"
output_destination = nraorsync://$(SPOOL_DIR)
+WantIOProxy = True

getenv = True
environment = "CAPO_PATH=/home/casa/capo"
requirements = HasLustre == True

queue
"""
# Shell entry point for the FINISH node: writes a timestamp into a "finished"
# marker file in the processing directory (the file the condor description
# above transfers back via +nrao_output_files).
write_finished_file_sh = """#!/bin/sh
set -o errexit

cd {{data_location}}
/bin/date > finished
"""
def
upgrade
():
op
.
execute
(
"""
INSERT INTO workflows (workflow_name) VALUES (
'
pims_split
'
)
"""
)
op
.
execute
(
f
"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES (
'
pims_split.dag
'
, E
'
{
pims_split_dag
}
'
,
'
pims_split
'
)
"""
)
op
.
execute
(
f
"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES (
'
restore_cms.condor
'
, E
'
{
restore_cms_condor
}
'
,
'
pims_split
'
)
"""
)
op
.
execute
(
f
"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES (
'
restore_cms.sh
'
, E
'
{
restore_cms_sh
}
'
,
'
pims_split
'
)
"""
)
op
.
execute
(
f
"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES (
'
split_workflow.condor
'
, E
'
{
split_workflow_condor
}
'
,
'
pims_split
'
)
"""
)
op
.
execute
(
f
"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES (
'
split_workflow.sh
'
, E
'
{
split_workflow_sh
}
'
,
'
pims_split
'
)
"""
)
op
.
execute
(
f
"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES (
'
write_finished_file.condor
'
, E
'
{
write_finished_file_condor
}
'
,
'
pims_split
'
)
"""
)
op
.
execute
(
f
"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES (
'
write_finished_file.sh
'
, E
'
{
write_finished_file_sh
}
'
,
'
pims_split
'
)
"""
)
pass
def downgrade():
    """Reverse :func:`upgrade`: delete the pims_split workflow and templates.

    The draft left this as ``pass``, making the migration irreversible;
    templates are deleted first to respect the foreign key on workflow_name.
    """
    op.execute("DELETE FROM workflow_templates WHERE workflow_name = 'pims_split'")
    op.execute("DELETE FROM workflows WHERE workflow_name = 'pims_split'")
Loading