Commit 8bbb9ce5 authored by Charlotte Hausman

Merge branch 'analyzer_tweaks' into 'main'

ensure slice struct fields are initialized

See merge request !1356
parents 0a61db23 f0fb1cfb
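
Why this change matters: with Go's encoding/json, a nil slice field marshals as null, while a slice initialized with make marshals as []. Initializing fields like FailedSplits (and the slices returned by the helpers below) keeps consumers of the notification JSON, such as the email templates, from seeing null where a list is expected. A minimal sketch of the behavior, using a hypothetical report type rather than anything from this codebase:

package main

import (
    "encoding/json"
    "fmt"
)

type report struct {
    FailedSplits []string `json:"failed_splits"`
}

func main() {
    var uninitialized report // FailedSplits is the nil zero value
    initialized := report{FailedSplits: make([]string, 0)}

    a, _ := json.Marshal(uninitialized)
    b, _ := json.Marshal(initialized)
    fmt.Println(string(a)) // {"failed_splits":null}
    fmt.Println(string(b)) // {"failed_splits":[]}
}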
@@ -20,141 +20,143 @@
 package pims

 import (
     "bufio"
     "database/sql"
     "encoding/json"
     "fmt"
     "os"
     "path/filepath"
     "ssa/pims_analyzer/pkg/db"
     "strings"
 )

 // Stores PIMS info requested by the DAs for PIMS notification emails
 type PimsInfo struct {
     // the calibration requested
     Calibration string `json:"calibration"`
     // The number of splits in the job
     NumSplits int `json:"num_splits"`
     // Path to restored MS
     RestoredMSPath string `json:"restore_path"`
     // Path to CASA used
     CasaVersion string `json:"casa_path"`
     // lustre processing area
     WorkingDir string `json:"lustre_dir"`
     // output cache directory
     CacheDir string `json:"cache_dir"`
     // number of SE and CC products per tile
     NumProducts []TileProductCounts `json:"num_products"`
     // used to determine workflow status header message
     StatusMsg string `json:"status"`
     // contains the failed splits in `tile.phasecenter` format
     FailedSplits []string `json:"failed_splits"`
     // Num failed splits (needs to be in JSON to get passed to email template)
     NumFailedSplits int `json:"num_failed_splits"`
 }

 // Contains all the relevant columns of the `pims_split` entry in the archive
 // database
 type workflowEntry struct {
     arguments  []byte
     resultsDir string
 }

-// Populates a PimsInfo struct with data from a given workflow identified by the
+// GetPimsInfo populates a PimsInfo struct with data from a given workflow identified by the
 // workflow ID
 //
 // @param workflowId the ID of the PIMS workflow
 // @param capoPath the path to the capo property file
 // @param capoProfile the CAPO profile used during the workflow execution
 func GetPimsInfo(workflowId int, capoPath string, capoProfile string) PimsInfo {
     var pimsInfo PimsInfo

     // We need connections to both the VLASS and Archive databases for everything
     var vlassInfo db.DbInfo
     var archiveInfo db.DbInfo

     // VLASS profile is the same as the dsoc profile with `vlass.`
     vlassInfo.Profile = strings.Replace(capoProfile, "dsoc-", "vlass.", -1)
     archiveInfo.Profile = capoProfile
     vlassInfo.CapoPath, archiveInfo.CapoPath = capoPath, capoPath
     // Vlass and Archive have different prefixes
     vlassInfo.Prefix = "edu.nrao.vlass.config.VlassMngrSettings"
     archiveInfo.Prefix = "metadataDatabase"

     vlassDb := db.GetConnection(vlassInfo)
     archiveDb := db.GetConnection(archiveInfo)
     defer vlassDb.Close()
     defer archiveDb.Close()

     wfEntry, err := getWorkflowColumns(workflowId, archiveDb)
     db.CheckError(err)

     // Populate PimsInfo with data from archive database
     var jsonArgs map[string]interface{}
     err = json.Unmarshal(wfEntry.arguments, &jsonArgs)
     db.CheckError(err)

     pimsInfo.FailedSplits, err = readFailedSplits()
     db.CheckError(err)
     pimsInfo.NumFailedSplits = len(pimsInfo.FailedSplits)
     pimsInfo.Calibration = jsonArgs["vlass_product"].(string)
     pimsInfo.CasaVersion = jsonArgs["casaHome"].(string)
     pimsInfo.WorkingDir = wfEntry.resultsDir
     pimsInfo.CacheDir = fmt.Sprintf("/lustre/aoc/cluster/pipeline/%s/cache/pims/%s",
         strings.Replace(capoProfile, "dsoc-", "vlass_", -1),
         pimsInfo.Calibration,
     )

     // Workflow reported as failed when there are failed splits
     status := "succeeded"
-    if (pimsInfo.NumFailedSplits != 0) { status = "failed" }
+    if pimsInfo.NumFailedSplits != 0 {
+        status = "failed"
+    }
     pimsInfo.StatusMsg = fmt.Sprintf("PIMS Split workflow %s!",
         status,
     )

     // Check if there's a Restored MS Path
     if jsonArgs["existing_restore"] != nil {
         pimsInfo.RestoredMSPath = jsonArgs["existing_restore"].(string)
     } else {
         // Check if a restore is present in the working directory
         // Should be in working/ in the lustre directory
         var restoreFiles []string
         restoreFiles, err = filepath.Glob(pimsInfo.WorkingDir + "/working/*.ms")
         if restoreFiles == nil {
             pimsInfo.RestoredMSPath = "Restore failed!"
         } else {
             pimsInfo.RestoredMSPath = restoreFiles[0]
         }
     }

     // Add Splits
-    split_list_raw := jsonArgs["splits"].([]interface {})
+    split_list_raw := jsonArgs["splits"].([]interface{})
     split_list := make([]string, len(split_list_raw))
-    for i, v := range jsonArgs["splits"].([]interface {}) {
+    for i, v := range jsonArgs["splits"].([]interface{}) {
         split_list[i] = fmt.Sprint(v)
     }
     pimsInfo.NumSplits = len(split_list)

     // Get unique tile values from `tile/phasecenter` values
     tiles := make(map[string]bool)
     for _, v := range split_list {
-        tile := strings.Split(v,"/")[0]
+        tile := strings.Split(v, "/")[0]
         tiles[tile] = true
     }

     pimsInfo.NumProducts, err = getNumProductsPerTile(tiles, vlassDb)
     db.CheckError(err)

     return pimsInfo
 }

 // Store the relevant columns for the workflow entry in the archive database in
@@ -164,38 +166,37 @@ func GetPimsInfo(workflowId int, capoPath string, capoProfile string) PimsInfo {
 // @param A connection to the database
 // @return A workflowEntry struct with the column data or an error
 func getWorkflowColumns(workflowId int, dbConnection *sql.DB) (workflowEntry, error) {
     workflowTable := "workflow_requests"
     workflowIdKey := "workflow_request_id"
     dataCols := "argument,results_dir"

     var entry workflowEntry
     // var tmpArgs string // Must get JSON as string first
     var err error

     // Get relevant columns from the entry in the database
     // Get the database rows where metadata is extracted from
     err = dbConnection.QueryRow(
         fmt.Sprintf("SELECT %s FROM %s WHERE %s='%d'",
             dataCols,
             workflowTable,
             workflowIdKey,
             workflowId),
     ).Scan(&entry.arguments, &entry.resultsDir)

     return entry, err
 }

 // Stores the number of SEs and CCs in the database for a tile
 type TileProductCounts struct {
     // Name of the Tile
     Tile string `json:"tile_name"`
     // Number of coarse cubes associated
     NumCoarseCubes int `json:"num_coarse_cube"`
     // Number of continuum images associated
     NumContinuum int `json:"num_continuum"`
 }

 // Populates an array of tileProductCounts structs with CC and SE counts for a
 // list of tiles
 //
@@ -205,23 +206,25 @@ type TileProductCounts struct {
 // @return An array of tileProductCounts with the counts and any error
 // encountered
 func getNumProductsPerTile(tiles map[string]bool, dbConnection *sql.DB) ([]TileProductCounts, error) {
-    var productCounts []TileProductCounts
+    productCounts := make([]TileProductCounts, 0)
     var err error

     // Only do this if there are tiles to check products for
-    if len(tiles) <= 0 { return productCounts, err }
+    if len(tiles) <= 0 {
+        return productCounts, err
+    }

     // Get tiles as string in `(tile1, tile2, etc.)` format to fit the query
     // syntax
     inTilesQuery := "("
     for key := range tiles {
         inTilesQuery += fmt.Sprintf("'%s', ", key)
     }
     // Remove trailing ,
     inTilesQuery = strings.TrimRight(inTilesQuery, ", ")
     inTilesQuery += ")"

     query := fmt.Sprintf(`select * from crosstab($$
 select m.name as minitile, pt.name as product_type, count(*) as count
 from product
 join product_type pt on product.product_type_id = pt.id
@@ -233,40 +236,46 @@ group by m.name, pt.name
 order by minitile, product_type;
 $$, $$VALUES ('se_coarse_cube_image'),('se_continuum_imaging')$$) as
 ct(minitile varchar, se_coarse_cube_image varchar, se_continuum_imaging varchar);`,
         inTilesQuery)

     var rows *sql.Rows
     rows, err = dbConnection.Query(query)
-    if err != nil { return productCounts, err }
+    if err != nil {
+        return productCounts, err
+    }
     defer rows.Close()

     for rows.Next() {
         var count TileProductCounts
         // Needed to deal with potential null values
         var nullableCoarseCount, nullableContinuumCount sql.NullInt32
         if err = rows.Scan(
             &count.Tile,
             &nullableCoarseCount,
-            &nullableContinuumCount) ; err != nil {
+            &nullableContinuumCount); err != nil {
             return productCounts, err
         }

         // Make sure null counts from DB handled
         if nullableCoarseCount.Valid {
             count.NumCoarseCubes = int(nullableCoarseCount.Int32)
-        } else { count.NumCoarseCubes = 0 }
+        } else {
+            count.NumCoarseCubes = 0
+        }
         if nullableContinuumCount.Valid {
             count.NumContinuum = int(nullableContinuumCount.Int32)
-        } else { count.NumContinuum = 0 }
+        } else {
+            count.NumContinuum = 0
+        }

         productCounts = append(productCounts, count)
     }

     return productCounts, err
 }

 // Whenever a `pimscache` call returns non-0 in the `pims_split` workflow, the
@@ -277,20 +286,22 @@ $$, $$VALUES ('se_coarse_cube_image'),('se_continuum_imaging')$$) as
 // @return a string array with the split entries and any error that was
 // encountered
 func readFailedSplits() ([]string, error) {
     failedTilesFilename := "failed_splits.txt"
-    var failedTiles []string
+    failedTiles := make([]string, 0)

     failedTilesFile, err := os.Open(failedTilesFilename)
-    if err != nil { return failedTiles, err }
+    if err != nil {
+        return failedTiles, err
+    }
     defer failedTilesFile.Close()

     tileScanner := bufio.NewScanner(failedTilesFile)
     for tileScanner.Scan() {
         failedTiles = append(failedTiles, tileScanner.Text())
     }
     err = tileScanner.Err()

     return failedTiles, err
 }
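
The substantive fixes in this file are the two slice initializations (productCounts in getNumProductsPerTile and failedTiles in readFailedSplits), which replace nil zero-value slices with make(..., 0); the remaining hunks are formatting and doc-comment cleanups (multi-line if bodies and gofmt-style spacing).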
@@ -109,7 +109,7 @@ def main(argv):
     # Get list of files that have changed from commit SHA
     # git diff-tree --no-commit-id --name-only -r $CI_COMMIT_SHA
     sp = subprocess.run(
-        ["git", "diff-tree", "--no-commit-id", "--name-only", "-r", f"{commit_sha}"],
+        ["git", "diff-tree", "-m", "--no-commit-id", "--name-only", "-r", f"{commit_sha}"],
         stdout=subprocess.PIPE,
         universal_newlines=True,
     )
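
Note on the script change above: git diff-tree prints no file list for a merge commit unless a flag such as -m is given; with -m it diffs the merge against each of its parents, so a pipeline triggered by a merge commit (via $CI_COMMIT_SHA) would otherwise see an empty list of changed files.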