Skip to content
Snippets Groups Projects
Commit 5ac647e8 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

Update seci_ingestion_status for gocapo

parent 0731e678
No related branches found
No related tags found
1 merge request!1073Update seci_ingestion_status for gocapo
Pipeline #6768 passed
Pipeline: workspaces

#6769

    # Ingest Image Workflow Trigger: # Ingest Image Workflow Trigger:
    `iiwf_trigger` is a command line utility to initiate a workflow for a VLASS image set. `iiwf_trigger` is a command line utility to initiate a workflow for a VLASS image set.
    It is not typically run by hand, but called by the [ingest_vlass_tile.sh](../../wf_framework/sh) script. It is not typically run by hand, but called by the ingest_vlass_tile.sh script.
    It works by making a REST call to Workspaces to start the workflow. It is recommended It works by making a REST call to Workspaces to start the workflow. It is recommended
    to have the `CAPO_PROFILE` environment variable set, otherwise you will need to to have the `CAPO_PROFILE` environment variable set, otherwise you will need to
    manually specify the CAPO file with the calibration lookup URL. manually specify the CAPO file with the calibration lookup URL.
    ## Usage ## Usage
    ``` ```
    Usage: iiwf_trigger -path /path/to/VLASS/image/set [-prop CAPO_PATH] Usage: iiwf_trigger -path /path/to/VLASS/image/set [-prop CAPO_PATH] [-profile CAPO_PROFILE]
    -path Path to the VLASS image set -path [Required] Path to a cached VLASS image set
    -prop Path to the CAPO properties file with the calibration lookup URL (default is $CAPO_PROFILE.properties) -prop [Optional] Path to the CAPO properties file, defaults to CAPO_PATH env variable
    -profile The CAPO profile to use, defaults to CAPO_PROFILE env variable
    ``` ```
    ...@@ -7,7 +7,8 @@ workflow's results directory. ...@@ -7,7 +7,8 @@ workflow's results directory.
    ## Usage ## Usage
    ``` ```
    Usage: seci_ingestion_status -id <WF Request ID> [-prop CAPO_PATH] Usage: seci_ingestion_status -id <WF Request ID> [-prop CAPO_PATH] [-profile CAPO_PROFILE]
    -id Workflow Request ID of the SECI Ingestion to inspect -id [Required] Workflow Request Id of SECI Ingestion to inspect
    -prop Path to the CAPO properties file (default is $CAPO_PROFILE.properties) -path [Optional] Path to the CAPO properties file, defaults to CAPO_PATH env variable
    -profile The CAPO profile to use, defaults to CAPO_PROFILE env variable
    ``` ```
    ...@@ -2,4 +2,24 @@ module ssa/seci_ingestion_status ...@@ -2,4 +2,24 @@ module ssa/seci_ingestion_status
    go 1.18 go 1.18
    require github.com/magiconair/properties v1.8.6 // indirect require gitlab.nrao.edu/ssa/gocapo v0.0.0-20220922144228-05f75dd5e5cd
    require (
    github.com/fsnotify/fsnotify v1.5.4 // indirect
    github.com/hashicorp/hcl v1.0.0 // indirect
    github.com/magiconair/properties v1.8.6 // indirect
    github.com/mitchellh/mapstructure v1.5.0 // indirect
    github.com/pelletier/go-toml v1.9.5 // indirect
    github.com/pelletier/go-toml/v2 v2.0.5 // indirect
    github.com/spf13/afero v1.9.2 // indirect
    github.com/spf13/cast v1.5.0 // indirect
    github.com/spf13/jwalterweatherman v1.1.0 // indirect
    github.com/spf13/pflag v1.0.5 // indirect
    github.com/spf13/viper v1.13.0 // indirect
    github.com/subosito/gotenv v1.4.1 // indirect
    golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
    golang.org/x/text v0.3.7 // indirect
    gopkg.in/ini.v1 v1.67.0 // indirect
    gopkg.in/yaml.v2 v2.4.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
    )
    This diff is collapsed.
    ...@@ -22,25 +22,29 @@ package main ...@@ -22,25 +22,29 @@ package main
    import ( import (
    "flag" "flag"
    "fmt" "fmt"
    "gitlab.nrao.edu/ssa/gocapo/helpers"
    "os"
    "ssa/seci_ingestion_status/pkg/inspect" "ssa/seci_ingestion_status/pkg/inspect"
    "ssa/seci_ingestion_status/pkg/wf_info" "ssa/seci_ingestion_status/pkg/wf_info"
    ) )
    func main() { func main() {
    var wf_req_id string var wfReqId string
    var capo_profile string var capoProfile string
    var capoPath string
    flag.StringVar(&wf_req_id, "id", "", "Workflow Request Id of SECI Ingestion to inspect") flag.StringVar(&wfReqId, "id", "", "[Required] Workflow Request Id of SECI Ingestion to inspect")
    flag.StringVar(&capo_profile, "prop", "/home/casa/capo/${CAPO_PROFILE}.properties", "Path to the CAPO properties file") flag.StringVar(&capoPath, "path", helpers.DefaultCapoPath, "[Optional] Path to the CAPO properties file, defaults to CAPO_PATH env variable")
    flag.StringVar(&capoProfile, "profile", os.Getenv("CAPO_PROFILE"), "The CAPO profile to use, defaults to CAPO_PROFILE env variable")
    flag.Parse() flag.Parse()
    // Make sure a wf_req_id was provided // Make sure a wf_req_id was provided
    if len(wf_req_id) == 0 { if len(wfReqId) == 0 {
    flag.Usage() flag.Usage()
    return return
    } }
    seci_wf := wf_info.GetSECIWorkflowInfo(wf_req_id, capo_profile) seciWf := wf_info.GetSECIWorkflowInfo(wfReqId, capoProfile, capoPath)
    status_msg := inspect.InspectSECIIngestionStatus(seci_wf) statusMsg := inspect.SeciIngestionStatus(seciWf)
    fmt.Println(status_msg) fmt.Println(statusMsg)
    } }
    ...@@ -57,39 +57,40 @@ func checkError(err error) { ...@@ -57,39 +57,40 @@ func checkError(err error) {
    } }
    } }
    //SeciIngestionStatus
    /** /**
    * Inspect a SECI Ingestion Workflow Request to determine status * Inspect a SECI Ingestion Workflow Request to determine status
    * *
    * @param WfInfo A struct with needed workflow information * @param WfInfo A struct with needed workflow information
    * @return a formatted string containing the status of a given SECI Ingestion Workflow * @return a formatted string containing the status of a given SECI Ingestion Workflow
    **/ **/
    func InspectSECIIngestionStatus(wf_info wf.WfInfo) string { func SeciIngestionStatus(wfInfo wf.WfInfo) string {
    var statusFile string = "ingest-result.json" var statusFile = "ingest-result.json"
    ingest_status := NewIngestionStatus(wf_info) ingestStatus := NewIngestionStatus(wfInfo)
    file_content, err := ioutil.ReadFile(fmt.Sprintf("%s/%s", wf_info.ResultsDir, statusFile)) fileContent, err := ioutil.ReadFile(fmt.Sprintf("%s/%s", wfInfo.ResultsDir, statusFile))
    if errors.Is(err, os.ErrNotExist) { if errors.Is(err, os.ErrNotExist) {
    ingest_status.Status = "Unknown" ingestStatus.Status = "Unknown"
    return ingest_status.generateStatusMsg() return ingestStatus.generateStatusMsg()
    } else { } else {
    checkError(err) checkError(err)
    } }
    err = json.Unmarshal(file_content, &ingest_status) err = json.Unmarshal(fileContent, &ingestStatus)
    if err != nil { if err != nil {
    log.Fatal("Error during Unmarshal(): ", err) log.Fatal("Error during Unmarshal(): ", err)
    } }
    if ingest_status.Type == "workflow-complete" { if ingestStatus.Type == "workflow-complete" {
    ingest_status.IsIngested = true ingestStatus.IsIngested = true
    ingest_status.Status = "Ingested" ingestStatus.Status = "Ingested"
    } }
    return ingest_status.generateStatusMsg() return ingestStatus.generateStatusMsg()
    } }
    func (s *IngestionStatus) generateStatusMsg() string { func (s *IngestionStatus) generateStatusMsg() string {
    var additionalInfo string = "" var additionalInfo = ""
    if s.Status == "Unknown" { if s.Status == "Unknown" {
    additionalInfo = fmt.Sprintf("\n\nUnable to determine ingestion status. Ingestion may still be running or the ingestion workflow request may not have been created") additionalInfo = fmt.Sprintf("\n\nUnable to determine ingestion status. Ingestion may still be running or the ingestion workflow request may not have been created")
    } else if !s.IsIngested { } else if !s.IsIngested {
    ......
    ...@@ -22,9 +22,9 @@ package wf_info ...@@ -22,9 +22,9 @@ package wf_info
    import ( import (
    "encoding/json" "encoding/json"
    "fmt" "fmt"
    "gitlab.nrao.edu/ssa/gocapo/capo/config"
    "io"
    "net/http" "net/http"
    "github.com/magiconair/properties"
    ) )
    type WfInfo struct { type WfInfo struct {
    ...@@ -38,6 +38,7 @@ type Argument struct { ...@@ -38,6 +38,7 @@ type Argument struct {
    CachePath string `json:"cache_path"` CachePath string `json:"cache_path"`
    } }
    //checkError
    /** /**
    * Check if an error happened and panic on it if so * Check if an error happened and panic on it if so
    * *
    ...@@ -49,54 +50,40 @@ func checkError(err error) { ...@@ -49,54 +50,40 @@ func checkError(err error) {
    } }
    } }
    //GetSECIWorkflowInfo
    /** /**
    * Pull a property from a CAPO property file * Get Workflow info for SECI Ingestion
    *
    * @param capo_profile A string with a path to the CAPO property file
    * @param property_name A string with the name of the property to be returned
    * @return a string with the property value
    **/
    func getCapoProperty(capo_profile string, property_name string) string {
    prop_file := properties.MustLoadFile(capo_profile, properties.UTF8)
    property := prop_file.GetString(property_name, "")
    if len(property) == 0 {
    panic(fmt.Sprintf("Unable to retrieve property %s from %s", property_name, capo_profile))
    }
    return property
    }
    /**
    * Get Workflow info for SECI Ingestions
    * *
    * @param wf_req_id A string of a workflow request id * @param wf_req_id A string of a workflow request id
    * @param capo_profile A string with the path to the CAPO property file used to * @param capo_profile A string with the path to the CAPO property file used to
    * get the workflow service url * get the workflow service url
    * @return a WfInfo struct * @return a WfInfo struct
    **/ **/
    func GetSECIWorkflowInfo(wf_req_id string, capo_profile string) WfInfo { func GetSECIWorkflowInfo(wfReqId string, capoProfile string, path string) WfInfo {
    workflow_url_prop := "edu.nrao.workspaces.WorkflowSettings.serviceUrl" properties, err := config.InitConfig(capoProfile, path)
    workflow_url := getCapoProperty(capo_profile, workflow_url_prop) workflowUrl := properties.SettingsForPrefix("edu.nrao.workspaces.WorkflowSettings").GetString("serviceUrl")
    workflow_endpoint := fmt.Sprintf("%s/workflows/ingest_seci/requests/%s", workflow_url, wf_req_id) workflowEndpoint := fmt.Sprintf("%s/workflows/ingest_seci/requests/%s", workflowUrl, wfReqId)
    resp, err := http.Get(workflow_endpoint) resp, err := http.Get(workflowEndpoint)
    checkError(err) checkError(err)
    if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
    panic(fmt.Sprintf("Error making GET request to URL %s, got status code %d", workflow_endpoint, resp.StatusCode)) panic(fmt.Sprintf("Error making GET request to URL %s, got status code %d", workflowEndpoint, resp.StatusCode))
    } }
    defer resp.Body.Close() defer func(Body io.ReadCloser) {
    err := Body.Close()
    checkError(err)
    }(resp.Body)
    wf_info := new(WfInfo) wfInfo := new(WfInfo)
    err = json.NewDecoder(resp.Body).Decode(wf_info) err = json.NewDecoder(resp.Body).Decode(wfInfo)
    checkError(err) checkError(err)
    if wf_info.WfName != "ingest_seci" { if wfInfo.WfName != "ingest_seci" {
    panic(fmt.Sprintf("The provided workflow request id %s is not an ingest_seci workflow. Cannot provide status.", wf_req_id)) panic(fmt.Sprintf("The provided workflow request id %s is not an ingest_seci workflow. Cannot provide status.", wfReqId))
    } }
    return *wf_info return *wfInfo
    } }
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or to comment