
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Target project: ssa/workspaces
Commits on Source (23)
Showing 530 additions and 275 deletions
@@ -12,7 +12,7 @@ repos:
rev: 22.3.0
hooks:
- id: black
args: ["--line-length", "120", "--target-version", "py38"]
args: ["--line-length", "120", "--target-version", "py310"]
# Commented out until static analysis report is generated
# Pylint https://pylint.pycqa.org/en/latest/
......
@@ -50,7 +50,7 @@
<br />
<div>
<app-filter-menu *ngIf="showFilterMenu" [state]="statesToFilter" [exec_status]="execStatusToFilter" [downloadStatus]="downloadStatusToFilter" [stage1QaStaff]="qaStaff['Stage 1']" [stage2QaStaff]="qaStaff['Stage 2']" [srdpStatus]="srdpOptions" [filters]="filters" (filterMenuEventEmitter)="emitFilterEvent($event)"></app-filter-menu>
<app-filter-menu *ngIf="showFilterMenu" [state]="statesToFilter" [exec_status]="execStatusToFilter" [download_status]="downloadStatusToFilter" [stage1QaStaff]="qaStaff['Stage 1']" [stage2QaStaff]="qaStaff['Stage 2']" [srdpStatus]="srdpOptions" [filters]="filters" (filterMenuEventEmitter)="emitFilterEvent($event)"></app-filter-menu>
<mat-paginator #requestPaginator
[length]="(sortedActiveRequests$ | async)?.length"
[pageSize]="pageSize"
@@ -86,7 +86,12 @@
<span><i class="text-dark small fas fa-filter"></i></span>
</button>
</th>
<th>NGAS Download Status</th>
<th>
NGAS Download Status
<button class="btn bg-transparent border-0 btn-light btn-sm" (click)="toggleFilterMenu()">
<span><i class="text-dark small fas fa-filter"></i></span>
</button>
</th>
<th>SDM ID</th>
<th>Bands</th>
<th>Array Configuration</th>
......
@@ -123,8 +123,8 @@ export class ActiveCapabilityRequestsComponent implements OnInit, OnDestroy {
{ name: "Complete", filter_val: "Complete" },
];
public downloadStatusToFilter = [
{ name: "No Versions", filter_val: "" },
{ name: "Not Started", filter_val: "Not started" },
{ name: "Unknown", filter_val: "Unknown" },
{ name: "Not Started", filter_val: "Not Started" },
{ name: "In Progress", filter_val: "In Progress" },
{ name: "Complete", filter_val: "Complete" },
];
@@ -709,7 +709,7 @@ export class ActiveCapabilityRequestsComponent implements OnInit, OnDestroy {
getFetchStatus(request: CapabilityRequest): string {
// figure out the latest version
if (request.versions.length == 0)
return "Not started";
return "Not Started";
let version = request.versions[request.versions.length - 1];
@@ -718,6 +718,6 @@ export class ActiveCapabilityRequestsComponent implements OnInit, OnDestroy {
else if (version.workflow_metadata && version.workflow_metadata.fetch_start_time)
return "In Progress";
else
return "";
return "Unknown";
}
}
@@ -18,7 +18,7 @@
<div class="bg-light p-2">
<p>NGAS Download Status</p>
<div *ngFor="let s of downloadStatus" class="form-check">
<div *ngFor="let s of download_status" class="form-check">
<input class="form-check-input" type="checkbox" [id]="s.name" [value]="s.filter_val" (change)="addFilter(s.filter_val, 'download_status', $event.target.checked)" [checked]="s.isChecked" />
<label class="form-check-label" [for]="s.name">{{s.name}}</label>
</div>
......
@@ -35,7 +35,7 @@ export interface Filter {
export class FilterMenuComponent implements OnInit {
@Input() state: any;
@Input() exec_status: any;
@Input() downloadStatus: any;
@Input() download_status: any;
@Input() stage1QaStaff: any;
@Input() stage2QaStaff: any;
@Input() srdpStatus: any;
......
/*
* Copyright (C) 2023 Associated Universities, Inc. Washington DC, USA.
*
* This file is part of NRAO Workspaces.
*
* Workspaces is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Workspaces is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
*
*/
import {Component, Input, OnInit} from '@angular/core';
import {CapabilityVersion} from "../../../../model/capability-version";
import {Capability} from "../../../../model/capability";
@@ -16,6 +35,7 @@ export class QaControlsComponent implements OnInit {
@Input() public capabilityRequest: CapabilityRequest;
@Input() public currentVersion: CapabilityVersion;
public defaultCC: string;
public emailParams: any;
constructor(
private capabilitiesService: CapabilitiesService,
@@ -24,6 +44,7 @@
ngOnInit(): void {
this.loadDefaultCC();
this.loadEmailParams();
}
@@ -183,11 +204,39 @@ export class QaControlsComponent implements OnInit {
return this.capabilitiesService.getAnalystEmail().subscribe(getAnalystEmailObserver);
}
public getEmailParams() {
return {
"destination_email": this.currentVersion.parameters.user_email,
"version": this.currentVersion,
"workflow_metadata": this.currentVersion.workflow_metadata
};
}
public loadEmailParams() {
const getEmailObserver = {
next: (response) => {
if (response.resp) {
this.emailParams = {
"destination_email": response.resp,
"version": this.currentVersion,
"workflow_metadata": this.currentVersion.workflow_metadata
};
} else {
this.emailParams = {
"destination_email": null,
"version": this.currentVersion,
"workflow_metadata": this.currentVersion.workflow_metadata
};
}
},
error: (error) => {
console.error("Failed to load destination email:", error);
this.emailParams = {
"destination_email": null,
"version": this.currentVersion,
"workflow_metadata": this.currentVersion.workflow_metadata
};
}
};
return this.capabilityRequestService
.getUserEmail(this.capabilityRequest.id, this.currentVersion.version_number)
.subscribe(getEmailObserver);
}
public getEmailParams() {
return this.emailParams;
}
}
@@ -269,7 +269,6 @@ export class CapabilityRequestService {
/**
* Toggles SRDP compatibility between true and false
* @param request_id Given request ID
* @param version_id Given version ID
* @param isAlreadySRDP The current status of SRDP compatibility
*/
public toggleSRDPCompatibility(request_id: string, isAlreadySRDP: boolean): Observable<any> {
@@ -278,6 +277,17 @@
return this.httpClient.post<any>(url, JSON.stringify(toggleSRDP));
}
/**
* Get the user email information for a capability version
* @param request_id Given request ID
* @param version_id Given version ID
* @return A JSON response with the email information or an error
*/
public getUserEmail(request_id: string, version_id: number): Observable<JsonObject> {
const url = `capability/request/${request_id}/version/${version_id}/user_email`;
return this.httpClient.get<JsonObject>(url);
}
/**
* Assigns QA Stage 1 and 2 reviewers to a capability request
* @param requestId Given request ID, string
......
@@ -222,6 +222,12 @@ def capability_version_routes(config: Configurator):
pattern="capability/request/{capability_request_id}/version/{version_id}/workflow_metadata",
request_method="POST",
)
# Get email information for a capability version
config.add_route(
name="get_user_email",
pattern="capability/request/{capability_request_id}/version/{version_id}/user_email",
request_method="GET",
)
def capability_execution_routes(config: Configurator):
......
@@ -23,6 +23,7 @@ import copy
import http
import json
import logging
import subprocess
import requests
from pycapo import CapoConfig
@@ -33,6 +34,8 @@ from pyramid.view import view_config
logger = logging.getLogger(__name__)
NOTIF_SETTINGS_KEY = "edu.nrao.workspaces.NotificationSettings"
from workspaces.capability.enums import CapabilityVersionState
@@ -371,3 +374,73 @@ def edit_version_metadata(request: Request) -> Response:
else:
no_versions_msg = f"Capability request {capability_request_id} version {version_id} not found."
return HTTPNotFound(detail=no_versions_msg)
@view_config(route_name="get_user_email", renderer="json")
def get_user_email(request: Request) -> Response:
"""
Pyramid view that retrieves the email contact information for a capability request version. Resolving the
address server-side prevents empty user_email parameters from making QA controls fail in the UI.
URL: capability/request/{capability_request_id}/version/{version_id}/user_email
:param request: GET request
:return: 200 OK response with JSON-formatted user_email field
or 404 response (HTTPNotFound) if the email information couldn't be found
"""
capability_request_id = request.matchdict["capability_request_id"]
version_id = request.matchdict["version_id"]
if version := request.capability_info.lookup_version(capability_request_id, version_id):
if not version.parameters:
not_found_msg = f"No parameters found for version {version_id} of request with ID {capability_request_id}. Couldn't find user_email information."
return HTTPNotFound(detail=not_found_msg)
elif "user_email" in version.parameters and version.parameters["user_email"]:
return Response(status_int=http.HTTPStatus.OK, json_body={"resp": f"{version.parameters['user_email']}"})
# If user_email is not populated, wrest the PI/CoI emails via aat_wrest and contacts_wrest
elif "product_locator" in version.parameters and version.parameters["product_locator"]:
process_call = ["aat_wrest", "-sc", version.parameters["product_locator"]]
elif "sdmId" in version.parameters and version.parameters["sdmId"]:
process_call = ["aat_wrest", "-si", version.parameters["sdmId"]]
elif (
version.parameters.metadata
and "sdm_id" in version.parameters.metadata
and version.parameters.metadata["sdm_id"]
):
process_call = ["aat_wrest", "-si", version.parameters.metadata["sdm_id"]]
else:
not_found_msg = f"No user email, product locator, or sdm ID found in version {version_id} of request with ID {capability_request_id}. Cannot find user_email information."
return HTTPNotFound(detail=not_found_msg)
else:
not_found_msg = f"Capability request with ID {capability_request_id} not found."
return HTTPNotFound(detail=not_found_msg)
with subprocess.Popen(
process_call,
stdout=subprocess.PIPE,
) as project_data:
data = project_data.communicate()[0]
project = json.loads(data)
project_code = project["projectCode"]
with subprocess.Popen(
["contacts_wrest", project_code],
stdout=subprocess.PIPE,
) as wrested_contacts:
contacts = wrested_contacts.communicate()[0]
if "ERROR" in contacts.decode():
err_msg = f"Contact wrester failed for project {project_code}"
return HTTPPreconditionFailed(err_msg)
email_list = json.loads(contacts.decode())
user_email = email_list
if isinstance(email_list, list):
user_email = ", ".join(email_list)
logger.info(f"get_user_email endpoint retrieved: {user_email}")
# We only send emails to the PI/CoI in production!
email_pi = CapoConfig().getboolean("edu.nrao.workspaces.NotificationSettings.sendPIEmail")
if not email_pi:
user_email = CapoConfig().settings(NOTIF_SETTINGS_KEY).analystEmail
return Response(status_int=http.HTTPStatus.OK, json_body={"resp": f"{user_email}"})
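A minimal sketch (not part of the diff) of how a client could exercise this new endpoint; the host, port, and request ID below are hypothetical:

import requests

# GET capability/request/{capability_request_id}/version/{version_id}/user_email
resp = requests.get("http://localhost:3457/capability/request/CRQ-123/version/1/user_email")
if resp.ok:
    # {"resp": "..."} carries the PI/CoI addresses, or the analyst address outside production
    print(resp.json()["resp"])
else:
    print(f"user_email lookup failed: {resp.status_code}")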
@@ -44,6 +44,7 @@ from workspaces.workflow.services.workflow_info import WorkflowInfo
from workspaces.workflow.services.workflow_service import (
WorkflowMessageHandler,
WorkflowService,
subprocess,
)
logger = logging.getLogger(__name__)
@@ -151,9 +152,7 @@ class WorkflowWorkingDirRestService:
:return: None
"""
requested_workflow = self.request.info.lookup_workflow_request(
self.request.matchdict["request_id"]
)
requested_workflow = self.request.info.lookup_workflow_request(self.request.matchdict["request_id"])
results_path = requested_workflow.results_dir
parent_paths = [requested_workflow.results_dir]
@@ -171,15 +170,11 @@
@view_config(request_method="GET", route_name="serve_weblog", renderer="json")
def serve_weblog(self):
requested_workflow = self.request.info.lookup_workflow_request(
self.request.matchdict["request_id"]
)
requested_workflow = self.request.info.lookup_workflow_request(self.request.matchdict["request_id"])
results_path = Path(requested_workflow.results_dir)
index_path_list = list(results_path.glob("products/pipeline-*/html/index.html"))
failed_weblog_path_list = list(
results_path.glob("working/pipeline-*/html/index.html")
)
failed_weblog_path_list = list(results_path.glob("working/pipeline-*/html/index.html"))
if index_path_list and len(index_path_list) == 1:
index_path = index_path_list[0]
elif failed_weblog_path_list and len(failed_weblog_path_list) == 1:
@@ -199,14 +194,10 @@ class WorkflowWorkingDirRestService:
@view_config(request_method="GET", route_name="get_qa_notes", renderer="json")
def get_qa_notes(self):
requested_workflow = self.request.info.lookup_workflow_request(
self.request.matchdict["request_id"]
)
requested_workflow = self.request.info.lookup_workflow_request(self.request.matchdict["request_id"])
results_path = Path(requested_workflow.results_dir)
qa_notes_list = list(
results_path.glob("products/pipeline-*/html/qa_notes.html")
)
qa_notes_list = list(results_path.glob("products/pipeline-*/html/qa_notes.html"))
if qa_notes_list and len(qa_notes_list) == 1:
qa_notes_path = qa_notes_list[0]
else:
@@ -221,20 +212,14 @@ class WorkflowWorkingDirRestService:
with open(qa_notes_path, "r") as qa_notes:
qa_notes_text = qa_notes.read()
return Response(
status_int=http.HTTPStatus.OK, json_body={"resp": f"{qa_notes_text}"}
)
return Response(status_int=http.HTTPStatus.OK, json_body={"resp": f"{qa_notes_text}"})
@view_config(request_method="POST", route_name="get_qa_notes", renderer="json")
def save_qa_notes(self):
requested_workflow = self.request.info.lookup_workflow_request(
self.request.matchdict["request_id"]
)
requested_workflow = self.request.info.lookup_workflow_request(self.request.matchdict["request_id"])
results_path = Path(requested_workflow.results_dir)
qa_notes_list = list(
results_path.glob("products/pipeline-*/html/qa_notes.html")
)
qa_notes_list = list(results_path.glob("products/pipeline-*/html/qa_notes.html"))
if qa_notes_list and len(qa_notes_list) == 1:
qa_notes_path = qa_notes_list[0]
else:
@@ -249,21 +234,15 @@ class WorkflowWorkingDirRestService:
# sanitize input before writing/persisting
# \\u0000 is an invalid character that is incompatible with postgres json columns
# from StackOverflow: https://stackoverflow.com/questions/63092267/how-to-handle-api-responsesjson-containing-x00-or-u0000-in-its-data-and-s
edits = json.loads(json.dumps(self.request.json_body["edits"])).replace(
"\\u0000", ""
)
edits = json.loads(json.dumps(self.request.json_body["edits"])).replace("\\u0000", "")
qa_notes.write(edits)
return Response(
status_int=http.HTTPStatus.OK,
json_body={
"resp": f"Edits made to QA notes file in workflow {self.request.matchdict['request_id']}."
},
json_body={"resp": f"Edits made to QA notes file in workflow {self.request.matchdict['request_id']}."},
)
@view_config(
request_method="GET", route_name="serve_carta_wrapper", renderer="json"
)
@view_config(request_method="GET", route_name="serve_carta_wrapper", renderer="json")
def serve_carta_wrapper(self):
"""
Dish up some HTML containing the CARTA URL in a frame.
@@ -271,13 +250,9 @@ class WorkflowWorkingDirRestService:
:return:
"""
path = Path(
f"/lustre/aoc/cluster/pipeline/docker/workspaces/html/{self.request.matchdict['request_id']}"
)
path = Path(f"/lustre/aoc/cluster/pipeline/docker/workspaces/html/{self.request.matchdict['request_id']}")
carta_html_file = list(path.iterdir())[0]
return FileResponse(
carta_html_file, request=self.request, content_type="text/html"
)
return FileResponse(carta_html_file, request=self.request, content_type="text/html")
@view_config(route_name="get_healthcheck", renderer="json")
def get_healthcheck(self) -> Response:
@@ -288,9 +263,7 @@ class WorkflowWorkingDirRestService:
"""
return Response(
status=http.HTTPStatus.OK,
json_body={
"healthcheck": f"Workflow service returned {http.HTTPStatus.OK}"
},
json_body={"healthcheck": f"Workflow service returned {http.HTTPStatus.OK}"},
)
def generate_working_directory_dict(self, results_path, parent_paths) -> dict:
@@ -317,13 +290,7 @@ class WorkflowWorkingDirRestService:
# check if url needs a slash to divide paths
divider = ("/", "")[self.request.current_route_url().endswith("/")]
content_key.update(
{
key.name: {
"url": self.request.current_route_url() + divider + key.name
}
}
)
content_key.update({key.name: {"url": self.request.current_route_url() + divider + key.name}})
# add full path for content
content_key[key.name].update({"full_path": key.absolute().__str__()})
@@ -339,9 +306,7 @@ class WorkflowWorkingDirRestService:
# if it is a directory, create a json object
workdir_json = json.dumps(workdir_dict, indent=2)
# create response with the json object as the body
response = Response(
body=workdir_json, request=self.request, content_type="text/json"
)
response = Response(body=workdir_json, request=self.request, content_type="text/json")
else:
# if it is not a directory, serve the static file
response = FileResponse(
@@ -352,15 +317,11 @@ class WorkflowWorkingDirRestService:
def generate_url_from_path(self, root_path, results_path):
current_url = self.request.current_route_url()
return current_url.replace("/weblog", "/dir") + root_path.replace(
results_path, ""
)
return current_url.replace("/weblog", "/dir") + root_path.replace(results_path, "")
def generate_qa_notes_path(self, root_path, results_path):
current_url = self.request.current_route_url()
return current_url.replace("/qa_notes", "/dir") + root_path.replace(
results_path, ""
)
return current_url.replace("/qa_notes", "/dir") + root_path.replace(results_path, "")
@view_defaults(route_name="workflow_request", renderer="json")
@@ -392,9 +353,7 @@ class WorkflowRequestRestService:
# Most common case: Empty body for simple requests, continue with an empty dict
argument_body = {}
return self.request.info.create_workflow_request(
self.request.context.workflow_name, argument_body
)
return self.request.info.create_workflow_request(self.request.context.workflow_name, argument_body)
@view_config(request_method="POST", route_name="submit_workflow_request")
def submit_workflow(self):
@@ -434,11 +393,7 @@ class WorkflowRequestRestService:
file = lookup_file(request=self.request)
# 2. create ingestion workflow request
ingest_type = (
"ingest_cal"
if "calibration" in self.request.matchdict["name"]
else "ingest_image"
)
ingest_type = "ingest_cal" if "calibration" in self.request.matchdict["name"] else "ingest_image"
ingest_request = self.request.info.create_workflow_request(
workflow=ingest_type,
argument={"parent_wf_request_id": self.request.matchdict["request_id"]},
@@ -464,9 +419,7 @@ class WorkflowRequestRestService:
argument=self.request.json_body,
)
self.request.workflows.execute(ingestion_request)
return Response(
status_code=http.HTTPStatus.OK, json_body=ingestion_request.__json__()
)
return Response(status_code=http.HTTPStatus.OK, json_body=ingestion_request.__json__())
@view_config(request_method="POST", route_name="abort_workflow_request")
def abort(self):
@@ -547,16 +500,10 @@ class WorkflowRequestRestService:
"""
body = self.request.json_body
identifier = (
int(body["request_id"]) if "request_id" in body else body["project_code"]
)
identifier = int(body["request_id"]) if "request_id" in body else body["project_code"]
msg_type = self.request.matchdict["msg_type"]
additional = (
body["project_code"]
if "project_code" in body and identifier != body["project_code"]
else None
)
additional = body["project_code"] if "project_code" in body and identifier != body["project_code"] else None
self.request.workflows.message_archive(identifier, msg_type, additional)
return Response(
@@ -591,22 +538,16 @@ class WorkflowRequestRestService:
request_id = self.request.matchdict["request_id"]
self.request.workflows.send_forced_fail(request_id)
@view_config(
request_method="GET", route_name="workflow_request_htcondor_id", renderer="json"
)
@view_config(request_method="GET", route_name="workflow_request_htcondor_id", renderer="json")
def get_request_htcondor_id(self):
"""
Pyramid view that gives back the HTCondor job ID for a given workflow request
:return: HTTP response with HTCondor job ID in the body
"""
requested_workflow = self.request.info.lookup_workflow_request(
self.request.matchdict["request_id"]
)
requested_workflow = self.request.info.lookup_workflow_request(self.request.matchdict["request_id"])
return Response(
json_body={"htcondor_job_id": str(requested_workflow.htcondor_job_id)}
)
return Response(json_body={"htcondor_job_id": str(requested_workflow.htcondor_job_id)})
@view_config(request_method="GET", route_name="list_stale_requests")
def get_stale_requests(self):
@@ -752,18 +693,16 @@ def prometheus_route_timing_factory(handler, registry):
# if timing support is enabled, return a wrapper
global http_requests
# Prometheus logging
http_requests = prometheus_client.Summary(
"http_request_timing", "HTTP Requests", ["status_code", "route_name"]
)
http_requests = prometheus_client.Summary("http_request_timing", "HTTP Requests", ["status_code", "route_name"])
def prometheus_route_timer(request):
start = time.time()
response = handler(request)
end = time.time()
if request.matched_route:
http_requests.labels(
status_code=response.status_code, route_name=request.matched_route.name
).observe(end - start)
http_requests.labels(status_code=response.status_code, route_name=request.matched_route.name).observe(
end - start
)
return response
return prometheus_route_timer
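For reference, a tiny standalone illustration of the labeled-Summary pattern the factory above relies on (sample values are made up):

import prometheus_client

# One Summary, labeled by status code and route, observed once per request
http_requests = prometheus_client.Summary("http_request_timing", "HTTP Requests", ["status_code", "route_name"])
http_requests.labels(status_code=200, route_name="get_user_email").observe(0.042)
print(prometheus_client.generate_latest().decode())  # exposition-format dump including the sample above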
@@ -795,9 +734,7 @@ def main(global_config, **settings):
# we need to build a workflow_info here for the message handler, but
# we won't use it anywhere else, we will make new ones per-request
workflow_info = WorkflowInfo(
get_tm_session(session_factory, transaction.manager)
)
workflow_info = WorkflowInfo(get_tm_session(session_factory, transaction.manager))
message_handler = WorkflowMessageHandler(workflow_info)
workflow_recover = MonitorRecover(workflow_info)
......
@@ -62,7 +62,7 @@ metadata_json = """{
def upgrade():
op.execute(
f"""
INSERT INTO workflows (workflow_name) VALUES (E'{wf_name}')
INSERT INTO workflows (workflow_name, requires_lustre) VALUES (E'{wf_name}', true)
"""
)
......
"""mark4_import workflow
Revision ID: 61cbcd1d83f7
Revises: 91091612b6d0
Create Date: 2023-09-27 09:46:03.860605
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '61cbcd1d83f7'
down_revision = '91091612b6d0'
branch_labels = None
depends_on = None
wf_name = "mk_four_import"
mk_four_import_condor = """executable = mk_four_import.sh
arguments = {{data_src}}
output = import.out
error = import.err
log = condor.log
BIN_PATH = $ENV(HOME)/workflows/$ENV(CAPO_PROFILE)/bin
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
SPOOL_DIR = {{spool_dir}}
should_transfer_files = yes
transfer_input_files = $ENV(HOME)/.ssh/condor_transfer, nraorsync://$(SBIN_PATH)/pycapo, $(BIN_PATH)/mark4_import
+WantIOProxy = True
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
mk_four_import_sh = """#!/bin/sh
set -o errexit
./mark4_import
"""
def upgrade():
op.execute(
f"""
INSERT INTO workflows (workflow_name) VALUES (E'{wf_name}')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('mk_four_import.condor', E'{mk_four_import_condor}', E'{wf_name}')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('mk_four_import.sh', E'{mk_four_import_sh}', E'{wf_name}')
"""
)
def downgrade():
op.execute(
f"""
DELETE FROM workflow_templates WHERE workflow_name = E'{wf_name}'
"""
)
op.execute(
f"""
DELETE FROM workflows WHERE workflow_name = E'{wf_name}'
"""
)
"""mark4 workflow renaming
Revision ID: 91091612b6d0
Revises: d20ceed949b3
Create Date: 2023-09-22 13:34:54.054114
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '91091612b6d0'
down_revision = 'd20ceed949b3'
branch_labels = None
depends_on = None
old_ingest_mk_four_obs_sh = """#!/bin/sh
set -o errexit
# Get NGAS hosts and set up variables to randomly select one
NGASHOSTSTR=$(./pycapo -q archive-ingestion.NGASHosts)
NGASHOSTARR=(`/bin/echo ${NGASHOSTSTR}`) # Put the space-delimited host list into an array
NGASHOSTLEN=${#NGASHOSTARR[@]}
# Copy from the difx area to the Workspaces staging area
WSSTAGINGDIR=$(./pycapo -q edu.nrao.workspaces.IngestionSettings.stagingDirectory)
/bin/cp -r $1 $WSSTAGINGDIR
OBSDIR=$(/bin/basename $1)
/bin/chmod -R 750 $WSSTAGINGDIR/$OBSDIR # Make sure NGAS has permissions to ingest the files
cd $WSSTAGINGDIR/$OBSDIR
for FILE in *; do
# Pick random NGAS host to distribute the ingestion load
NGASINDEX=$(($RANDOM % $NGASHOSTLEN))
NGASHOST=${NGASHOSTARR[$NGASINDEX]}
FULLPATH=$(/bin/readlink -f $FILE)
NGASCMD="${NGASHOST}ARCHIVE?filename=file://${FULLPATH}"
INGESTOUT=$(/bin/curl $NGASCMD)
if echo $INGESTOUT | grep -i "error"; then
echo "Failed to ingest ${FILE}"
exit 1
fi
done"""
# Workflow will rename the directory to indicate the ingestion status
new_ingest_mk_four_obs_sh = """#!/bin/sh
set -o errexit
ingested=$1.ingested
failed=$1.failed
# Get NGAS hosts and set up variables to randomly select one
NGASHOSTSTR=$(./pycapo -q archive-ingestion.NGASHosts)
NGASHOSTARR=(`/bin/echo ${NGASHOSTSTR}`) # Put the space-delimited host list into an array
NGASHOSTLEN=${#NGASHOSTARR[@]}
# Copy from the difx area to the Workspaces staging area
WSSTAGINGDIR=$(./pycapo -q edu.nrao.workspaces.IngestionSettings.stagingDirectory)
/bin/cp -r $1 $WSSTAGINGDIR
OBSDIR=$(/bin/basename $1)
/bin/chmod -R 750 $WSSTAGINGDIR/$OBSDIR # Make sure NGAS has permissions to ingest the files
cd $WSSTAGINGDIR/$OBSDIR
for FILE in *; do
# Pick random NGAS host to distribute the ingestion load
NGASINDEX=$(($RANDOM % $NGASHOSTLEN))
NGASHOST=${NGASHOSTARR[$NGASINDEX]}
FULLPATH=$(/bin/readlink -f $FILE)
NGASCMD="${NGASHOST}ARCHIVE?filename=file://${FULLPATH}"
INGESTOUT=$(/bin/curl $NGASCMD)
if echo $INGESTOUT | grep -i "error"; then
echo "Failed to ingest ${FILE}"
mv $1 $failed
exit 1
fi
done
mv $1 $ingested
"""
def upgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{new_ingest_mk_four_obs_sh}'
WHERE filename='ingest_mk_four_obs.sh'
"""
)
def downgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_ingest_mk_four_obs_sh}'
WHERE filename='ingest_mk_four_obs.sh'
"""
)
"""make sure qa fail custom text is in srdp and non srdp sections
Revision ID: d20ceed949b3
Revises: 569416c40ca8
Create Date: 2023-09-29 12:27:27.449396
"""
from pathlib import Path
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision = "d20ceed949b3"
down_revision = "569416c40ca8"
branch_labels = None
depends_on = None
template_name = "std_calibration_complete"
old_content = (Path.cwd() / "versions" / "templates" / "emails" / "std_calibration_complete_newer.txt").read_text()
new_content = (
Path.cwd() / "versions" / "templates" / "emails" / "std_calibration_complete_non_srdp_custom_text.txt"
).read_text()
def upgrade():
op.execute(
f"""
UPDATE notification_templates
SET template=E'{new_content}'
WHERE name=E'{template_name}'
"""
)
def downgrade():
op.execute(
f"""
UPDATE notification_templates
SET template=E'{old_content}'
WHERE name=E'{template_name}'
"""
)
{{#version}}{{#parameters}}{{#metadata}}{{#is_srdp}}Subject: Calibration complete for {{sdm_id}}
Hello,
One of your Scheduling Blocks,
{{sdm_id}}, observed on {{obs_end_time}}, {{bands}}, <filesize>
has been processed through the VLA CASA Calibration Pipeline using CASA 6.4.1, which is designed to handle Stokes I continuum data, and is now available should you wish to access the calibrated data.
These results have been checked by NRAO staff and notes about the quality assurance are below.
{{#custom_text}}{{custom_text}}{{/custom_text}}{{^custom_text}}The results of this data processing request passed our quality assurance, with the following notes:{{/custom_text}}
The following notes are also in the weblog:
{{#workflow_metadata}}{{#qa}}{{qa}}{{/qa}}{{^qa}}No QA notes available.{{/qa}}{{/workflow_metadata}}{{^workflow_metadata}}No QA notes available.{{/workflow_metadata}}
Accessing Pipeline Products:
There are two main pipeline products you may be interested in: the calibrated Measurement Set (MS) or the smaller pipeline product tar file that includes the weblog, final calibration and flag tables, a restore script, and other small files related to the calibration.
For the entire Calibrated MS and pipeline tar file: you will need to request a calibrated MS (CMS) from our new archive access tool at https://data.nrao.edu . To access the pipeline products, please follow these steps:
Click the "Log in" link at the top right of this page and select the NRAO login, which should take you to a page where you will enter your my.nrao.edu login information.
Once logged in, you should be able to access your projects, both public and proprietary by selecting <yourusername>\'s data: navigate to the desired project and click the "+" symbol at the left to expand the list of available SBs.
Find the specific observation you want and if you would like to recreate the pipeline calibration, click the "Add to clipboard" button to the left of that observation.
Click the "Download" button at the top of the project list.
In the pop-up window that opens, leave the default options the same: this should have the "Choose download data format:" option set to "Calibrated Measurement Set", and the "Restore previous CMS" option should be filled in with a tar file named something like project-code_YYYY_MM_DD_THH_MM_SS.SSS.tar. Note that these two options are unavailable when more than one archive file is added to the clipboard. If the calibrated measurement set is needed for multiple scheduling blocks, they must be downloaded one at a time. Since this calibration was done with CASA 6.2.1-7, leave the "CASA Version:" drop-down menu at this version. Now click the "Submit Request" button.
Once ready, you should receive an email with download instructions. This restoration of the calibrated MS may take several hours or longer, depending on the specifics of your observation.
=====
For just the pipeline tar file (includes the weblog, final flag versions, final calibration tables, restore script, and other related small files)
Click the "Log in" link at the top right of this page and select the NRAO login, which should take you to a page where you will enter your my.nrao.edu login information.
Once logged in, you should be able to access your projects, both public and proprietary: navigate to the desired project and click the "+" symbol at the left to expand the list of available SBs.
Find the specific observation you want and if you would like to recreate the pipeline calibration, click the icon in the "Cals" column near the right side of the returned list, which should bring up a pop-up window to download a tar file named something like project-code_YYYY_MM_DD_THH_MM_SS.SSS.tar, and click "Submit Request". Once ready, you should receive an email with download instructions.
____________
For more information about the pipeline, including instructions for rerunning the pipeline, applying pipeline calibration to raw data, or modification to suit your particular science goals, or access to the scripted pipeline please visit our pipeline web page: https://science.nrao.edu/facilities/vla/data-processing/pipeline
For more information about the SRDP project, please see https://science.nrao.edu/srdp
Please let us know if you have any questions or concerns through the NRAO Helpdesk (https://help.nrao.edu/), using the VLA Pipeline department for questions about the pipeline processing, the VLA/VLBA Archive and Data Retrieval department for questions about data retrieval, and the VLA Data Products department for questions about quality assurance and the use of Science-Ready (SRDP) products.{{/is_srdp}}{{^is_srdp}}Subject: Calibration complete for {{sdm_id}}
Hello,
One of your Scheduling Blocks has been processed through the VLA CASA Calibration Pipeline using CASA 6.4.1, which is designed to handle Stokes I continuum data, and is now available should you wish to access the calibrated data.
{{sdm_id}}, observed on {{obs_end_time}}, {{bands}}, <filesize>
These results have been checked by NRAO staff and notes about the quality assurance are below.
{{#custom_text}}{{custom_text}}{{/custom_text}}{{^custom_text}}The results of this data processing request passed our quality assurance, with the following notes:{{/custom_text}}
The following notes are also in the weblog:
{{#workflow_metadata}}{{#qa}}{{qa}}{{/qa}}{{^qa}}No QA notes available.{{/qa}}{{/workflow_metadata}}{{^workflow_metadata}}No QA notes available.{{/workflow_metadata}}
These products are not included in NRAO\'s Science Ready Data Products (SRDP) program but have been checked by NRAO staff for general quality.
Some data may need further flagging before imaging: please check your data and the target calibration carefully before using the output from the pipeline for science!
If your observations used a resolved calibrator source that does not have standard model images in CASA, the pipeline calibration will not be accurate. In such instances, re-calibration using UV limits, or imaging the calibrator first and using the resulting image model, will be needed.
If your science involves spectral lines, you should be aware of the following:
1) The pipeline applies Hanning-smoothing by default, which may make the calibrated data set not optimal for certain spectral-line science.
2) During the calibration process, several edge channels in each sub-band get flagged by default because they are noisier. Therefore, breaks in the frequency span get introduced in the pipeline calibrated data, which in turn may make the output not suitable for certain spectral-line science.
3) The pipeline runs an RFI flagging algorithm which should flag strong lines and may remove spectral lines of interest to your science.
Accessing Pipeline Products:
There are two main pipeline products you may be interested in: the calibrated Measurement Set (MS) or the smaller pipeline product tar file that includes the weblog, final calibration and flag tables, a restore script, and other small files related to the calibration.
For the entire Calibrated MS and pipeline tar file: you will need to request a calibrated MS (CMS) from our new archive access tool at https://data.nrao.edu . To access the pipeline products, please follow these steps:
Click the "Log in" link at the top right of this page and select the NRAO login, which should take you to a page where you will enter your my.nrao.edu login information.
Once logged in, you should be able to access your projects, both public and proprietary by selecting <yourusername>\'s data: navigate to the desired project and click the "+" symbol at the left to expand the list of available SBs.
Find the specific observation you want and if you would like to recreate the pipeline calibration, click the "Add to clipboard" button to the left of that observation.
Click the "Download" button at the top of the project list.
In the pop-up window that opens, leave the default options the same: this should have the "Choose download data format:" option set to "Calibrated Measurement Set", and the "Restore previous CMS" option should be filled in with a tar file named something like project-code_YYYY_MM_DD_THH_MM_SS.SSS.tar. Note that these two options are unavailable when more than one archive file is added to the clipboard. If the calibrated measurement set is needed for multiple scheduling blocks, they must be downloaded one at a time. Since this calibration was done with CASA 6.2.1-7, leave the "CASA Version:" drop-down menu at this version. Now click the "Submit Request" button.
Once ready, you should receive an email with download instructions. This restoration of the calibrated MS may take several hours or longer, depending on the specifics of your observation.
=====
For just the pipeline tar file (includes the weblog, final flag versions, final calibration tables, restore script, and other related small files)
Click the "Log in" link at the top right of this page and select the NRAO login, which should take you to a page where you will enter your my.nrao.edu login information.
Once logged in, you should be able to access your projects, both public and proprietary: navigate to the desired project and click the "+" symbol at the left to expand the list of available SBs.
Find the specific observation you want and if you would like to recreate the pipeline calibration, click the icon in the "Cals" column near the right side of the returned list, which should bring up a pop-up window to download a tar file named something like project-code_YYYY_MM_DD_THH_MM_SS.SSS.tar, and click "Submit Request". Once ready, you should receive an email with download instructions.
More Information:
For more information about the pipeline, including access to the scripted pipeline, instructions for rerunning the pipeline, applying pipeline calibration to raw data, or modification to suit your particular science goals, please go here:
https://science.nrao.edu/facilities/vla/data-processing/pipeline
Please let us know if you have any questions or concerns through the VLA Pipeline department of the Helpdesk at:
https://help.nrao.edu/{{/is_srdp}}{{/metadata}}{{/parameters}}{{/version}}
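A quick way to see the custom_text fallback used throughout these templates, with the chevron Mustache library standing in for whatever renderer Workspaces actually uses (sample data is made up):

import chevron

snippet = (
    "{{#custom_text}}{{custom_text}}{{/custom_text}}"
    "{{^custom_text}}The results of this data processing request passed our quality assurance, with the following notes:{{/custom_text}}"
)
print(chevron.render(snippet, {"custom_text": "Analyst notes here."}))  # custom text wins
print(chevron.render(snippet, {}))  # falls back to the stock QA sentence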
---
ref: ws
name: Workspaces Integration
description: Actions and sensors for integrating with the NRAO archive and workspaces systems
keywords:
- nrao
version: 3.7.0
python_versions:
- "3"
dependencies:
- core
author: Daniel Lyons
email: dlyons@nrao.edu
# Optional list of additional contributors to the pack.
contributors:
- "Nathan Bockisch <nbockisc@nrao.edu>"
- "Charlotte Hausman <chausman@nrao.edu>"
- "Jim Sheckard <jsheckar@nrao.edu>"
#
# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
from st2reactor.sensor.base import PollingSensor
class DirectoryWatcher(PollingSensor):
"""
A Sensor that watches a directory for changes.
Works by keeping track, in memory, of which files it has already alerted on (so it will re-alert after a restart).
"""
def setup(self):
self._logger = self._sensor_service.get_logger(__name__)
# We keep a dictionary of all the watchers in here so that we can update/remove them easily
self.triggers = {}
def poll(self):
# If we have no triggers, there's nothing to do; the loop below simply never runs
if not self.triggers:
self._logger.info("DirectoryWatcher: No triggers are configured; nothing to do.")
# If we have triggers, examine the filesystem for each one
for monitor in self.triggers.values():
monitor.check(self._sensor_service)
def cleanup(self):
# This is called when the st2 system goes down. We don't currently need
# to release any resources here
pass
def add_trigger(self, trigger):
# This method is called when someone creates a new trigger based on the ws.new_files trigger.
self._logger.info("DirectoryWatcher: Adding trigger")
directory = trigger["parameters"].get("directory")
self.triggers[directory] = DirectoryMonitor(self._logger, trigger)
def update_trigger(self, trigger):
# This method is called when a trigger is updated. This usually doesn't matter to us.
directory = trigger["parameters"].get("directory")
self.triggers[directory] = DirectoryMonitor(self._logger, trigger)
def remove_trigger(self, trigger):
# This method is called when a trigger is deleted
self._logger.info("DirectoryWatcher: Removing trigger")
directory = trigger["parameters"].get("directory")
del self.triggers[directory]
class DirectoryMonitor:
"""
Helper class. Watches a single directory for changes.
"""
def __init__(self, logger, trigger):
self._logger = logger
self.trigger = trigger
self.path = Path(self.trigger["parameters"].get("directory"))
self.seen = set()
def check(self, sensor_service) -> set[Path] | None:
"""
Check for new files.
:param sensor_service: used to dispatch findings if there are any
:return: the set of newly found files, or None if there were none
"""
files_found = set(file for file in self.path.glob("*")) - self.seen
# do we have any files to report? if not, we can simply return here
# since we haven't triggered, there's no need to update the state value
if not files_found:
self._logger.info(f"DirectoryWatcher: No new files found in {self.path}")
return
# let's log
self._logger.info(f"DirectoryWatcher: {len(files_found)} new files found in {self.path}")
# report what we have found
result = dict(directory=str(self.path), files=[str(file.absolute()) for file in files_found])
sensor_service.dispatch(trigger=self.trigger, payload=result)
# save what we have found
self.seen |= files_found
return files_found
@staticmethod
def latest_change(file: Path) -> int:
"""
Return the last change time for this file, whether that's a ctime or an mtime.
Returned as nanoseconds since the epoch.
:param file: file to test
:return: change timestamp in nanoseconds
"""
stat = file.stat()
return max(stat.st_mtime_ns, stat.st_ctime_ns)
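A minimal local sketch of DirectoryMonitor outside st2, with a stubbed sensor service and a throwaway directory (the trigger dict mirrors the shape add_trigger receives):

import logging
import tempfile
from pathlib import Path

class StubSensorService:
    def dispatch(self, trigger, payload):
        print("dispatch:", payload)

watch_dir = tempfile.mkdtemp()
monitor = DirectoryMonitor(logging.getLogger("demo"), {"parameters": {"directory": watch_dir}})

monitor.check(StubSensorService())    # nothing new yet; logs and returns None
(Path(watch_dir) / "obs1.difx").touch()
monitor.check(StubSensorService())    # dispatches a payload listing obs1.difx
monitor.check(StubSensorService())    # already seen, so nothing is dispatched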
---
class_name: "DirectoryWatcher"
entry_point: "directory_watcher.py"
description: "Watches a directory and emits events whenever the contents of that directory change."
trigger_types:
- name: new_files
description: "New files have arrived in the directory"
parameters_schema:
type: object
properties:
directory:
description: The directory to monitor for changes
type: string
required: true
additionalProperties: false
payload_schema:
type: object
description: A directory and the files that have appeared or changed in that directory since the last check
properties:
directory:
description: "The directory where the files appeared"
type: string
required: true
files:
description: "The list of files that are now in the directory"
type: array
items:
type: string
minItems: 1
uniqueItems: true
required: true