Skip to content
Snippets Groups Projects
Commit 04bce7eb authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

ensure slice struct fields are initialized

parent 4462e1d8
No related branches found
No related tags found
2 merge requests!1390Catch up with Main,!1356ensure slice struct fields are initialized
Pipeline #9909 passed
......@@ -20,141 +20,143 @@
package pims
import (
	"bufio"
	"database/sql"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
	"strings"

	"ssa/pims_analyzer/pkg/db"
)
// Stores PIMS info requested by the DAs for PIMS notification emails
type PimsInfo struct {
	// the calibration requested
	Calibration string `json:"calibration"`
	// The number of splits in the job
	NumSplits int `json:"num_splits"`
	// Path to restored MS
	RestoredMSPath string `json:"restore_path"`
	// Path to CASA used
	CasaVersion string `json:"casa_path"`
	// lustre processing area
	WorkingDir string `json:"lustre_dir"`
	// output cache directory
	CacheDir string `json:"cache_dir"`
	// number of SE and CC products per tile
	NumProducts []TileProductCounts `json:"num_products"`
	// used to determine workflow status header message
	StatusMsg string `json:"status"`
	// contains the failed splits in `tile.phasecenter` format
	FailedSplits []string `json:"failed_splits"`
	// Num failed splits (needs to be in JSON to get passed to email template)
	NumFailedSplits int `json:"num_failed_splits"`
}
// Contains all the relevant columns of the `pims_split` entry in the archive
// database
type workflowEntry struct {
	// raw JSON from the `argument` column; unmarshalled by the caller
	arguments []byte
	// the `results_dir` column: the workflow's lustre results directory
	resultsDir string
}
// GetPimsInfo Populates a PimsInfo struct with data from a given workflow identified by the
// workflow ID
//
// @param workflowId the ID of the PIMS workflow
// @param capoPath the path to the capo property file
// @param capoProfile the CAPO profile used during the workflow execution
func GetPimsInfo(workflowId int, capoPath string, capoProfile string) PimsInfo {
	var pimsInfo PimsInfo

	// We need connections to both the VLASS and Archive databases for everything
	var vlassInfo db.DbInfo
	var archiveInfo db.DbInfo

	// VLASS profile is the same as the dsoc profile with `vlass.` substituted
	vlassInfo.Profile = strings.Replace(capoProfile, "dsoc-", "vlass.", -1)
	archiveInfo.Profile = capoProfile
	vlassInfo.CapoPath, archiveInfo.CapoPath = capoPath, capoPath

	// Vlass and Archive have different property prefixes
	vlassInfo.Prefix = "edu.nrao.vlass.config.VlassMngrSettings"
	archiveInfo.Prefix = "metadataDatabase"

	vlassDb := db.GetConnection(vlassInfo)
	archiveDb := db.GetConnection(archiveInfo)
	defer vlassDb.Close()
	defer archiveDb.Close()

	wfEntry, err := getWorkflowColumns(workflowId, archiveDb)
	db.CheckError(err)

	// Populate PimsInfo with data from the archive database's JSON arguments
	var jsonArgs map[string]interface{}
	err = json.Unmarshal(wfEntry.arguments, &jsonArgs)
	db.CheckError(err)

	pimsInfo.FailedSplits, err = readFailedSplits()
	db.CheckError(err)
	pimsInfo.NumFailedSplits = len(pimsInfo.FailedSplits)

	// NOTE(review): these assertions panic if a key is absent or not a string;
	// presumably guaranteed by the workflow submission — confirm.
	pimsInfo.Calibration = jsonArgs["vlass_product"].(string)
	pimsInfo.CasaVersion = jsonArgs["casaHome"].(string)
	pimsInfo.WorkingDir = wfEntry.resultsDir
	pimsInfo.CacheDir = fmt.Sprintf("/lustre/aoc/cluster/pipeline/%s/cache/pims/%s",
		strings.Replace(capoProfile, "dsoc-", "vlass_", -1),
		pimsInfo.Calibration,
	)

	// Workflow reported as failed when there are failed splits
	status := "succeeded"
	if pimsInfo.NumFailedSplits != 0 {
		status = "failed"
	}
	pimsInfo.StatusMsg = fmt.Sprintf("PIMS Split workflow %s!",
		status,
	)

	// Check if there's a Restored MS Path
	if jsonArgs["existing_restore"] != nil {
		pimsInfo.RestoredMSPath = jsonArgs["existing_restore"].(string)
	} else {
		// Check if a restore is present in the working directory; should be
		// in working/ in the lustre directory. Glob returns nil both on no
		// match and on a bad pattern, so the nil check covers both cases.
		var restoreFiles []string
		restoreFiles, err = filepath.Glob(pimsInfo.WorkingDir + "/working/*.ms")
		if restoreFiles == nil {
			pimsInfo.RestoredMSPath = "Restore failed!"
		} else {
			pimsInfo.RestoredMSPath = restoreFiles[0]
		}
	}

	// Add Splits: "splits" is a list of `tile/phasecenter` values
	split_list_raw := jsonArgs["splits"].([]interface{})
	split_list := make([]string, len(split_list_raw))
	for i, v := range split_list_raw {
		split_list[i] = fmt.Sprint(v)
	}
	pimsInfo.NumSplits = len(split_list)

	// Get unique tile values from `tile/phasecenter` values
	tiles := make(map[string]bool)
	for _, v := range split_list {
		tile := strings.Split(v, "/")[0]
		tiles[tile] = true
	}
	pimsInfo.NumProducts, err = getNumProductsPerTile(tiles, vlassDb)
	db.CheckError(err)

	return pimsInfo
}
// getWorkflowColumns stores the relevant columns for the workflow entry in the
// archive database in a workflowEntry struct
// @param A connection to the database
// @return A workflowEntry struct with the column data or an error
func getWorkflowColumns(workflowId int, dbConnection *sql.DB) (workflowEntry, error) {
	workflowTable := "workflow_requests"
	workflowIdKey := "workflow_request_id"
	dataCols := "argument,results_dir"

	var entry workflowEntry

	// Get the relevant columns from the entry in the database. workflowId is
	// an int formatted with %d, so the Sprintf interpolation cannot inject SQL.
	err := dbConnection.QueryRow(
		fmt.Sprintf("SELECT %s FROM %s WHERE %s='%d'",
			dataCols,
			workflowTable,
			workflowIdKey,
			workflowId),
	).Scan(&entry.arguments, &entry.resultsDir)

	return entry, err
}
// Stores the number of SEs and CCs in the database for a tile
type TileProductCounts struct {
	// Name of the Tile
	Tile string `json:"tile_name"`
	// Number of coarse cubes associated
	NumCoarseCubes int `json:"num_coarse_cube"`
	// Number of continuum images associated
	NumContinuum int `json:"num_continuum"`
}
// Populates an array of tileProductCounts structs with CC and SE counts for a
// list of tiles
//
// @param tiles the set of tile names to get product counts for
// @param dbConnection A connection to the database
// @return An array of tileProductCounts with the counts and any error
// encountered
func getNumProductsPerTile(tiles map[string]bool, dbConnection *sql.DB) ([]TileProductCounts, error) {
var productCounts []TileProductCounts
var err error
// Only do this if there are tiles to check products for
if len(tiles) <= 0 { return productCounts, err }
// Get tiles as string in `(tile1, tile2, etc.)` format to fit the query
// syntax
inTilesQuery := "("
for key := range tiles {
inTilesQuery += fmt.Sprintf("'%s', ", key)
}
// Remove trailing ,
inTilesQuery = strings.TrimRight(inTilesQuery, ", ")
inTilesQuery += ")"
query := fmt.Sprintf(`select * from crosstab($$
productCounts := make([]TileProductCounts, 0)
var err error
// Only do this if there are tiles to check products for
if len(tiles) <= 0 {
return productCounts, err
}
// Get tiles as string in `(tile1, tile2, etc.)` format to fit the query
// syntax
inTilesQuery := "("
for key := range tiles {
inTilesQuery += fmt.Sprintf("'%s', ", key)
}
// Remove trailing ,
inTilesQuery = strings.TrimRight(inTilesQuery, ", ")
inTilesQuery += ")"
query := fmt.Sprintf(`select * from crosstab($$
select m.name as minitile, pt.name as product_type, count(*) as count
from product
join product_type pt on product.product_type_id = pt.id
......@@ -233,40 +236,46 @@ group by m.name, pt.name
order by minitile, product_type;
$$, $$VALUES ('se_coarse_cube_image'),('se_continuum_imaging')$$) as
ct(minitile varchar, se_coarse_cube_image varchar, se_continuum_imaging varchar);`,
inTilesQuery)
var rows *sql.Rows
rows, err = dbConnection.Query(query)
if err != nil { return productCounts, err }
defer rows.Close()
for rows.Next() {
var count TileProductCounts
// Needed to deal with potential null values
var nullableCoarseCount, nullableContinuumCount sql.NullInt32
if err = rows.Scan(
&count.Tile,
&nullableCoarseCount,
&nullableContinuumCount) ; err != nil {
return productCounts, err
}
// Make sure null counts from DB handled
if nullableCoarseCount.Valid {
count.NumCoarseCubes = int(nullableCoarseCount.Int32)
} else { count.NumCoarseCubes = 0 }
if nullableContinuumCount.Valid {
count.NumContinuum = int(nullableContinuumCount.Int32)
} else { count.NumContinuum = 0 }
productCounts = append(productCounts, count)
}
return productCounts, err
inTilesQuery)
var rows *sql.Rows
rows, err = dbConnection.Query(query)
if err != nil {
return productCounts, err
}
defer rows.Close()
for rows.Next() {
var count TileProductCounts
// Needed to deal with potential null values
var nullableCoarseCount, nullableContinuumCount sql.NullInt32
if err = rows.Scan(
&count.Tile,
&nullableCoarseCount,
&nullableContinuumCount); err != nil {
return productCounts, err
}
// Make sure null counts from DB handled
if nullableCoarseCount.Valid {
count.NumCoarseCubes = int(nullableCoarseCount.Int32)
} else {
count.NumCoarseCubes = 0
}
if nullableContinuumCount.Valid {
count.NumContinuum = int(nullableContinuumCount.Int32)
} else {
count.NumContinuum = 0
}
productCounts = append(productCounts, count)
}
return productCounts, err
}
// Whenever a `pimscache` call returns non-0 in the `pims_split` workflow, the
// failed split is recorded in the `failed_splits.txt` file; this reads that
// file line by line
// @return a string array with the split entries and any error that was
// encountered
func readFailedSplits() ([]string, error) {
failedTilesFilename := "failed_splits.txt"
var failedTiles []string
failedTilesFilename := "failed_splits.txt"
failedTiles := make([]string, 0)
failedTilesFile, err := os.Open(failedTilesFilename)
if err != nil { return failedTiles, err }
failedTilesFile, err := os.Open(failedTilesFilename)
if err != nil {
return failedTiles, err
}
defer failedTilesFile.Close()
defer failedTilesFile.Close()
tileScanner := bufio.NewScanner(failedTilesFile)
tileScanner := bufio.NewScanner(failedTilesFile)
for tileScanner.Scan() {
failedTiles = append(failedTiles, tileScanner.Text())
}
err = tileScanner.Err()
for tileScanner.Scan() {
failedTiles = append(failedTiles, tileScanner.Text())
}
err = tileScanner.Err()
return failedTiles, err
return failedTiles, err
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment