Skip to content
Snippets Groups Projects
Commit 0199481b authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

setup for directory annihilator

parent aec13a74
No related branches found
No related tags found
1 merge request!844setup for directory annihilator
Pipeline #4611 passed
......@@ -86,3 +86,10 @@ edu.nrao.carta.redisPassword = password
# Sentry.io Settings
edu.nrao.workspaces.SentrySettings.sentry_key = local
# WS Annihilator Settings
edu.nrao.workspaces.AnnihilatorSettings.keepSpoolForDays = 10
edu.nrao.workspaces.AnnihilatorSettings.keepStagingForDays = 10
edu.nrao.workspaces.AnnihilatorSettings.keepStorageForDays = 10
edu.nrao.workspaces.AnnihilatorSettings.keepCacheForDays = 10
......@@ -22,6 +22,7 @@ import json
import logging
import os
from pathlib import Path
from typing import List
import sentry_sdk
import transaction
......@@ -70,8 +71,8 @@ def lookup_workflow(request: Request) -> Workflow:
"""
Look up the workflow definition for this workflow name.
:param request: a workflow request
:return: the workflow info
:param request: an HTTP request
:return: a Workflow object
"""
return request.info.lookup_workflow_definition(request.matchdict["name"])
......@@ -80,8 +81,8 @@ def lookup_request(request: Request) -> WorkflowRequest:
"""
Look up the workflow info for this request
:param request: a workflow request
:return: the workflow info
:param request: an HTTP request
:return: a WorkflowRequest object
"""
return request.info.lookup_workflow_request(request.matchdict["request_id"])
......@@ -90,8 +91,8 @@ def lookup_file(request: Request) -> WorkflowRequestFile:
"""
Get the file info for this request
:param request: a workflow request
:return: JSON blob containing filename and file content for this request, if any
:param request: an HTTP request
:return: a WorkflowRequestFile object containing filename and file content for this request, if any
"""
for file in lookup_request(request).files:
......@@ -491,6 +492,18 @@ class WorkflowRequestRestService:
return Response(json_body={"htcondor_job_id": str(requested_workflow.htcondor_job_id)})
@view_config(request_method="GET", route_name="list_stale_requests")
def get_stale_requests(self):
"""
Pyramid view that returns a list of workflow request processing directories that
have completed processing and are marked for annihilation
:return: HTTP Response with list of directories to annihilate
"""
keep = self.request.matchdict["days"]
return self.request.info.lookup_stale_requests(int(keep))
@view_defaults(route_name="workflow_request_files", renderer="json")
class WorkflowFilesRestService:
......@@ -738,6 +751,11 @@ def main(global_config, **settings):
factory=lookup_request,
)
config.add_route(
"list_stale_requests",
"/workflows/requests/stale/{days}",
)
# Route for healthcheck when bringing up containers
config.add_route(
"get_healthcheck",
......
"""add directory clean up tracking to workflow requests
Revision ID: 0c62d7748b11
Revises: 90a1c99bde38
Create Date: 2022-03-10 14:03:06.004264
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "0c62d7748b11"
down_revision = "90a1c99bde38"
branch_labels = None
depends_on = None
def upgrade():
op.add_column(
"workflow_requests",
sa.Column(
"cleaned",
sa.Boolean,
server_default="False",
nullable=False,
comment="has this processing directory been annihilated?",
),
)
def downgrade():
op.drop_column("workflow_requests", "cleaned")
......@@ -187,6 +187,13 @@ class WorkflowRequest(JSONSerializable):
server_onupdate=sa.func.now(),
nullable=False,
)
cleaned = sa.Column(
"cleaned",
sa.Boolean,
default=False,
comment="has this processing directory been annihilated?",
nullable=False,
)
@property
def created_at_ago(self) -> str:
......@@ -232,6 +239,7 @@ class WorkflowRequest(JSONSerializable):
files=self.files,
created_at=self.created_at.isoformat(),
updated_at=self.updated_at.isoformat(),
cleaned=self.cleaned,
)
@classmethod
......
......@@ -15,6 +15,7 @@
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
import datetime
import logging
from typing import Dict, List, Union
......@@ -53,6 +54,22 @@ class WorkflowInfo(WorkflowInfoIF):
"""
return self.session.query(WorkflowTemplate).filter_by(workflow_name=workflow_name).all()
def lookup_stale_requests(self, keep_days: int) -> List[str]:
"""
Queries the workflow_requests table for all requests that have completed, have not
been cleaned up, and are older that the number of days specified by keep_days
:param keep_days: the number of days to keep directories
:return: a list of strings representing processing directories
"""
return (
self.session.query(WorkflowRequest)
.filter((WorkflowRequest.updated_at + datetime.timedelta(days=keep_days)) < datetime.datetime.now())
.filter_by(cleaned=False)
.all()
)
def all_workflows(self) -> List[Workflow]:
return self.session.query(Workflow).all()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment