Skip to content
Snippets Groups Projects
Commit afca70d8 authored by Daniel Nemergut's avatar Daniel Nemergut
Browse files

Merge branch 'delete_stackstorm_prototype' into 'main'

Delete stackstorm prototype

See merge request !1457
parents 20289da1 63c14135
No related branches found
No related tags found
1 merge request!1457Delete stackstorm prototype
Pipeline #11798 passed
---
ref: ws
name: Workspaces Integration
description: Actions and sensors for integrating with the NRAO archive and workspaces systems
keywords:
- nrao
version: 3.7.0
python_versions:
- "3"
dependencies:
- core
author: Daniel Lyons
email: dlyons@nrao.edu
# Optional list of additional contributors to the pack.
contributors:
- "Nathan Bockisch <nbockisc@nrao.edu>"
- "Charlotte Hausman <chausman@nrao.edu>"
- "Jim Sheckard <jsheckar@nrao.edu>"
#
# Copyright (C) 2021 Associated Universities, Inc. Washington DC, USA.
#
# This file is part of NRAO Workspaces.
#
# Workspaces is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Workspaces is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
from pathlib import Path
from st2reactor.sensor.base import PollingSensor
class DirectoryWatcher(PollingSensor):
"""
A Sensor that watches a directory for changes.
Works by keeping track of what files it has alerted for already, in memory (so will realert on restart).
"""
def setup(self):
self._logger = self._sensor_service.get_logger(__name__)
# We keep a dictionary of all the watchers in here so that we can update/remove them easily
self.triggers = {}
def poll(self):
# If we have no triggers, there's nothing for us to do but return
if not self.triggers:
self._logger.info("DirectoryWatcher: No triggers are configured; nothing to do.")
# If we have triggers, examine the filesystem for each one
for monitor in self.triggers.values():
monitor.check(self._sensor_service)
def cleanup(self):
# This is called when the st2 system goes down. We don't currently need
# to release any resources here
pass
def add_trigger(self, trigger):
# This method is called when someone creates a new trigger based on the ws.new_files trigger.
self._logger.info("DirectoryWatcher: Adding trigger")
directory = trigger["parameters"].get("directory")
self.triggers[directory] = DirectoryMonitor(self._logger, trigger)
def update_trigger(self, trigger):
# This method is called when trigger is updated. This usually doesn't matter to us.
directory = trigger["parameters"].get("directory")
self.triggers[directory] = DirectoryMonitor(self._logger, trigger)
def remove_trigger(self, trigger):
# This method is called when trigger is deleted
self._logger.info("DirectorWatcher: Removing trigger")
directory = trigger["parameters"].get("directory")
del self.triggers[directory]
class DirectoryMonitor:
"""
Helper class. Watches a single directory for changes.
"""
def __init__(self, logger, trigger):
self._logger = logger
self.trigger = trigger
self.path = Path(self.trigger["parameters"].get("directory"))
self.seen = set()
def check(self, sensor_service) -> [Path]:
"""
Check for new files.
:param sensor_service: used to dispatch findings if there are any
:return: nothing
"""
files_found = set(file for file in self.path.glob("*")) - self.seen
# do we have any files to report? if not, we can simply return here
# since we haven't triggered, there's no need to update the state value
if not files_found:
self._logger.info(f"DirectoryWatcher: No new files found in {self.path}")
return
# let's log
self._logger.info(f"DirectoryWatcher: {len(files_found)} new files found in {self.path}")
# report what we have found
result = dict(directory=str(self.path), files=[str(file.absolute()) for file in files_found])
sensor_service.dispatch(trigger=self.trigger, payload=result)
# save what we have found
self.seen |= files_found
return files_found
@staticmethod
def latest_change(file: Path) -> float:
"""
Return the last change time for this file, whether that's a ctime or an mtime.
Return as seconds since the epoch, with nanoseconds.
:param file: file to test
:return: change timestamp as float
"""
stat = file.stat()
return max(stat.st_mtime_ns, stat.st_ctime_ns)
---
class_name: "DirectoryWatcher"
entry_point: "directory_watcher.py"
description: "Watches a directory and emits events whenever the contents of that directory change."
trigger_types:
- name: new_files
description: "New files have arrived in the directory"
parameters_schema:
type: object
properties:
directory:
description: The directory to monitor for changes
type: string
required: true
additionalProperties: false
payload_schema:
type: object
description: A directory and the files that have appeared or changed in that directory since the last check
properties:
directory:
description: "The directory where the files appeared"
type: string
required: true
files:
description: "The list of files that are now in the directory"
type: array
items:
type: string
minItems: 1
uniqueItems: true
required: true
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment