diff --git a/apps/cli/executables/go/pims_analyzer/pkg/pims/get_data.go b/apps/cli/executables/go/pims_analyzer/pkg/pims/get_data.go index 74323befc0b1c0957ddc5d45103773fa494a4ab9..92f03f39589817bbb8a086e8e115363baa70a66e 100644 --- a/apps/cli/executables/go/pims_analyzer/pkg/pims/get_data.go +++ b/apps/cli/executables/go/pims_analyzer/pkg/pims/get_data.go @@ -20,141 +20,143 @@ package pims import ( - "bufio" - "database/sql" - "encoding/json" - "fmt" - "os" - "path/filepath" - "ssa/pims_analyzer/pkg/db" - "strings" + "bufio" + "database/sql" + "encoding/json" + "fmt" + "os" + "path/filepath" + "ssa/pims_analyzer/pkg/db" + "strings" ) // Stores PIMS info requested by the DAs for PIMS notification emails type PimsInfo struct { - // the calibration requested - Calibration string `json:"calibration"` - // The number of splits in the job - NumSplits int `json:"num_splits"` - // Path to restored MS - RestoredMSPath string `json:"restore_path"` - // Path to CASA used - CasaVersion string `json:"casa_path"` - // lustre processing area - WorkingDir string `json:"lustre_dir"` - // output cache directory - CacheDir string `json:"cache_dir"` - // number of SE and CC products per tile - NumProducts []TileProductCounts `json:"num_products"` - // used to determine workflow status header message - StatusMsg string `json:"status"` - // contains the failed splits in `tile.phasecenter` format - FailedSplits []string `json:"failed_splits"` - // Num failed splits (needs to be in JSON to get passed to email template) - NumFailedSplits int `json:"num_failed_splits"` + // the calibration requested + Calibration string `json:"calibration"` + // The number of splits in the job + NumSplits int `json:"num_splits"` + // Path to restored MS + RestoredMSPath string `json:"restore_path"` + // Path to CASA used + CasaVersion string `json:"casa_path"` + // lustre processing area + WorkingDir string `json:"lustre_dir"` + // output cache directory + CacheDir string `json:"cache_dir"` + // number of SE and CC products per tile + NumProducts []TileProductCounts `json:"num_products"` + // used to determine workflow status header message + StatusMsg string `json:"status"` + // contains the failed splits in `tile.phasecenter` format + FailedSplits []string `json:"failed_splits"` + // Num failed splits (needs to be in JSON to get passed to email template) + NumFailedSplits int `json:"num_failed_splits"` } // Contains all the relevant columns of the `pims_split` entry in the archive // database type workflowEntry struct { - arguments []byte - resultsDir string + arguments []byte + resultsDir string } -// Populates a PimsInfo struct with data from a given workflow identified by the +// GetPimsInfo Populates a PimsInfo struct with data from a given workflow identified by the // workflow ID // // @param workflowId the ID of the PIMS workflow // @param capoPath the path to the capo property file // @param capProfile the CAPO profile used during the workflow execution func GetPimsInfo(workflowId int, capoPath string, capoProfile string) PimsInfo { - var pimsInfo PimsInfo - - // We need connections to both the VLASS and Archive databases for everything - var vlassInfo db.DbInfo - var archiveInfo db.DbInfo - - // VLASS profile is the same as the dsoc profile with `vlass.` - vlassInfo.Profile = strings.Replace(capoProfile, "dsoc-", "vlass.", -1) - archiveInfo.Profile = capoProfile - vlassInfo.CapoPath, archiveInfo.CapoPath = capoPath, capoPath - - // Vlass and Archive have different prefixes - vlassInfo.Prefix = "edu.nrao.vlass.config.VlassMngrSettings" - archiveInfo.Prefix = "metadataDatabase" - - vlassDb := db.GetConnection(vlassInfo) - archiveDb := db.GetConnection(archiveInfo) - - defer vlassDb.Close() - defer archiveDb.Close() - - wfEntry, err := getWorkflowColumns(workflowId, archiveDb) - - db.CheckError(err) - - // Populate PimsInfo with data from archive database - var jsonArgs map[string]interface{} - err = json.Unmarshal(wfEntry.arguments, &jsonArgs) - db.CheckError(err) - - pimsInfo.FailedSplits, err = readFailedSplits() - db.CheckError(err) - - pimsInfo.NumFailedSplits = len(pimsInfo.FailedSplits) - - pimsInfo.Calibration = jsonArgs["vlass_product"].(string) - pimsInfo.CasaVersion = jsonArgs["casaHome"].(string) - pimsInfo.WorkingDir = wfEntry.resultsDir - pimsInfo.CacheDir = fmt.Sprintf("/lustre/aoc/cluster/pipeline/%s/cache/pims/%s", - strings.Replace(capoProfile, "dsoc-", "vlass_", -1), - pimsInfo.Calibration, - ) - - // Workflow reported as failed when there are failed splits - status := "succeeded" - if (pimsInfo.NumFailedSplits != 0) { status = "failed" } - pimsInfo.StatusMsg = fmt.Sprintf("PIMS Split workflow %s!", - status, - ) - - // Check if there's a Restored MS Path - if jsonArgs["existing_restore"] != nil { - pimsInfo.RestoredMSPath = jsonArgs["existing_restore"].(string) - } else { - // Check if a restore is present in the working directory - // Should be in working/ in the lustre directory - var restoreFiles []string - restoreFiles, err = filepath.Glob(pimsInfo.WorkingDir + "/working/*.ms") - - if restoreFiles == nil { - pimsInfo.RestoredMSPath = "Restore failed!" - } else { - pimsInfo.RestoredMSPath = restoreFiles[0] - } - } - - // Add Splits - split_list_raw := jsonArgs["splits"].([]interface {}) - split_list := make([]string, len(split_list_raw)) - - for i, v := range jsonArgs["splits"].([]interface {}) { - split_list[i] = fmt.Sprint(v) - } - - pimsInfo.NumSplits = len(split_list) - - // Get unique tile values from `tile/phasecenter` values - tiles := make(map[string]bool) - for _, v := range split_list { - tile := strings.Split(v,"/")[0] - tiles[tile] = true - } - - pimsInfo.NumProducts, err = getNumProductsPerTile(tiles, vlassDb) - db.CheckError(err) - - return pimsInfo + var pimsInfo PimsInfo + + // We need connections to both the VLASS and Archive databases for everything + var vlassInfo db.DbInfo + var archiveInfo db.DbInfo + + // VLASS profile is the same as the dsoc profile with `vlass.` + vlassInfo.Profile = strings.Replace(capoProfile, "dsoc-", "vlass.", -1) + archiveInfo.Profile = capoProfile + vlassInfo.CapoPath, archiveInfo.CapoPath = capoPath, capoPath + + // Vlass and Archive have different prefixes + vlassInfo.Prefix = "edu.nrao.vlass.config.VlassMngrSettings" + archiveInfo.Prefix = "metadataDatabase" + + vlassDb := db.GetConnection(vlassInfo) + archiveDb := db.GetConnection(archiveInfo) + + defer vlassDb.Close() + defer archiveDb.Close() + + wfEntry, err := getWorkflowColumns(workflowId, archiveDb) + + db.CheckError(err) + + // Populate PimsInfo with data from archive database + var jsonArgs map[string]interface{} + err = json.Unmarshal(wfEntry.arguments, &jsonArgs) + db.CheckError(err) + + pimsInfo.FailedSplits, err = readFailedSplits() + db.CheckError(err) + + pimsInfo.NumFailedSplits = len(pimsInfo.FailedSplits) + + pimsInfo.Calibration = jsonArgs["vlass_product"].(string) + pimsInfo.CasaVersion = jsonArgs["casaHome"].(string) + pimsInfo.WorkingDir = wfEntry.resultsDir + pimsInfo.CacheDir = fmt.Sprintf("/lustre/aoc/cluster/pipeline/%s/cache/pims/%s", + strings.Replace(capoProfile, "dsoc-", "vlass_", -1), + pimsInfo.Calibration, + ) + + // Workflow reported as failed when there are failed splits + status := "succeeded" + if pimsInfo.NumFailedSplits != 0 { + status = "failed" + } + pimsInfo.StatusMsg = fmt.Sprintf("PIMS Split workflow %s!", + status, + ) + + // Check if there's a Restored MS Path + if jsonArgs["existing_restore"] != nil { + pimsInfo.RestoredMSPath = jsonArgs["existing_restore"].(string) + } else { + // Check if a restore is present in the working directory + // Should be in working/ in the lustre directory + var restoreFiles []string + restoreFiles, err = filepath.Glob(pimsInfo.WorkingDir + "/working/*.ms") + + if restoreFiles == nil { + pimsInfo.RestoredMSPath = "Restore failed!" + } else { + pimsInfo.RestoredMSPath = restoreFiles[0] + } + } + + // Add Splits + split_list_raw := jsonArgs["splits"].([]interface{}) + split_list := make([]string, len(split_list_raw)) + + for i, v := range jsonArgs["splits"].([]interface{}) { + split_list[i] = fmt.Sprint(v) + } + + pimsInfo.NumSplits = len(split_list) + + // Get unique tile values from `tile/phasecenter` values + tiles := make(map[string]bool) + for _, v := range split_list { + tile := strings.Split(v, "/")[0] + tiles[tile] = true + } + + pimsInfo.NumProducts, err = getNumProductsPerTile(tiles, vlassDb) + db.CheckError(err) + + return pimsInfo } // Store the relevant columns for the workflow entry in the archive database in @@ -164,38 +166,37 @@ func GetPimsInfo(workflowId int, capoPath string, capoProfile string) PimsInfo { // @param A connection to the database // @return A workflowEntry struct with the column data or an error func getWorkflowColumns(workflowId int, dbConnection *sql.DB) (workflowEntry, error) { - workflowTable := "workflow_requests" - workflowIdKey := "workflow_request_id" - dataCols := "argument,results_dir" - - var entry workflowEntry - // var tmpArgs string // Must get JSON as string first - var err error - - // Get relevant columns from the entry in the database - // Get the database rows where metadata is extracted from - err = dbConnection.QueryRow( - fmt.Sprintf("SELECT %s FROM %s WHERE %s='%d'", - dataCols, - workflowTable, - workflowIdKey, - workflowId), - ).Scan(&entry.arguments, &entry.resultsDir) - - return entry, err + workflowTable := "workflow_requests" + workflowIdKey := "workflow_request_id" + dataCols := "argument,results_dir" + + var entry workflowEntry + // var tmpArgs string // Must get JSON as string first + var err error + + // Get relevant columns from the entry in the database + // Get the database rows where metadata is extracted from + err = dbConnection.QueryRow( + fmt.Sprintf("SELECT %s FROM %s WHERE %s='%d'", + dataCols, + workflowTable, + workflowIdKey, + workflowId), + ).Scan(&entry.arguments, &entry.resultsDir) + + return entry, err } // Stores the number of SEs and CCs in the database for a tile type TileProductCounts struct { - // Name of the Tile - Tile string `json:"tile_name"` - // Number of coarse cubes associated - NumCoarseCubes int `json:"num_coarse_cube"` - // Number of continuum images associated - NumContinuum int `json:"num_continuum"` + // Name of the Tile + Tile string `json:"tile_name"` + // Number of coarse cubes associated + NumCoarseCubes int `json:"num_coarse_cube"` + // Number of continuum images associated + NumContinuum int `json:"num_continuum"` } - // Populates an array of tileProductCounts structs with CC and SE counts for a // list of tiles // @@ -205,23 +206,25 @@ type TileProductCounts struct { // @return An array of tileProductCounts with the counts and any error // encountered func getNumProductsPerTile(tiles map[string]bool, dbConnection *sql.DB) ([]TileProductCounts, error) { - var productCounts []TileProductCounts - var err error - - // Only do this if there are tiles to check products for - if len(tiles) <= 0 { return productCounts, err } - - // Get tiles as string in `(tile1, tile2, etc.)` format to fit the query - // syntax - inTilesQuery := "(" - for key := range tiles { - inTilesQuery += fmt.Sprintf("'%s', ", key) - } - // Remove trailing , - inTilesQuery = strings.TrimRight(inTilesQuery, ", ") - inTilesQuery += ")" - - query := fmt.Sprintf(`select * from crosstab($$ + productCounts := make([]TileProductCounts, 0) + var err error + + // Only do this if there are tiles to check products for + if len(tiles) <= 0 { + return productCounts, err + } + + // Get tiles as string in `(tile1, tile2, etc.)` format to fit the query + // syntax + inTilesQuery := "(" + for key := range tiles { + inTilesQuery += fmt.Sprintf("'%s', ", key) + } + // Remove trailing , + inTilesQuery = strings.TrimRight(inTilesQuery, ", ") + inTilesQuery += ")" + + query := fmt.Sprintf(`select * from crosstab($$ select m.name as minitile, pt.name as product_type, count(*) as count from product join product_type pt on product.product_type_id = pt.id @@ -233,40 +236,46 @@ group by m.name, pt.name order by minitile, product_type; $$, $$VALUES ('se_coarse_cube_image'),('se_continuum_imaging')$$) as ct(minitile varchar, se_coarse_cube_image varchar, se_continuum_imaging varchar);`, - inTilesQuery) - - var rows *sql.Rows - rows, err = dbConnection.Query(query) - if err != nil { return productCounts, err } - - defer rows.Close() - - for rows.Next() { - var count TileProductCounts - - // Needed to deal with potential null values - var nullableCoarseCount, nullableContinuumCount sql.NullInt32 - - if err = rows.Scan( - &count.Tile, - &nullableCoarseCount, - &nullableContinuumCount) ; err != nil { - return productCounts, err - } - - // Make sure null counts from DB handled - if nullableCoarseCount.Valid { - count.NumCoarseCubes = int(nullableCoarseCount.Int32) - } else { count.NumCoarseCubes = 0 } - - if nullableContinuumCount.Valid { - count.NumContinuum = int(nullableContinuumCount.Int32) - } else { count.NumContinuum = 0 } - - productCounts = append(productCounts, count) - } - - return productCounts, err + inTilesQuery) + + var rows *sql.Rows + rows, err = dbConnection.Query(query) + if err != nil { + return productCounts, err + } + + defer rows.Close() + + for rows.Next() { + var count TileProductCounts + + // Needed to deal with potential null values + var nullableCoarseCount, nullableContinuumCount sql.NullInt32 + + if err = rows.Scan( + &count.Tile, + &nullableCoarseCount, + &nullableContinuumCount); err != nil { + return productCounts, err + } + + // Make sure null counts from DB handled + if nullableCoarseCount.Valid { + count.NumCoarseCubes = int(nullableCoarseCount.Int32) + } else { + count.NumCoarseCubes = 0 + } + + if nullableContinuumCount.Valid { + count.NumContinuum = int(nullableContinuumCount.Int32) + } else { + count.NumContinuum = 0 + } + + productCounts = append(productCounts, count) + } + + return productCounts, err } // Whenever a `pimscache` call returns non-0 in the `pims_split` workflow, the @@ -277,20 +286,22 @@ $$, $$VALUES ('se_coarse_cube_image'),('se_continuum_imaging')$$) as // @return a string array with the split entries and any error that was // encountered func readFailedSplits() ([]string, error) { - failedTilesFilename := "failed_splits.txt" - var failedTiles []string + failedTilesFilename := "failed_splits.txt" + failedTiles := make([]string, 0) - failedTilesFile, err := os.Open(failedTilesFilename) - if err != nil { return failedTiles, err } + failedTilesFile, err := os.Open(failedTilesFilename) + if err != nil { + return failedTiles, err + } - defer failedTilesFile.Close() + defer failedTilesFile.Close() - tileScanner := bufio.NewScanner(failedTilesFile) + tileScanner := bufio.NewScanner(failedTilesFile) - for tileScanner.Scan() { - failedTiles = append(failedTiles, tileScanner.Text()) - } - err = tileScanner.Err() + for tileScanner.Scan() { + failedTiles = append(failedTiles, tileScanner.Text()) + } + err = tileScanner.Err() - return failedTiles, err + return failedTiles, err } diff --git a/ci/bin/generate-go-yaml.py b/ci/bin/generate-go-yaml.py index fea601ab5982b5dd8fc391ab1263f7a4cfb8b0bb..3bced414eaf580a3e187833476308280f55dac78 100755 --- a/ci/bin/generate-go-yaml.py +++ b/ci/bin/generate-go-yaml.py @@ -109,7 +109,7 @@ def main(argv): # Get list of files that have changed from commit SHA # git diff-tree --no-commit-id --name-only -r $CI_COMMIT_SHA sp = subprocess.run( - ["git", "diff-tree", "--no-commit-id", "--name-only", "-r", f"{commit_sha}"], + ["git", "diff-tree", "-m", "--no-commit-id", "--name-only", "-r", f"{commit_sha}"], stdout=subprocess.PIPE, universal_newlines=True, )