Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Commits on Source (74)
Showing
with 1217 additions and 255 deletions
...@@ -88,3 +88,6 @@ local_ngas_root/cache/ ...@@ -88,3 +88,6 @@ local_ngas_root/cache/
local_ngas_root/log/ local_ngas_root/log/
local_ngas_root/processing/ local_ngas_root/processing/
local_ngas_root/*.pid local_ngas_root/*.pid
# ignore go version files
**/version.txt
...@@ -26,6 +26,7 @@ import ( ...@@ -26,6 +26,7 @@ import (
"io" "io"
"io/ioutil" "io/ioutil"
"net/http" "net/http"
"net/url"
"path/filepath" "path/filepath"
) )
...@@ -33,11 +34,16 @@ type calibrationInfo struct { ...@@ -33,11 +34,16 @@ type calibrationInfo struct {
Locator string `json:"locator"` Locator string `json:"locator"`
} }
//CapoInput
/**
* struct for holding CAPO information
*/
type CapoInput struct { type CapoInput struct {
CapoPath string CapoPath string
CapoProfile string CapoProfile string
} }
//checkError
/** /**
* Check if an error happened and panic on it if so * Check if an error happened and panic on it if so
* *
...@@ -79,7 +85,7 @@ func GetCalibrationLocator(imgSetPath string, input CapoInput) string { ...@@ -79,7 +85,7 @@ func GetCalibrationLocator(imgSetPath string, input CapoInput) string {
imgSet := filepath.Base(imgSetPath) imgSet := filepath.Base(imgSetPath)
// Make the REST call // Make the REST call
lookupUrl := locatorEndpoint + imgSet lookupUrl := locatorEndpoint + url.QueryEscape(imgSet)
resp, err := http.Get(lookupUrl) resp, err := http.Get(lookupUrl)
checkError(err) checkError(err)
......
...@@ -20,138 +20,143 @@ ...@@ -20,138 +20,143 @@
package pims package pims
import ( import (
"bufio" "bufio"
"database/sql" "database/sql"
"encoding/json" "encoding/json"
"fmt" "fmt"
"os" "os"
"path/filepath" "path/filepath"
"ssa/pims_analyzer/pkg/db" "ssa/pims_analyzer/pkg/db"
"strings" "strings"
) )
// Stores PIMS info requested by the DAs for PIMS notification emails // Stores PIMS info requested by the DAs for PIMS notification emails
type PimsInfo struct { type PimsInfo struct {
// the calibration requested // the calibration requested
Calibration string `json:"calibration"` Calibration string `json:"calibration"`
// The number of splits in the job // The number of splits in the job
NumSplits int `json:"num_splits"` NumSplits int `json:"num_splits"`
// Path to restored MS // Path to restored MS
RestoredMSPath string `json:"restore_path"` RestoredMSPath string `json:"restore_path"`
// Path to CASA used // Path to CASA used
CasaVersion string `json:"casa_path"` CasaVersion string `json:"casa_path"`
// lustre processing area // lustre processing area
WorkingDir string `json:"lustre_dir"` WorkingDir string `json:"lustre_dir"`
// output cache directory // output cache directory
CacheDir string `json:"cache_dir"` CacheDir string `json:"cache_dir"`
// number of SE and CC products per tile // number of SE and CC products per tile
NumProducts []TileProductCounts `json:"num_products"` NumProducts []TileProductCounts `json:"num_products"`
// used to determine workflow status header message // used to determine workflow status header message
StatusMsg string `json:"status"` StatusMsg string `json:"status"`
// contains the failed splits in `tile.phasecenter` format // contains the failed splits in `tile.phasecenter` format
FailedSplits []string `json:"failed_splits"` FailedSplits []string `json:"failed_splits"`
// Num failed splits (needs to be in JSON to get passed to email template) // Num failed splits (needs to be in JSON to get passed to email template)
NumFailedSplits int `json:"num_failed_splits"` NumFailedSplits int `json:"num_failed_splits"`
} }
// Contains all the relevant columns of the `pims_split` entry in the archive // Contains all the relevant columns of the `pims_split` entry in the archive
// database // database
type workflowEntry struct { type workflowEntry struct {
arguments []byte arguments []byte
state string resultsDir string
resultsDir string
} }
// Populates a PimsInfo struct with data from a given workflow identified by the // GetPimsInfo Populates a PimsInfo struct with data from a given workflow identified by the
// workflow ID // workflow ID
// //
// @param workflowId the ID of the PIMS workflow // @param workflowId the ID of the PIMS workflow
// @param capoPath the path to the capo property file // @param capoPath the path to the capo property file
// @param capProfile the CAPO profile used during the workflow execution // @param capProfile the CAPO profile used during the workflow execution
func GetPimsInfo(workflowId int, capoPath string, capoProfile string) PimsInfo { func GetPimsInfo(workflowId int, capoPath string, capoProfile string) PimsInfo {
var pimsInfo PimsInfo var pimsInfo PimsInfo
// We need connections to both the VLASS and Archive databases for everything // We need connections to both the VLASS and Archive databases for everything
var vlassInfo db.DbInfo var vlassInfo db.DbInfo
var archiveInfo db.DbInfo var archiveInfo db.DbInfo
// VLASS profile is the same as the dsoc profile with `vlass.` // VLASS profile is the same as the dsoc profile with `vlass.`
vlassInfo.Profile = strings.Replace(capoProfile, "dsoc-", "vlass.", -1) vlassInfo.Profile = strings.Replace(capoProfile, "dsoc-", "vlass.", -1)
archiveInfo.Profile = capoProfile archiveInfo.Profile = capoProfile
vlassInfo.CapoPath, archiveInfo.CapoPath = capoPath, capoPath vlassInfo.CapoPath, archiveInfo.CapoPath = capoPath, capoPath
// Vlass and Archive have different prefixes // Vlass and Archive have different prefixes
vlassInfo.Prefix = "edu.nrao.vlass.config.VlassMngrSettings" vlassInfo.Prefix = "edu.nrao.vlass.config.VlassMngrSettings"
archiveInfo.Prefix = "metadataDatabase" archiveInfo.Prefix = "metadataDatabase"
vlassDb := db.GetConnection(vlassInfo) vlassDb := db.GetConnection(vlassInfo)
archiveDb := db.GetConnection(archiveInfo) archiveDb := db.GetConnection(archiveInfo)
defer vlassDb.Close() defer vlassDb.Close()
defer archiveDb.Close() defer archiveDb.Close()
wfEntry, err := getWorkflowColumns(workflowId, archiveDb) wfEntry, err := getWorkflowColumns(workflowId, archiveDb)
db.CheckError(err) db.CheckError(err)
// Populate PimsInfo with data from archive database // Populate PimsInfo with data from archive database
var jsonArgs map[string]interface{} var jsonArgs map[string]interface{}
err = json.Unmarshal(wfEntry.arguments, &jsonArgs) err = json.Unmarshal(wfEntry.arguments, &jsonArgs)
db.CheckError(err) db.CheckError(err)
pimsInfo.FailedSplits, err = readFailedSplits() pimsInfo.FailedSplits, err = readFailedSplits()
db.CheckError(err) db.CheckError(err)
pimsInfo.NumFailedSplits = len(pimsInfo.FailedSplits) pimsInfo.NumFailedSplits = len(pimsInfo.FailedSplits)
pimsInfo.Calibration = jsonArgs["vlass_product"].(string) pimsInfo.Calibration = jsonArgs["vlass_product"].(string)
pimsInfo.CasaVersion = jsonArgs["casaHome"].(string) pimsInfo.CasaVersion = jsonArgs["casaHome"].(string)
pimsInfo.WorkingDir = wfEntry.resultsDir pimsInfo.WorkingDir = wfEntry.resultsDir
pimsInfo.CacheDir = fmt.Sprintf("/lustre/aoc/cluster/pipeline/%s/cache/pims/%s", pimsInfo.CacheDir = fmt.Sprintf("/lustre/aoc/cluster/pipeline/%s/cache/pims/%s",
strings.Replace(capoProfile, "dsoc-", "vlass_", -1), strings.Replace(capoProfile, "dsoc-", "vlass_", -1),
pimsInfo.Calibration, pimsInfo.Calibration,
) )
pimsInfo.StatusMsg = fmt.Sprintf("PIMS Split workflow completed with state: %s",
wfEntry.state, // Workflow reported as failed when there are failed splits
) status := "succeeded"
if pimsInfo.NumFailedSplits != 0 {
// Check if there's a Restored MS Path status = "failed"
if jsonArgs["existing_restore"] != nil { }
pimsInfo.RestoredMSPath = jsonArgs["existing_restore"].(string) pimsInfo.StatusMsg = fmt.Sprintf("PIMS Split workflow %s!",
} else { status,
// Check if a restore is present in the working directory )
// Should be in working/ in the lustre directory
var restoreFiles []string // Check if there's a Restored MS Path
restoreFiles, err = filepath.Glob(pimsInfo.WorkingDir + "/working/*.ms") if jsonArgs["existing_restore"] != nil {
pimsInfo.RestoredMSPath = jsonArgs["existing_restore"].(string)
if restoreFiles == nil { } else {
pimsInfo.RestoredMSPath = "Restore failed!" // Check if a restore is present in the working directory
} else { // Should be in working/ in the lustre directory
pimsInfo.RestoredMSPath = restoreFiles[0] var restoreFiles []string
} restoreFiles, err = filepath.Glob(pimsInfo.WorkingDir + "/working/*.ms")
}
if restoreFiles == nil {
// Add Splits pimsInfo.RestoredMSPath = "Restore failed!"
split_list_raw := jsonArgs["splits"].([]interface {}) } else {
split_list := make([]string, len(split_list_raw)) pimsInfo.RestoredMSPath = restoreFiles[0]
}
for i, v := range jsonArgs["splits"].([]interface {}) { }
split_list[i] = fmt.Sprint(v)
} // Add Splits
split_list_raw := jsonArgs["splits"].([]interface{})
pimsInfo.NumSplits = len(split_list) split_list := make([]string, len(split_list_raw))
// Get unique tile values from `tile/phasecenter` values for i, v := range jsonArgs["splits"].([]interface{}) {
tiles := make(map[string]bool) split_list[i] = fmt.Sprint(v)
for _, v := range split_list { }
tile := strings.Split(v,"/")[0]
tiles[tile] = true pimsInfo.NumSplits = len(split_list)
}
// Get unique tile values from `tile/phasecenter` values
pimsInfo.NumProducts, err = getNumProductsPerTile(tiles, vlassDb) tiles := make(map[string]bool)
db.CheckError(err) for _, v := range split_list {
tile := strings.Split(v, "/")[0]
return pimsInfo tiles[tile] = true
}
pimsInfo.NumProducts, err = getNumProductsPerTile(tiles, vlassDb)
db.CheckError(err)
return pimsInfo
} }
// Store the relevant columns for the workflow entry in the archive database in // Store the relevant columns for the workflow entry in the archive database in
...@@ -161,38 +166,37 @@ func GetPimsInfo(workflowId int, capoPath string, capoProfile string) PimsInfo { ...@@ -161,38 +166,37 @@ func GetPimsInfo(workflowId int, capoPath string, capoProfile string) PimsInfo {
// @param A connection to the database // @param A connection to the database
// @return A workflowEntry struct with the column data or an error // @return A workflowEntry struct with the column data or an error
func getWorkflowColumns(workflowId int, dbConnection *sql.DB) (workflowEntry, error) { func getWorkflowColumns(workflowId int, dbConnection *sql.DB) (workflowEntry, error) {
workflowTable := "workflow_requests" workflowTable := "workflow_requests"
workflowIdKey := "workflow_request_id" workflowIdKey := "workflow_request_id"
dataCols := "argument,state,results_dir" dataCols := "argument,results_dir"
var entry workflowEntry var entry workflowEntry
// var tmpArgs string // Must get JSON as string first // var tmpArgs string // Must get JSON as string first
var err error var err error
// Get relevant columns from the entry in the database // Get relevant columns from the entry in the database
// Get the database rows where metadata is extracted from // Get the database rows where metadata is extracted from
err = dbConnection.QueryRow( err = dbConnection.QueryRow(
fmt.Sprintf("SELECT %s FROM %s WHERE %s='%d'", fmt.Sprintf("SELECT %s FROM %s WHERE %s='%d'",
dataCols, dataCols,
workflowTable, workflowTable,
workflowIdKey, workflowIdKey,
workflowId), workflowId),
).Scan(&entry.arguments, &entry.state, &entry.resultsDir) ).Scan(&entry.arguments, &entry.resultsDir)
return entry, err return entry, err
} }
// Stores the number of SEs and CCs in the database for a tile // Stores the number of SEs and CCs in the database for a tile
type TileProductCounts struct { type TileProductCounts struct {
// Name of the Tile // Name of the Tile
Tile string `json:"tile_name"` Tile string `json:"tile_name"`
// Number of coarse cubes associated // Number of coarse cubes associated
NumCoarseCubes int `json:"num_coarse_cube"` NumCoarseCubes int `json:"num_coarse_cube"`
// Number of continuum images associated // Number of continuum images associated
NumContinuum int `json:"num_continuum"` NumContinuum int `json:"num_continuum"`
} }
// Populates an array of tileProductCounts structs with CC and SE counts for a // Populates an array of tileProductCounts structs with CC and SE counts for a
// list of tiles // list of tiles
// //
...@@ -202,23 +206,25 @@ type TileProductCounts struct { ...@@ -202,23 +206,25 @@ type TileProductCounts struct {
// @return An array of tileProductCounts with the counts and any error // @return An array of tileProductCounts with the counts and any error
// encountered // encountered
func getNumProductsPerTile(tiles map[string]bool, dbConnection *sql.DB) ([]TileProductCounts, error) { func getNumProductsPerTile(tiles map[string]bool, dbConnection *sql.DB) ([]TileProductCounts, error) {
var productCounts []TileProductCounts productCounts := make([]TileProductCounts, 0)
var err error var err error
// Only do this if there are tiles to check products for // Only do this if there are tiles to check products for
if len(tiles) <= 0 { return productCounts, err } if len(tiles) <= 0 {
return productCounts, err
// Get tiles as string in `(tile1, tile2, etc.)` format to fit the query }
// syntax
inTilesQuery := "(" // Get tiles as string in `(tile1, tile2, etc.)` format to fit the query
for key := range tiles { // syntax
inTilesQuery += fmt.Sprintf("'%s', ", key) inTilesQuery := "("
} for key := range tiles {
// Remove trailing , inTilesQuery += fmt.Sprintf("'%s', ", key)
inTilesQuery = strings.TrimRight(inTilesQuery, ", ") }
inTilesQuery += ")" // Remove trailing ,
inTilesQuery = strings.TrimRight(inTilesQuery, ", ")
query := fmt.Sprintf(`select * from crosstab($$ inTilesQuery += ")"
query := fmt.Sprintf(`select * from crosstab($$
select m.name as minitile, pt.name as product_type, count(*) as count select m.name as minitile, pt.name as product_type, count(*) as count
from product from product
join product_type pt on product.product_type_id = pt.id join product_type pt on product.product_type_id = pt.id
...@@ -230,40 +236,46 @@ group by m.name, pt.name ...@@ -230,40 +236,46 @@ group by m.name, pt.name
order by minitile, product_type; order by minitile, product_type;
$$, $$VALUES ('se_coarse_cube_image'),('se_continuum_imaging')$$) as $$, $$VALUES ('se_coarse_cube_image'),('se_continuum_imaging')$$) as
ct(minitile varchar, se_coarse_cube_image varchar, se_continuum_imaging varchar);`, ct(minitile varchar, se_coarse_cube_image varchar, se_continuum_imaging varchar);`,
inTilesQuery) inTilesQuery)
var rows *sql.Rows var rows *sql.Rows
rows, err = dbConnection.Query(query) rows, err = dbConnection.Query(query)
if err != nil { return productCounts, err } if err != nil {
return productCounts, err
defer rows.Close() }
for rows.Next() { defer rows.Close()
var count TileProductCounts
for rows.Next() {
// Needed to deal with potential null values var count TileProductCounts
var nullableCoarseCount, nullableContinuumCount sql.NullInt32
// Needed to deal with potential null values
if err = rows.Scan( var nullableCoarseCount, nullableContinuumCount sql.NullInt32
&count.Tile,
&nullableCoarseCount, if err = rows.Scan(
&nullableContinuumCount) ; err != nil { &count.Tile,
return productCounts, err &nullableCoarseCount,
} &nullableContinuumCount); err != nil {
return productCounts, err
// Make sure null counts from DB handled }
if nullableCoarseCount.Valid {
count.NumCoarseCubes = int(nullableCoarseCount.Int32) // Make sure null counts from DB handled
} else { count.NumCoarseCubes = 0 } if nullableCoarseCount.Valid {
count.NumCoarseCubes = int(nullableCoarseCount.Int32)
if nullableContinuumCount.Valid { } else {
count.NumContinuum = int(nullableContinuumCount.Int32) count.NumCoarseCubes = 0
} else { count.NumContinuum = 0 } }
productCounts = append(productCounts, count) if nullableContinuumCount.Valid {
} count.NumContinuum = int(nullableContinuumCount.Int32)
} else {
return productCounts, err count.NumContinuum = 0
}
productCounts = append(productCounts, count)
}
return productCounts, err
} }
// Whenever a `pimscache` call returns non-0 in the `pims_split` workflow, the // Whenever a `pimscache` call returns non-0 in the `pims_split` workflow, the
...@@ -274,20 +286,22 @@ $$, $$VALUES ('se_coarse_cube_image'),('se_continuum_imaging')$$) as ...@@ -274,20 +286,22 @@ $$, $$VALUES ('se_coarse_cube_image'),('se_continuum_imaging')$$) as
// @return a string array with the split entries and any error that was // @return a string array with the split entries and any error that was
// encountered // encountered
func readFailedSplits() ([]string, error) { func readFailedSplits() ([]string, error) {
failedTilesFilename := "failed_splits.txt" failedTilesFilename := "failed_splits.txt"
var failedTiles []string failedTiles := make([]string, 0)
failedTilesFile, err := os.Open(failedTilesFilename) failedTilesFile, err := os.Open(failedTilesFilename)
if err != nil { return failedTiles, err } if err != nil {
return failedTiles, err
}
defer failedTilesFile.Close() defer failedTilesFile.Close()
tileScanner := bufio.NewScanner(failedTilesFile) tileScanner := bufio.NewScanner(failedTilesFile)
for tileScanner.Scan() { for tileScanner.Scan() {
failedTiles = append(failedTiles, tileScanner.Text()) failedTiles = append(failedTiles, tileScanner.Text())
} }
err = tileScanner.Err() err = tileScanner.Err()
return failedTiles, err return failedTiles, err
} }
# spelunker
`spelunker` is a utility that prints the lustre working directory for a
workflow given the (Archive or Workspaces) ID for the workflow.
## Example
```
./spelunker -id 1235219485
/lustre/aoc/cluster/pipeline/dsoc-dev/workspaces/spool/tmpxxtgst2b
```
# Usage
```
Usage of ./spelunker:
-id int
[Required] The Workspaces or Archive ID for the workflow (default -1)
-profile string
[Optional] The CAPO profile to use, defaults to CAPO_PROFILE env variable
-v Display the version of this utility
```
#!/bin/sh
# pull version from latest gitlab tag and save to file
git describe --tags --abbrev=0 > version.txt
module ssa/spelunker
go 1.18
require (
github.com/PaesslerAg/jsonpath v0.1.0
github.com/lib/pq v1.10.8
gitlab.nrao.edu/ssa/gocapo v0.0.0-20230307183307-91ffd4356566
)
require (
github.com/PaesslerAG/gval v1.0.0 // indirect
github.com/PaesslerAG/jsonpath v0.1.1 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.5 // indirect
github.com/spf13/afero v1.9.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/spf13/viper v1.13.0 // indirect
github.com/stretchr/testify v1.8.1 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/text v0.8.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
This diff is collapsed.
/*
* Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
*
* This file is part of NRAO Workspaces.
*
* Workspaces is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Workspaces is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
*/
package main
import (
_ "embed"
"flag"
"log"
"os"
"ssa/spelunker/pkg/db"
"ssa/spelunker/pkg/helpers"
"ssa/spelunker/pkg/properties"
)
//version set executable version based on latest project git tag. Uses go generation and file embedding
////go:generate sh get_version.sh
////go:embed version.txt
var version string
func main() {
// Store argument values in these
var capoProfile string
var wfId int
flag.StringVar(&capoProfile, "profile", os.Getenv("CAPO_PROFILE"), "[Optional] The CAPO profile to use, defaults to CAPO_PROFILE env variable")
flag.IntVar(&wfId, "id", -1, "[Required] The Workspaces or Archive ID for the workflow")
v := flag.Bool("v", false, "Display the version of this utility")
flag.Parse()
if *v {
//TODO: fix version system for containers
log.Println("2.8.2rc1")
os.Exit(0)
}
if wfId < 0 {
log.Print("Must specify a Workspaces or Archive workflow ID!\n")
flag.Usage()
os.Exit(1)
}
if len(capoProfile) <= 0 {
log.Print("There is no CAPO profile value. Specify the profile or ensure the CAPO_PROFILE environment variable is set\n")
flag.Usage()
os.Exit(1)
}
DB, err := db.GetConnection(capoProfile)
helpers.CheckError(err)
var resultsDir string
resultsDir, err = properties.GetArgument(properties.ResultsDir, wfId, DB)
helpers.CheckError(err)
println(resultsDir)
}
/*
* Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
*
* This file is part of NRAO Workspaces.
*
* Workspaces is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Workspaces is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
*/
// Package db This file contains the methods and structs to get a connection to the archive
// database
package db
import (
"database/sql"
"fmt"
_ "github.com/lib/pq"
"gitlab.nrao.edu/ssa/gocapo/capo/config"
capohelpers "gitlab.nrao.edu/ssa/gocapo/helpers"
"strings"
)
// dbInfo Contains all necessary data to connect to the database
type dbInfo struct {
host string
port int
user string
password string
dbname string
isSsl bool
}
// getDbLoginFromProperties Retrieve the archive database login details from a CAPO profile
//
// @param capoProfile a string with the CAPO profile to get the details from
// @param capoPath a string with the path to the CAPO profile
func getDbLoginFromProperties(capoProfile string) (dbInfo, error) {
var connectionInfo dbInfo
prop, err := config.InitConfig(capoProfile, capohelpers.DefaultCapoPath)
if err != nil {
return connectionInfo, err
}
// Prefix in the CAPO files for the archive database details
prefix := "metadataDatabase"
settings := prop.SettingsForPrefix(prefix)
connectionInfo.user = settings.GetString("jdbcUsername")
connectionInfo.password = settings.GetString("jdbcPassword")
url := settings.GetString("jdbcUrl")
url = strings.TrimPrefix(url, "jdbc:postgresql://")
splitUrl := strings.Split(url, "/")
connectionInfo.host = splitUrl[0]
// Remove extraneous port on hostname if it exists
connectionInfo.host = strings.Split(connectionInfo.host, ":")[0]
connectionInfo.dbname = splitUrl[1]
connectionInfo.port = 5432
// Currently the archive database doesn't use SSL
connectionInfo.isSsl = false
return connectionInfo, err
}
// GetConnection Establish a connection to the database and return it
//
// @param capoProfile a string with the CAPO profile to get the database from
// @param capoPath a string with the path to the CAPO profile
func GetConnection(capoProfile string) (*sql.DB, error) {
var db *sql.DB
dbInfo, err := getDbLoginFromProperties(capoProfile)
if err != nil {
return db, err
}
sslMode := "disable"
if dbInfo.isSsl {
sslMode = "require"
}
connInfo := fmt.Sprintf(
"host=%s port=%d user=%s password=%s dbname=%s sslmode=%s",
dbInfo.host,
dbInfo.port,
dbInfo.user,
dbInfo.password,
dbInfo.dbname,
sslMode,
)
db, err = sql.Open("postgres", connInfo)
if err != nil {
return db, err
}
err = db.Ping()
if err != nil {
return db, err
}
return db, err
}
/*
* Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
*
* This file is part of NRAO Workspaces.
*
* Workspaces is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Workspaces is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
*/
// Package helpers This file contains helpers for handling errors
package helpers
import "log"
// CheckError Check if an error happened and panic on it if so
//
// @param err An error object to report
func CheckError(err error) {
if err != nil {
log.Fatalln(err)
}
}
/*
* Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
*
* This file is part of NRAO Workspaces.
*
* Workspaces is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Workspaces is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
*/
// Package properties This file contains methods to retrieve properties from workflows in the
// archive database
package properties
import (
"database/sql"
"encoding/json"
"fmt"
"github.com/PaesslerAg/jsonpath"
"strconv"
)
// Intermediary object to store values from the workflow_requests entry so they
// can be marshalled to JSON
type workflowRequest struct {
WfRequestId int `json:"workflow_request_id"`
WfName string `json:"workflow_name"`
Arguments interface{} `json:"argument"`
State string `json:"state"`
ResultsDir string `json:"results_dir"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
HtcondorJobId string `json:"htcondor_job_id"`
IsCleaned bool `json:"cleaned"`
HtCondorIterations int `json:"htcondor_iterations"`
Controller string `json:"controller"`
}
// Property The type which stores the JSONPath queries for each property
// If we want to be able to get a new property from the database, we simply
// 1. Add a command line flag to retrieve it
// 2. Add an enum value for it
// 3. Add the JSONPath query in the String method
type Property int
const (
ResultsDir Property = 1
)
func (p Property) String() string {
switch p {
case ResultsDir:
return "$.results_dir"
default:
return ""
}
}
// GetArgument Query the workflow arguments for a property with JSONPath
//
// @param prop a Property with the JSONPath query
// @param wfId an int with the Archive or Workspaces workflow ID
// @param db a *sql.DB with a connection to the archive database
// @return A string with the property value or an error
func GetArgument(prop Property, wfId int, db *sql.DB) (string, error) {
var value string
arguments, err := getWfArgs(wfId, db)
if err != nil {
return value, err
}
var valInterface interface{}
valInterface, err = jsonpath.Get(prop.String(), arguments)
if err != nil {
return value, err
}
value = valInterface.(string)
return value, err
}
// getWfArgs Get the workflow request arguments as JSON in an interface so they can be
// queried
//
// @param wfId an int with the workflow ID
// @param db a *sql.DB with a connection to the archive database
// @return the workflow arguments as an interface or an error
func getWfArgs(wfId int, db *sql.DB) (interface{}, error) {
var arguments map[string]interface{}
// Allow querying for either workspaces ID or archive ID
queryString := fmt.Sprintf(
`SELECT * FROM workflow_requests WHERE workflow_request_id=$1 OR argument->>'rh_id'=CAST($1 AS VARCHAR)`,
)
query, err := db.Prepare(queryString)
if err != nil {
return arguments, err
}
row := query.QueryRow(strconv.Itoa(wfId))
var argBytes []byte
var htCJobIdNullable sql.NullInt32 // This field could be null
var reqArgs workflowRequest
err = row.Scan(
&reqArgs.WfRequestId,
&reqArgs.WfName,
&argBytes,
&reqArgs.State,
&reqArgs.ResultsDir,
&reqArgs.CreatedAt,
&reqArgs.UpdatedAt,
&htCJobIdNullable,
&reqArgs.IsCleaned,
&reqArgs.HtCondorIterations,
&reqArgs.Controller,
)
if err != nil {
return arguments, err
}
// Include the nested JSON in the workflow arguments
err = json.Unmarshal(argBytes, &reqArgs.Arguments)
if err != nil {
return arguments, err
}
// Handle null HTCondor Job ID
reqArgs.HtcondorJobId = "null"
if htCJobIdNullable.Valid {
reqArgs.HtcondorJobId = strconv.Itoa(int(htCJobIdNullable.Int32))
}
// Convert the workflowRequest struct to an interface for querying
var reqJson []byte
reqJson, err = json.Marshal(reqArgs)
if err != nil {
return arguments, err
}
err = json.Unmarshal(reqJson, &arguments)
if err != nil {
return arguments, err
}
return arguments, err
}
...@@ -22,6 +22,7 @@ import os ...@@ -22,6 +22,7 @@ import os
import signal import signal
import subprocess import subprocess
import sys import sys
import time
from http.server import HTTPServer, SimpleHTTPRequestHandler from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path from pathlib import Path
from threading import Thread from threading import Thread
...@@ -185,6 +186,15 @@ class CartaLauncher: ...@@ -185,6 +186,15 @@ class CartaLauncher:
f"{file_browser_path!s}", f"{file_browser_path!s}",
] ]
try: try:
# finish setup for CARTA and ensure completion before launch
CartaWrapperLauncher.deploy_wrapper_html(file_browser_path, carta_url, session_timeout_date)
# BIG NASTY COMMENT OF DOOM!!!!!
# DO NOT REMOVE!!
# This is required to ensure that proxy url setup is complete *before* CARTA launches.
# Without this, users are presented with a 404 page on launch as of CARTA3
time.sleep(2)
# start CARTA
CARTA_PROCESS = subprocess.Popen( CARTA_PROCESS = subprocess.Popen(
carta_command, carta_command,
preexec_fn=os.setpgrp(), preexec_fn=os.setpgrp(),
...@@ -196,8 +206,6 @@ class CartaLauncher: ...@@ -196,8 +206,6 @@ class CartaLauncher:
self.teardown() self.teardown()
sys.exit(f"ERROR: Failed to launch CARTA: {err}") sys.exit(f"ERROR: Failed to launch CARTA: {err}")
else: else:
CartaWrapperLauncher.deploy_wrapper_html(file_browser_path, carta_url, session_timeout_date)
# CARTA is running and accessible, so send CARTA URL to AAT system or notify user # CARTA is running and accessible, so send CARTA URL to AAT system or notify user
self.notify_ready(wrapper_url) self.notify_ready(wrapper_url)
......
...@@ -9,7 +9,7 @@ readme = "README.md" ...@@ -9,7 +9,7 @@ readme = "README.md"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = ">=3.10,<3.12" python = ">=3.10,<3.12"
pycapo = "^0.3.1" pycapo = "^0.3.1"
bs4 = "^0.0.1" beautifulsoup4 = "^4.12.2"
lxml = "^4.9.2" lxml = "^4.9.2"
prettierfier = "^1.0.3" prettierfier = "^1.0.3"
......
...@@ -160,7 +160,7 @@ def send_aat_image_message(parameters: dict, ingest_type: Union[IngestType, VLAS ...@@ -160,7 +160,7 @@ def send_aat_image_message(parameters: dict, ingest_type: Union[IngestType, VLAS
if response.status_code != http.HTTPStatus.OK: if response.status_code != http.HTTPStatus.OK:
logger.warning( logger.warning(
f"WARNING: Failed to send image ingestion complete to archive for image associated with" f"WARNING: Failed to send image ingestion complete to archive for image associated with "
f"calibration {parameters['calSpl']} and project {parameters['project']}." f"calibration {parameters['calSpl']} and project {parameters['project']}."
f" Please set off the index manually!" f" Please set off the index manually!"
) )
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>. # along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
""" Helper for building image ingestion manifest """ """ Helper for building image ingestion manifest """
import logging import logging
from pathlib import Path from pathlib import Path
...@@ -73,9 +74,7 @@ class ImageIngestionProductsFinder: ...@@ -73,9 +74,7 @@ class ImageIngestionProductsFinder:
sp_image_files = [ sp_image_files = [
file file
for file in self.files_found for file in self.files_found
if file.name.endswith(FITS) if file.name.endswith(FITS) and PBCOR in file.name and TT0 in file.name
and PBCOR in file.name
and TT0 in file.name
] ]
self.logger.info(f"Science Product(s) to ingest: {sp_image_files}") self.logger.info(f"Science Product(s) to ingest: {sp_image_files}")
image_files = [ image_files = [
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>. # along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
""" """
Our custom errors. Our custom errors.
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
# #
# You should have received a copy of the GNU General Public License # You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>. # along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
""" """
Fetch plans exist to manage running each of a group of FileFetchers. At the moment, Fetch plans exist to manage running each of a group of FileFetchers. At the moment,
we have identified only two kinds of fetch plan: we have identified only two kinds of fetch plan:
......
...@@ -9,10 +9,9 @@ readme = "README.md" ...@@ -9,10 +9,9 @@ readme = "README.md"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.10" python = "^3.10"
pendulum = "2.1.2" pendulum = "2.1.2"
messaging = {path = "../../../../../shared/messaging"}
schema = {path = "../../../../../shared/schema"}
workspaces = {path = "../../../../../shared/workspaces"}
aenum = "^3.1.12" aenum = "^3.1.12"
psycopg2-binary = "^2.9.6"
sqlalchemy = "1.4.47"
[tool.poetry.group.test.dependencies] [tool.poetry.group.test.dependencies]
pytest = "^7.3.1" pytest = "^7.3.1"
......
def test_always_passes():
assert True
...@@ -21,10 +21,7 @@ Testing suite for ws_metrics ...@@ -21,10 +21,7 @@ Testing suite for ws_metrics
import argparse import argparse
import logging import logging
import sys import sys
from unittest.mock import patch
import pytest
from ws_metrics.deep_thought import LifeUniverseEverything
logger = logging.getLogger("test_ws_metrics") logger = logging.getLogger("test_ws_metrics")
logger.setLevel(logging.INFO) logger.setLevel(logging.INFO)
...@@ -42,46 +39,47 @@ result_rd = 10 ...@@ -42,46 +39,47 @@ result_rd = 10
args_d = argparse.Namespace(requestdatasize=None, datasize=["2021-03-30T00:00:00", "2021-03-30T23:59:59"]) args_d = argparse.Namespace(requestdatasize=None, datasize=["2021-03-30T00:00:00", "2021-03-30T23:59:59"])
result_d = 100 result_d = 100
# they're all broken anyway
def mock_leu(args: argparse.Namespace) -> LifeUniverseEverything: # def mock_leu(args: argparse.Namespace) -> LifeUniverseEverything:
with patch("psycopg2.connect") as mock_connect: # with patch("psycopg2.connect") as mock_connect:
return LifeUniverseEverything(connection=mock_connect, args=args) # return LifeUniverseEverything(connection=mock_connect, args=args)
mock_leu_b = mock_leu(args_b) # mock_leu_b = mock_leu(args_b)
mock_leu_c = mock_leu(args_c) # mock_leu_c = mock_leu(args_c)
mock_leu_rd = mock_leu(args_rd) # mock_leu_rd = mock_leu(args_rd)
mock_leu_d = mock_leu(args_d) # mock_leu_d = mock_leu(args_d)
class TestWSMetrics: # class TestWSMetrics:
@pytest.mark.skip("broken by rewrite") # @pytest.mark.skip("broken by rewrite")
def test_get_total_cap_executions(self): # def test_get_total_cap_executions(self):
mock_leu_c.conn.cursor.return_value.fetchone.return_value = result_c # mock_leu_c.conn.cursor.return_value.fetchone.return_value = result_c
assert args_c.capability == "null" # assert args_c.capability == "null"
value = mock_leu_c.get_total_cap_executions(args_c.capability) # value = mock_leu_c.get_total_cap_executions(args_c.capability)
assert value == result_c # assert value == result_c
#
@pytest.mark.skip("broken by rewrite") # @pytest.mark.skip("broken by rewrite")
def test_get_total_executions_in_range(self): # def test_get_total_executions_in_range(self):
mock_leu_b.conn.cursor.return_value.fetchone.return_value = result_b # mock_leu_b.conn.cursor.return_value.fetchone.return_value = result_b
assert args_b.between[0] == "null" # assert args_b.between[0] == "null"
assert args_b.between[1] == "2021-03-30T00:00:00" # assert args_b.between[1] == "2021-03-30T00:00:00"
assert args_b.between[2] == "2021-03-30T23:59:59" # assert args_b.between[2] == "2021-03-30T23:59:59"
value = mock_leu_b.get_total_executions_in_range(args_b.between[0], args_b.between[1], args_b.between[2]) # value = mock_leu_b.get_total_executions_in_range(args_b.between[0], args_b.between[1], args_b.between[2])
assert value == result_b # assert value == result_b
#
@pytest.mark.skip("broken by rewrite") # @pytest.mark.skip("broken by rewrite")
def test_get_total_data_volume(self): # def test_get_total_data_volume(self):
mock_leu_d.conn.cursor.return_value.fetchone.return_value = result_d # mock_leu_d.conn.cursor.return_value.fetchone.return_value = result_d
assert args_d.datasize[0] == "2021-03-30T00:00:00" # assert args_d.datasize[0] == "2021-03-30T00:00:00"
assert args_d.datasize[1] == "2021-03-30T23:59:59" # assert args_d.datasize[1] == "2021-03-30T23:59:59"
value = mock_leu_d.get_total_data_volume(args_d.datasize[0], args_d.datasize[1]) # value = mock_leu_d.get_total_data_volume(args_d.datasize[0], args_d.datasize[1])
assert value == result_d # assert value == result_d
#
@pytest.mark.skip("broken by rewrite") # @pytest.mark.skip("broken by rewrite")
def test_get_request_data_volume(self): # def test_get_request_data_volume(self):
mock_leu_rd.conn.cursor.return_value.fetchone.return_value = result_rd # mock_leu_rd.conn.cursor.return_value.fetchone.return_value = result_rd
assert args_rd.requestdatasize[0] == "1" # assert args_rd.requestdatasize[0] == "1"
value = mock_leu_rd.get_request_data_volume("1") # value = mock_leu_rd.get_request_data_volume("1")
assert value == result_rd # assert value == result_rd