Skip to content
Snippets Groups Projects
Commit 0731e678 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

Update iiwf_trigger for gocapo

parent 1c9e9c83
No related branches found
No related tags found
1 merge request!1072Update iiwf_trigger for gocapo
Pipeline #6765 passed
Pipeline: workspaces

#6766

    ......@@ -7,8 +7,10 @@ stages:
    - test-coverage
    - push
    - deploy-coverage-page
    - generate-yaml
    - trigger
    - generate-pex-yaml
    - pex-trigger
    - generate-go-yaml
    - go-trigger
    - deploy
    # - e2e-test # e2e tests disabled
    - .post
    ......@@ -302,7 +304,7 @@ pages:
    # Generate PEX builder yaml for child pipeline
    pex generate yaml:
    stage: generate-yaml
    stage: generate-pex-yaml
    image: python:3.8-slim
    before_script:
    - apt update && apt install -y git
    ......@@ -326,7 +328,7 @@ pex generate yaml:
    # Trigger child pipeline based on generated PEX builder yaml
    pex child pipeline:
    stage: trigger
    stage: pex-trigger
    trigger:
    include:
    - artifact: generated-pex-build-pipeline.yml
    ......@@ -346,7 +348,7 @@ pex child pipeline:
    # Generate go builder yaml for child pipeline
    go generate yaml:
    stage: generate-yaml
    stage: generate-go-yaml
    image: python:3.8-slim
    before_script:
    - apt update && apt install -y git
    ......@@ -370,7 +372,7 @@ go generate yaml:
    # Trigger child pipeline based on generated go builder yaml
    go child pipeline:
    stage: trigger
    stage: go-trigger
    trigger:
    include:
    - artifact: generated-go-build-pipeline.yml
    ......
    ......@@ -2,4 +2,24 @@ module ssa/iiwf_trigger
    go 1.18
    require github.com/magiconair/properties v1.8.6
    require gitlab.nrao.edu/ssa/gocapo v0.0.0-20220922144228-05f75dd5e5cd
    require (
    github.com/fsnotify/fsnotify v1.5.4 // indirect
    github.com/hashicorp/hcl v1.0.0 // indirect
    github.com/magiconair/properties v1.8.6 // indirect
    github.com/mitchellh/mapstructure v1.5.0 // indirect
    github.com/pelletier/go-toml v1.9.5 // indirect
    github.com/pelletier/go-toml/v2 v2.0.5 // indirect
    github.com/spf13/afero v1.9.2 // indirect
    github.com/spf13/cast v1.5.0 // indirect
    github.com/spf13/jwalterweatherman v1.1.0 // indirect
    github.com/spf13/pflag v1.0.5 // indirect
    github.com/spf13/viper v1.13.0 // indirect
    github.com/subosito/gotenv v1.4.1 // indirect
    golang.org/x/sys v0.0.0-20220919091848-fb04ddd9f9c8 // indirect
    golang.org/x/text v0.3.7 // indirect
    gopkg.in/ini.v1 v1.67.0 // indirect
    gopkg.in/yaml.v2 v2.4.0 // indirect
    gopkg.in/yaml.v3 v3.0.1 // indirect
    )
    This diff is collapsed.
    ......@@ -21,23 +21,28 @@ package main
    import (
    "flag"
    "gitlab.nrao.edu/ssa/gocapo/helpers"
    "os"
    "ssa/iiwf_trigger/pkg/ingest"
    )
    var DefaultImgPath = "Not Set"
    func main() {
    var img_set_path string
    var capo_profile string
    var imgSetPath string
    var capo ingest.CapoInput
    flag.StringVar(&img_set_path, "path", "", "Path to a cached VLASS image set")
    flag.StringVar(&capo_profile, "prop", "/home/casa/capo/${CAPO_PROFILE}.properties", "Path to the CAPO properties file with the calibration lookup URL")
    flag.StringVar(&imgSetPath, "path", DefaultImgPath, "[Required] Path to a cached VLASS image set")
    flag.StringVar(&capo.CapoPath, "prop", helpers.DefaultCapoPath, "[Optional] Path to the CAPO properties file, defaults to CAPO_PATH env variable")
    flag.StringVar(&capo.CapoProfile, "profile", os.Getenv("CAPO_PROFILE"), "The CAPO profile to use, defaults to CAPO_PROFILE env variable")
    flag.Parse()
    // Make sure a path was provided
    if len(img_set_path) == 0 {
    if len(imgSetPath) == 0 {
    flag.Usage()
    return
    }
    locator := ingest.GetCalibrationLocator(img_set_path, capo_profile)
    ingest.CallIngestionEndpoint(img_set_path, locator, capo_profile)
    locator := ingest.GetCalibrationLocator(imgSetPath, capo)
    ingest.CallIngestionEndpoint(imgSetPath, locator, capo)
    }
    ......@@ -22,17 +22,22 @@ package ingest
    import (
    "encoding/json"
    "fmt"
    "gitlab.nrao.edu/ssa/gocapo/capo/config"
    "io"
    "io/ioutil"
    "net/http"
    "path/filepath"
    "github.com/magiconair/properties"
    )
    type calibrationInfo struct {
    Locator string `json:"locator"`
    }
    type CapoInput struct {
    CapoPath string
    CapoProfile string
    }
    /**
    * Check if an error happened and panic on it if so
    *
    ......@@ -44,24 +49,21 @@ func checkError(err error) {
    }
    }
    //getCapoConfig
    /**
    * Pull a property from a CAPO property file
    * Create a CapoConfig instance from a CAPO property file
    *
    * @param capo_profile A string with a path to the CAPO property file
    * @param property_name A string with the name of the property to be returned
    * @return a string with the property value
    * @param input A CapoInput instance encapsulating the requested profile and path
    * @return a CapoConfig instance
    **/
    func getCapoProperty(capo_profile string, property_name string) string {
    prop_file := properties.MustLoadFile(capo_profile, properties.UTF8)
    property := prop_file.GetString(property_name, "")
    if len(property) == 0 {
    panic(fmt.Sprintf("Unable to retrieve property %s from %s", property_name, capo_profile))
    }
    func getCapoConfig(input CapoInput) config.CapoConfig {
    properties, err := config.InitConfig(input.CapoProfile, input.CapoPath)
    checkError(err)
    return property
    return properties
    }
    //GetCalibrationLocator
    /**
    * Get the locator for a VLASS image set
    *
    ......@@ -70,29 +72,33 @@ func getCapoProperty(capo_profile string, property_name string) string {
    * get the calibration lookup URL
    * @return a string with the locator value
    **/
    func GetCalibrationLocator(img_set_path string, capo_profile string) string {
    lookup_url_prop := "edu.nrao.archive.workflow.config.IngestionWorkflowSettings.calibrationLookupUrl"
    workflow_endpoint := getCapoProperty(capo_profile, lookup_url_prop)
    func GetCalibrationLocator(imgSetPath string, input CapoInput) string {
    settings := getCapoConfig(input).SettingsForPrefix("edu.nrao.archive.workflow.config.IngestionWorkflowSettings")
    locatorEndpoint := settings.GetString("calibrationLookupUrl")
    img_set := filepath.Base(img_set_path)
    imgSet := filepath.Base(imgSetPath)
    // Make the REST call
    lookup_url := workflow_endpoint + img_set
    resp, err := http.Get(lookup_url)
    lookupUrl := locatorEndpoint + imgSet
    resp, err := http.Get(lookupUrl)
    checkError(err)
    defer resp.Body.Close()
    defer func(Body io.ReadCloser) {
    err := Body.Close()
    checkError(err)
    }(resp.Body)
    if resp.StatusCode != http.StatusOK {
    panic(fmt.Sprintf("Error making GET request to URL %s, got status code %d", lookup_url, resp.StatusCode))
    panic(fmt.Sprintf("Error making GET request to URL %s, got status code %d", lookupUrl, resp.StatusCode))
    }
    resp_bytes, err := ioutil.ReadAll(resp.Body)
    respBytes, err := ioutil.ReadAll(resp.Body)
    checkError(err)
    // Get the json of the response body
    var calibration calibrationInfo
    json.Unmarshal(resp_bytes, &calibration)
    err = json.Unmarshal(respBytes, &calibration)
    checkError(err)
    return calibration.Locator
    }
    ......@@ -4,6 +4,7 @@ import (
    "bytes"
    "encoding/json"
    "fmt"
    "io"
    "io/ioutil"
    "log"
    "net/http"
    ......@@ -20,6 +21,7 @@ type ResponsePayload struct {
    WorkflowRequestId int `json:"workflow_request_id"`
    }
    //CallIngestionEndpoint
    /**
    * Call the SECI ingestion endpoint to initiate the workflow
    *
    ......@@ -28,39 +30,43 @@ type ResponsePayload struct {
    * @param capo_profile A string with the path to the CAPO property file used to
    * get the Workflow service URL
    **/
    func CallIngestionEndpoint(img_set_path string, locator string, capo_profile string) {
    func CallIngestionEndpoint(imgSetPath string, locator string, input CapoInput) {
    settings := getCapoConfig(input).SettingsForPrefix("edu.nrao.workspaces.WorkflowSettings")
    workflowUrl := settings.GetString("serviceUrl")
    payload := RequestPayload{
    CachePath: img_set_path,
    CachePath: imgSetPath,
    CalSpl: locator,
    }
    // Convert payload to json
    payload_json, err := json.Marshal(payload)
    payloadJson, err := json.Marshal(payload)
    checkError(err)
    fmt.Println(string(payload_json))
    fmt.Println(string(payloadJson))
    // Get post request
    workflow_url_prop := "edu.nrao.workspaces.WorkflowSettings.serviceUrl"
    workflow_url := getCapoProperty(capo_profile, workflow_url_prop)
    post_url := workflow_url + "/workflows/requests/vlass_ingest/seci"
    resp, err := http.Post(post_url, "application/json", bytes.NewBuffer(payload_json))
    postUrl := workflowUrl + "/workflows/requests/vlass_ingest/seci"
    resp, err := http.Post(postUrl, "application/json", bytes.NewBuffer(payloadJson))
    checkError(err)
    defer resp.Body.Close()
    defer func(Body io.ReadCloser) {
    err := Body.Close()
    checkError(err)
    }(resp.Body)
    if resp.StatusCode != http.StatusOK {
    panic(fmt.Sprintf("Error making POST request to URL %s, got status code %d", post_url, resp.StatusCode))
    panic(fmt.Sprintf("Error making POST request to URL %s, got status code %d", postUrl, resp.StatusCode))
    }
    var resp_body ResponsePayload
    resp_bytes, err := ioutil.ReadAll(resp.Body)
    var respBody ResponsePayload
    respBytes, err := ioutil.ReadAll(resp.Body)
    checkError(err)
    err = json.Unmarshal(respBytes, &respBody)
    checkError(err)
    json.Unmarshal(resp_bytes, &resp_body)
    log.Println("POST Request Made")
    log.Printf("%s workflow created.\n", resp_body.WorkflowName)
    log.Println("Workflow Request Id: ", resp_body.WorkflowRequestId)
    log.Printf("\nTo check ingestion status, run: seci_ingestion_status -id %s", strconv.Itoa(resp_body.WorkflowRequestId))
    log.Printf("%s workflow created.\n", respBody.WorkflowName)
    log.Println("Workflow Request Id: ", respBody.WorkflowRequestId)
    log.Printf("\nTo check ingestion status, run: seci_ingestion_status -id %s", strconv.Itoa(respBody.WorkflowRequestId))
    }
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or to comment