Skip to content
Snippets Groups Projects
Commit 46d7d511 authored by Daniel Lyons's avatar Daniel Lyons Committed by Daniel Lyons
Browse files

A prototype and test of another capability queue concept.

parent 9159a5b1
No related branches found
No related tags found
Loading
Pipeline #2698 passed
import abc
from typing import NamedTuple, List
from workspaces.capability.schema_interfaces import CapabilityExecutionIF
class QueueReport(NamedTuple):
"""
I'm a report about what queues exist, how many executions are waiting in the queue,
how many executions are executing in the queue, and how many concurrent executions
are supported.
"""
name: str
submitted: int
executing: int
concurrency_allowed: int
class QueueReporterIF(abc.ABC):
"""
An interface that defines a few report queries that are needed by the queue manager.
"""
@abc.abstractmethod
def report(self) -> List[QueueReport]:
"""
Retrieve a fresh report about what queues are in what states.
"""
pass
@abc.abstractmethod
def next_waiting_execution(self, queue: str) -> CapabilityExecutionIF:
"""
Retrieve the next execution waiting in the named queue.
:param queue: the queue to check
:return: the highest-priority CapabilityExecution
"""
pass
class ExecutionManagerIF(abc.ABC):
"""
A one-shot interface simply for telling another system to begin executing some execution.
"""
@abc.abstractmethod
def execute(self, execution: CapabilityExecutionIF):
pass
class CapabilityQueueManager:
"""
This is a prototype of a new kind of capability queue manager that does not hold onto any
internal state itself. All of its calculations are performed against a certain database
query, which is re-run whenever something "interesting" happens. The API of the class
is basically to provide a way to send signals when "interesting" things happen, so it
can react by checking to see if we're ready to execute something or not.
"""
def __init__(self, queue_reporter: QueueReporterIF, executor: ExecutionManagerIF):
self.queue_reporter = queue_reporter
self.executor = executor
self.startup()
def execution_requested(self):
self.launch_if_availability()
def execution_complete(self):
self.launch_if_availability()
def launch_if_availability(self):
for queue in self.queue_reporter.report():
# how many jobs do we start? if we have an arbitrarily large number, we can have
# as many as there are allowed by the concurrency limit. But some are probably
# already executing, so we deduct the executing number.
#
# What if we don't have enough jobs to fulfill the demand? That's OK, we won't
# launch more jobs than we have available.
#
# Thus, the minimum of what is still allowed and what is available
for _ in range(min(queue.concurrency_allowed - queue.executing, queue.submitted)):
self.executor.execute(self.queue_reporter.next_waiting_execution(queue.name))
def reconfigure(self):
self.startup()
def startup(self):
self.launch_if_availability()
class FakeQueueReporterExecutionManager(QueueReporterIF, ExecutionManagerIF):
"""
A testing mock that pretends to be both the queue reporter and execution manager.
Our notions of both are pretty limited, basically moving things from one list to
another so that we can get proper reports, but otherwise not doing anything real.
"""
submitted = []
executing = []
completed = []
allowed = 3
def report(self) -> List[QueueReport]:
return [QueueReport("fake", len(self.submitted), len(self.executing), self.allowed)]
def next_waiting_execution(self, queue: str) -> CapabilityExecutionIF:
return next(iter(self.submitted))
def submit(self, execution: CapabilityExecutionIF):
self.submitted.append(execution)
def execute(self, execution: CapabilityExecutionIF):
self.submitted.remove(execution)
self.executing.append(execution)
def complete(self, execution: CapabilityExecutionIF):
self.executing.remove(execution)
self.completed.append(execution)
def test_capability_queue_manager():
# let's start by putting five things into the execution manager
test_reporter_executor = FakeQueueReporterExecutionManager()
for i in range(1, 6):
test_reporter_executor.submit(i)
# now let's make our queue manager and see what the report tells us
queue = CapabilityQueueManager(test_reporter_executor, test_reporter_executor)
# we should now have 3 executing and 2 submitted
assert test_reporter_executor.report()[0].executing == 3
assert test_reporter_executor.report()[0].submitted == 2
# now let's finish a couple of them
test_reporter_executor.complete(1)
queue.execution_complete()
test_reporter_executor.complete(2)
queue.execution_complete()
# we should now have 3 executing and 0 submitted
assert test_reporter_executor.report()[0].executing == 3
assert test_reporter_executor.report()[0].submitted == 0
# let's complete a couple more, this time no notification in between
test_reporter_executor.complete(3)
test_reporter_executor.complete(4)
queue.execution_complete()
# we should now have 1 executing and 0 submitted
assert test_reporter_executor.report()[0].executing == 1
assert test_reporter_executor.report()[0].submitted == 0
# let's add four more
for i in range(6, 10):
test_reporter_executor.submit(i)
queue.execution_requested()
# now check again and we should have 3 executing and 2 submitted again
assert test_reporter_executor.report()[0].executing == 3
assert test_reporter_executor.report()[0].submitted == 2
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment