Skip to content
Snippets Groups Projects
Commit aa0e8e47 authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Merge branch 'downloadIndication' into '2.8.2-DEVELOPMENT'

Add the FETCH stage completion state to the workflow_metadata field on the capability version

See merge request !1417
parents 35e2e202 6afa8f38
No related branches found
No related tags found
2 merge requests!1452Merge 2.8.2 to main,!1417Add the FETCH stage completion state to the workflow_metadata field on the capability version
Pipeline #11323 passed
......@@ -86,6 +86,7 @@
<span><i class="text-dark small fas fa-filter"></i></span>
</button>
</th>
<th>NGAS Download Status</th>
<th>SDM ID</th>
<th>Bands</th>
<th>Array Configuration</th>
......@@ -148,6 +149,7 @@
<td [ngClass]="{'stage1-status': getExecutionStatusName(request) === 'Awaiting QA',
'execution-status': getExecutionStatusName(request) === 'Executing',
'stage2-status': getExecutionStatusName(request) === 'Stage 2 Review'}">{{ getExecutionStatusName(request) }}</td>
<td>{{ getFetchStatus(request) }}</td>
<td>{{ getMetadata(request).sdm_id }}</td>
<td>{{ getMetadata(request).bands ? getMetadata(request).bands.split(' ').join(', ') : "" }}</td>
<td>{{ getMetadata(request).array_config }}</td>
......
......@@ -716,4 +716,19 @@ export class ActiveCapabilityRequestsComponent implements OnInit, OnDestroy {
trackActiveRequests(index: number, activeRequest: any) {
return activeRequest.id;
}
getFetchStatus(request: CapabilityRequest): string {
// figure out the latest version
if (request.versions.length == 0)
return "Not started";
let version = request.versions[request.versions.length - 1];
if (version.workflow_metadata && version.workflow_metadata.fetch_end_time)
return "Complete";
else if (version.workflow_metadata && version.workflow_metadata.fetch_start_time)
return "In Progress";
else
return "";
}
}
......@@ -274,6 +274,47 @@ class CapabilityService(CapabilityServiceIF):
"""
logger.debug(f"Received {message['type']} (no action taken): {message}")
@on_message(service="workflow", type="workflow-updated")
def update_fetch_stage(self, **message: Dict):
# In here, we want to track whether or not the fetch start time is complete
# There are going to be various messages from the workflow. Most of the time we won't care about them.
msg_subject = message["subject"]
if msg_subject["type"] == "WorkflowRequest":
# Figure out what stage we're discussing—that will determine whether we care about the log message
parse_log = message["condor_metadata"]["log"].split("Stage", 1)[1].split("has", 1)
stage = parse_log[0].strip()
logger.debug("Got an update for stage %s", stage)
# If it's the FETCH stage, we care
if stage == "FETCH":
logger.debug("Updating the fetch start or stop time")
# Since we care, we should look up the execution for this
workflow_request_id = msg_subject["workflow_request_id"]
execution = self.capability_info.lookup_execution_by_workflow_request_id(workflow_request_id)
version = execution.version
# Pull out the metadata, or make a blank dictionary because we're about to add to it
# sqlalchemy does not detect in-place mutations of JSON
# https://docs.sqlalchemy.org/en/14/core/type_basics.html#sqlalchemy.types.JSON
# to get past this we use a deepcopy of version.workflow_metadata to create a "new" JSON obj
metadata = copy.deepcopy(version.workflow_metadata) if version.workflow_metadata else {}
# Figure out which message "type" this is (Started or Ended)
msg_type = parse_log[1].split("at", 1)[0].strip()
# retrieve the timestamp
timestamp = message["condor_metadata"]["timestamp"]
# record it
metadata[f"fetch_{'start' if msg_type == 'Started' else 'end'}_time"] = timestamp
logger.info("Updating FETCH times in metadata %s on capability version %s", metadata, version)
# update the metadata (this is a no-op unless we just created the dictionary)
version.workflow_metadata = metadata
# save it
self.capability_info.save_version(version)
class CapabilityLauncher:
"""
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment