Commit afbaaba0 authored by Nathan Hertz's avatar Nathan Hertz

Didn't mean to commit wksp0 project; removed it

parent bd5183de
Showing 0 additions and 1280 deletions
#!/bin/sh
echo "Hello, world!"
exit 1
JOB username grep.condor
VARS username search="username" file="/home/casa/capo/nmtest.properties"
JOB password grep.condor
VARS password search="password" file="/home/casa/capo/nmtest.properties"
JOB sort sort.condor
PARENT username password CHILD sort
executable = grep.sh
arguments = "$(search) $(file) $(search).grep-out"
should_transfer_files = YES
transfer_input_files = $(file)
log = condor.log
queue
#!/bin/sh
grep $1 $2 > $3
executable = sort.sh
arguments = "combined.txt"
should_transfer_files = YES
transfer_input_files = username.grep-out,password.grep-out
log = condor.log
queue
#!/bin/sh
cat *.grep-out | sort > $1
executable = uniq.sh
arguments = raw-usernames.txt
output = unique-usernames.txt
error = uniq.err
log = condor.log
queue
#!/bin/sh
cut -f2 -d= "$1" | sed 's/[ \t]*\([^ \t]*\).*/\1/g' | sort | uniq
#!/usr/bin/env python
from setuptools import setup, find_packages
setup(name='wksp0',
version='1.0rc1',
description='NRAO Archive Workspace Subsystem Prototype 0',
author='Daniel K Lyons',
author_email='dlyons@nrao.edu',
url='https://open-bitbucket.nrao.edu/projects/SSA/repos/wksp0/browse',
packages=find_packages(),
test_suite='tests',
install_requires=[
'injector >= 0.17',
'htcondor >= 8.9',
'pystache >= 0.5'
],
extras_require={
'dev': ['sphinx >= 2.2', 'sphinx_rtd_theme']
},
entry_points={
'console_scripts': [
'run = wksp.run_capability:main',
'run_workflow = wksp.run_workflow:main'
],
})
import unittest
import inspect
from typing import List
import wksp.ifaces
def interface_methods(klass) -> List[str]:
"""
Return the names of methods on the supplied class
:param klass: the class to get methods from
:return: the names of the methods on klass
"""
return [x for (x, _) in inspect.getmembers(klass, predicate=inspect.isfunction)]
class TestInterfaces(unittest.TestCase):
"""
Ensure that the interfaces have exactly the methods we think they have, and no more or less.
"""
def test_capability_info(self):
methods = interface_methods(wksp.ifaces.CapabilityInfo)
        self.assertEqual(len(methods), 3, "CapabilityInfo should have three methods")
        self.assertIn("lookup_capability", methods, "CapabilityInfo should have a method lookup_capability")
        self.assertIn("lookup_capability_request", methods,
                      "CapabilityInfo should have a method lookup_capability_request")
        self.assertIn("save_request", methods, "CapabilityInfo should have a method save_request")
import pathlib
import tempfile
import unittest
from unittest.mock import patch, MagicMock
from injector import Injector
import wksp.workflow as wf
class TestWorkflowService(unittest.TestCase):
"""
    Exercise the DAGMan-backed workflow service, faking out the condor_submit_dag call.
"""
@patch('wksp.workflow.subprocess')
def test_workflow_service(self, subprocess):
"""
Tests the hardcoded grep-uniq workflow by faking out the condor_submit_dag execution.
:param subprocess: a fake subprocess module
        :return: None
"""
# for the purposes of this test, we are using
# the hardcoded workflow info and the DAGman workflow service
def configure(binder):
binder.bind(wf.WorkflowInfo, to=wf.DirectoryWorkflowInfo(pathlib.Path('workflows')))
binder.bind(wf.WorkflowService, to=wf.HTCondorWorkflowService)
# set up the injector
injector = Injector(configure)
# get the service
workflow_service = injector.get(wf.WorkflowService)
# The plan here is to fake out the subprocess.run call and the mkdtemp call so that we
        # know a priori what the temp directory produced is going to be. Then we can check that
# the arguments to the condor_submit_dag call match exactly what we expect.
#
# We could go further here and inspect the files in the temp directory as well.
# make a real temp directory
with tempfile.TemporaryDirectory() as temp_dir:
with patch('wksp.workflow.mkdtemp', new=MagicMock(return_value=temp_dir)):
# run a workflow
workflow_name = 'grep-uniq'
workflow_service.execute(workflow_name,
{'search': 'username'},
[pathlib.Path('/home/casa/capo/nmtest.properties')])
# did subprocess.run get executed or not?
subprocess.run.assert_called_with(['condor_submit_dag',
f'{temp_dir}/{workflow_name}.dag'],
cwd=temp_dir)
# here's a moment to inspect the files in /tmp, if we choose to
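                # For instance (a sketch, not executed here), we could read back
                # the DAG file the service rendered into the temp directory and
                # assert on its contents:
                #
                #     dag_text = pathlib.Path(temp_dir, f'{workflow_name}.dag').read_text()
                #     self.assertIn('JOB', dag_text)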
pass
import unittest
from wksp.capability import *
class CapabilityParsingTests(unittest.TestCase):
def test_parsing(self):
capability = DirectoryCapability(pathlib.Path('../../capabilities/grep-uniq'))
        self.assertEqual(5, len(capability.sequence))
class ConsoleParameterReaderTests(unittest.TestCase):
def test_reading_parameters(self):
inst = ConsoleParameterReader.obtain_parameter(QaStatus)
print(inst)
inst = ConsoleParameterReader.obtain_parameter(SearchParameters)
print(inst)
from queue import Queue
from threading import Semaphore
from injector import ClassAssistedBuilder, inject
from wksp.ifaces import *
import pathlib
class SearchParameters(Parameter):
search: str
@staticmethod
def fields() -> Dict[FieldName, FieldLabel]:
return {'search': 'Search'}
def json(self):
return {'search': self.search}
def load(self, json):
self.search = json['search']
def __repr__(self):
return f"<SearchParameters search='{self.search}'>"
class QaStatus(Parameter):
status: bool
@staticmethod
def fields() -> Dict[FieldName, FieldLabel]:
return {'qa-pass': 'Passes QA'}
def json(self):
return {'qa-pass': self.status}
def load(self, json):
self.status = json['qa-pass'].strip().lower() in ['yes', 'y', 'true']
def __repr__(self):
return f"<QaStatus {'pass' if self.status else 'fail'}>"
ParameterRegistry = {'search-parameters': SearchParameters,
'qa-status': QaStatus}
class ConsoleParameterReader:
@staticmethod
def obtain_parameter(parameter_type: Type[Parameter]) -> Parameter:
json = {}
for field, label in parameter_type.fields().items():
json[field] = input(label + '?> ')
result = parameter_type()
result.load(json)
return result
class DirectoryCapability(Capability):
"""
Implements a capability by reading files off the filesystem (rather than from a database or whatnot).
"""
max_jobs: int
sequence: List[CapabilityStep]
def create_request(self, locators: List[ProductLocator]):
return CapabilityRequest(capability=self, locators=locators, files=[], id=None, parameters=[])
def __init__(self, path: pathlib.Path):
self.path = path
self.name = path.name
self.sequence = self.parse(self.path / 'sequence.txt')
self.max_jobs = 2
def __hash__(self):
        # dicts are not hashable; hash a tuple of the identifying fields instead
        return hash(('DirectoryCapability', self.path))
@staticmethod
def parse(path: pathlib.Path):
sequence = []
with path.open('r') as f:
for line in f:
if line.startswith('AWAIT PRODUCTS'):
sequence.append(AwaitProduct())
elif line.startswith('AWAIT PRODUCT '):
sequence.append(AwaitProduct(line.split('AWAIT PRODUCT ')[1].strip()))
elif line.startswith('AWAIT PARAMETER '):
sequence.append(AwaitParameter(ParameterRegistry[line.split('AWAIT PARAMETER ')[1].strip()]))
elif line.startswith('PREPARE AND RUN WORKFLOW '):
sequence.append(PrepareAndRunWorkflow(line.split('PREPARE AND RUN WORKFLOW ')[1].strip()))
return sequence
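# For illustration, a capability's sequence.txt might read as follows
# (hypothetical content; only the keywords are defined by the parser above,
# and the parameter names come from ParameterRegistry below):
#
#     AWAIT PRODUCTS
#     AWAIT PARAMETER search-parameters
#     PREPARE AND RUN WORKFLOW grep-uniq
#     AWAIT PARAMETER qa-status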
class DirectoryCapabilityInfo(CapabilityInfo):
"""
Finds information about capabilities on the filesystem. Stores requests in memory (in a list).
"""
def __init__(self, path: pathlib.Path):
self.path = path
self.requests = []
self.n_requests = 0
def lookup_capability(self, capability_name: str) -> Capability:
return DirectoryCapability(self.path / capability_name)
def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequest:
return self.requests[capability_request_id]
def save_request(self, request: CapabilityRequest) -> int:
# 1. Record this request in our list of requests
self.requests.append(request)
self.n_requests += 1
# 2. Record the ID on the request itself
request.id = self.n_requests
# return it
return request.id
class PrototypeCapabilityQueue(CapabilityQueue):
"""
Implements the CapabilityQueue API, backed by a simple thread-safe queue.
"""
items: Queue
@inject
def __init__(self, capability: Capability, runner_builder: ClassAssistedBuilder["PrototypeQueueRunner"]):
self.items = Queue()
self.runner = runner_builder.build(queue=self.items, max_jobs=capability.max_jobs)
self.runner.start()
def enqueue(self, request: CapabilityRequest):
# 1. place this request into some kind of queue
self.items.put(request)
class PrototypeCapabilityEngine(CapabilityEngine):
request: CapabilityRequest
responder: CapabilityEngineResponder
@inject
def __init__(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
self.request = request
self.responder = responder
def execute(self, request):
for step in request.capability.sequence:
self._execute_step(step)
def _execute_step(self, step: CapabilityStep):
step.execute_against(self.request, self.responder)
class PrototypeCapabilityEngineResponder(CapabilityEngineResponder):
workflow_service: WorkflowService
product_service: ProductService
@inject
def __init__(self, workflow_service: WorkflowService, product_service: ProductService):
self.workflow_service = workflow_service
self.product_service = product_service
self.console = ConsoleParameterReader()
def prepare_and_run_workflow(self, step: CapabilityStep, name: str, param: Parameter, files: List[Path]):
# in here I need to find the WorkflowService
return self.workflow_service.execute(name, param.json(), files)
def await_product(self, step: CapabilityStep, product_locator: ProductLocator):
return self.product_service.locate_product(product_locator)
def await_parameter(self, step: CapabilityStep, parameter_type: Type[Parameter]) -> Parameter:
return self.console.obtain_parameter(parameter_type)
class PrototypeQueueRunner(QueueRunner, Thread):
    engines: Dict[int, CapabilityEngine]  # keyed by request.id
@inject
def __init__(self, queue: Queue, max_jobs: int, engine_builder: ClassAssistedBuilder[PrototypeCapabilityEngine]):
super().__init__()
self.queue = queue
self.semaphore = Semaphore(max_jobs)
self.engines = {}
self.engine_builder = engine_builder
def run(self) -> None:
while True:
# obtain the semaphore
self.semaphore.acquire()
# now get a job from the queue
request = self.queue.get()
# now build an engine and start executing that request
self.engines[request.id] = self.engine_builder.build(request=request)
# execute the first step of this capability
self.engines[request.id].execute(request)
# release the semaphore
self.semaphore.release()
def complete(self, request):
"""
        Sent by the engine when it is done executing a capability.
        :param request: the request that finished
"""
del self.engines[request.id]
self.semaphore.release()
class PrototypeCapabilityService(CapabilityService):
queues: Dict[CapabilityName, CapabilityQueue]
@inject
def __init__(self, info: CapabilityInfo, queue_builder: ClassAssistedBuilder[PrototypeCapabilityQueue]):
self.queues = {}
self.info = info
self.queue_builder = queue_builder
def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest:
# 1. Locate the capability
capability = self.info.lookup_capability(name)
# 2. Create a request
request = capability.create_request(locators)
# 3. Persist the request
self.info.save_request(request)
# 4. Return it
return request
def _locate_queue(self, request: CapabilityRequest) -> CapabilityQueue:
# 1. Create a queue for this capability, if we don't have one currently
if request.capability.name not in self.queues:
self.queues[request.capability.name] = self.queue_builder.build(capability=request.capability)
# 2. Return the queue for this capability
return self.queues[request.capability.name]
def execute(self, request: CapabilityRequest) -> None:
# 1. Locate the proper queue for this request
queue = self._locate_queue(request)
# 2. Submit the request to that queue
queue.enqueue(request)
class HardcodedProductService(ProductService):
def __init__(self):
self.products = {'nmtest-capo': pathlib.Path('/home/casa/capo/nmtest.properties'),
'readme': pathlib.Path('README.md')}
def locate_product(self, product_locator: ProductLocator) -> Path:
return self.products[product_locator]
import abc
import argparse
import pathlib
# -------------------------------------------------------------------------
#
# D E S T I N A T I O N S Y S T E M
#
# -------------------------------------------------------------------------
import secrets
import sys
from typing import Iterator, List
class Destination(abc.ABC):
"""
Destinations are locations that files can be copied into. They might not
always be on a local disk; FTP or Globus could also be destinations.
    The destination API is very simple, consisting just of adding files.
"""
@abc.abstractmethod
def add_file(self, file: pathlib.Path, relative_path: str):
pass
@abc.abstractmethod
def close(self):
pass
def __exit__(self, exc_type, exc_val, exc_tb):
# ensure that if we are used as a context manager ('with' statement)
# that the destinations do get properly closed
self.close()
class DestinationDecorator(Destination):
def __init__(self, underlying: Destination):
self.underlying = underlying
def add_file(self, file: pathlib.Path, relative_path: str):
self.underlying.add_file(file, relative_path)
def close(self):
self.underlying.close()
class TarDecorator(DestinationDecorator):
"""
This decorator creates a local tar archive. Calls to add_file
are intercepted and instead the file contents are added to the
tar archive. When close() is called, we finalize the tar file
and place it in the delivery area.
"""
pass
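# A minimal sketch of how TarDecorator could work, assuming the archive is
# staged in a temporary file and handed to the underlying destination on
# close(). The class name and the 'delivery.tar' member name are illustrative
# only, not part of the prototype:
#
#     import tarfile, tempfile
#
#     class SketchTarDecorator(DestinationDecorator):
#         def __init__(self, underlying: Destination):
#             super().__init__(underlying)
#             self._tmp = tempfile.NamedTemporaryFile(suffix='.tar', delete=False)
#             self._tar = tarfile.open(fileobj=self._tmp, mode='w')
#
#         def add_file(self, file: pathlib.Path, relative_path: str):
#             # intercept: write into the archive, not the destination
#             self._tar.add(str(file), arcname=relative_path)
#
#         def close(self):
#             # finalize the archive, then deliver it as a single file
#             self._tar.close()
#             self._tmp.close()
#             self.underlying.add_file(pathlib.Path(self._tmp.name), 'delivery.tar')
#             self.underlying.close()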
class ChecksumDecorator(DestinationDecorator):
"""
This decorator ensures that an MD5SUM file appears in the underlying
destination after all the files are added, and the contents of that
file are, as one would expect, the MD5SUMs of the files added to the
destination, in the format that ``md5sum -c`` expects.
"""
pass
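# Likewise, a sketch of ChecksumDecorator: hash each file as it passes through
# and emit an MD5SUM manifest on close(), one "<hexdigest>  <path>" line per
# file, as ``md5sum -c`` expects (names below are illustrative):
#
#     import hashlib, tempfile
#
#     class SketchChecksumDecorator(DestinationDecorator):
#         def __init__(self, underlying: Destination):
#             super().__init__(underlying)
#             self._sums = []
#
#         def add_file(self, file: pathlib.Path, relative_path: str):
#             self._sums.append((hashlib.md5(file.read_bytes()).hexdigest(),
#                                relative_path))
#             self.underlying.add_file(file, relative_path)
#
#         def close(self):
#             with tempfile.NamedTemporaryFile(mode='w', delete=False) as manifest:
#                 for digest, name in self._sums:
#                     manifest.write(f'{digest}  {name}\n')
#             self.underlying.add_file(pathlib.Path(manifest.name), 'MD5SUM')
#             self.underlying.close()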
class SubdirectoryDecorator(DestinationDecorator):
def __init__(self, underlying: Destination, subdirectory: str):
super().__init__(underlying)
self.subdirectory = subdirectory
def add_file(self, file: pathlib.Path, relative_path: str):
self.underlying.add_file(file, self.subdirectory + "/" + relative_path)
class LocalDestination(Destination):
"""
LocalDestination is for delivering to a local directory on the filesystem.
"""
def __init__(self, path: pathlib.Path):
self.path = path
def add_file(self, file: pathlib.Path, relative_path: str):
raise NotImplementedError
def close(self):
"""
Nothing special actually needs to be done for local deliveries
when we close the destination.
"""
pass
class DestinationBuilder:
"""
    Facilitates building a stack of a destination and its decorators.
"""
def __init__(self):
self._destination = None
def local(self, path: pathlib.Path):
"""Add a local destination with the given path"""
self._destination = LocalDestination(path)
return self
def tar(self):
"""Add the tar decorator to the destination"""
self._destination = TarDecorator(self._destination)
return self
def build(self):
"""Create the destination"""
return self._destination
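# Example use of the builder (the path is illustrative): a local destination
# wrapped in the tar decorator, so every file added lands in one archive.
#
#     destination = DestinationBuilder().local(pathlib.Path('/tmp/delivery')).tar().build()
#     destination.add_file(pathlib.Path('README.md'), 'README.md')
#     destination.close()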
# -------------------------------------------------------------------------
#
# P R O D U C T D E L I V E R Y S Y S T E M
#
# -------------------------------------------------------------------------
class DeliveryContext:
"""
The delivery context provides access to some environmental functions that
are not really the responsibility of any particular component, but which
many components need to share information about, such as:
- Creating and removing temporary files
- Creating and retaining tokens
"""
def __init__(self):
self._token = None
def create_tempfile(self, prefix: str, suffix: str) -> pathlib.Path:
"""
Create a temporary file, using the given prefix and suffix.
:param prefix: prefix for the tempfile name
:param suffix: suffix for the tempfile name
:return: the path to the temporary file
"""
raise NotImplementedError
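        # One plausible implementation (a sketch; "_tempfiles" is a hypothetical
        # bookkeeping list that __exit__ could clean up, and it needs
        # "import os" and "import tempfile"):
        #
        #     fd, name = tempfile.mkstemp(prefix=prefix, suffix=suffix)
        #     os.close(fd)
        #     path = pathlib.Path(name)
        #     self._tempfiles.append(path)
        #     return path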
@property
def token(self) -> str:
"""
If a delivery only requires one token, just use this property
to get it. It will be created once and remain the same throughout
the lifetime of this object.
:return: the current token
"""
if self._token is None:
self._token = self.generate_token()
return self._token
def generate_token(self) -> str:
"""
Generates a random token suitable for use in paths and URLs.
:return: a random token
"""
return secrets.token_hex(16)
def __exit__(self, exc_type, exc_val, exc_tb):
# possible: remove all the generated tempfiles here
pass
class SpooledProduct(abc.ABC):
"""
A SpooledProduct is something interesting enough to deliver to an end-user.
    SpooledProducts might be science products or auxiliary products, but they
might just be some temporary thing that came out of some processing, never
to be seen again.
SpooledProducts have a type and a path. More specific SpooledProducts may
have other specific metadata unto themselves.
"""
@abc.abstractmethod
def deliver_to(self, deliverer: "ProductDeliverer"):
"""
Deliver sets up a double dispatch so we can get the product type into the
method itself, and behave differently depending on the type of product
being delivered.
:param deliverer: the deliverer to deliver to
"""
pass
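# The double dispatch in action (illustrative): callers hand any product to
# ProductDeliverer.deliver_product, which never inspects types; the product
# routes itself back to the matching deliver_* method:
#
#     deliverer.deliver_product(ous)   # -> ous.deliver_to(deliverer)
#                                      # -> deliverer.deliver_ous(ous)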
# Basic delivery types that others are derived from (are these needed?)
class DirectoryProduct(SpooledProduct):
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_directory(self)
class FileProduct(SpooledProduct):
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_file(self)
class ExecutionBlock(DirectoryProduct):
def __init__(self, date):
self.date = date
@property
def pipeline_spec(self):
"For execution blocks, $PIPELINE_SPEC is observation.$DATE"
return f'observation.{self.date}'
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_execution_block(self)
# These types are derived from what appears in the piperesults file;
# there may be duplicates
class ASDM(DirectoryProduct):
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_asdm(self)
class PipeRequest(FileProduct):
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_piperequest(self)
class CalibrationTables(FileProduct):
def __init__(self, date: str):
self.date = date
    @property
    def pipeline_spec(self):
"For calibrations, $PIPELINE_SPEC is calibration_pipeline.$DATE"
return f'calibration_pipeline.{self.date}'
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_calibration_tables(self)
class Flags(FileProduct):
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_flags(self)
class ApplyCommands(FileProduct):
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_apply_commands(self)
class Weblog(FileProduct):
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_weblog(self)
class CasaCommandLog(FileProduct):
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_casa_command_log(self)
class RestoreScript(FileProduct):
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_restore_script(self)
class Image(FileProduct):
def __init__(self, date):
self.date = date
    @property
    def pipeline_spec(self):
"For images, $PIPELINE_SPEC is imaging_pipeline.$DATE"
return f'imaging_pipeline.{self.date}'
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_image(self)
class OUS(DirectoryProduct):
"""
    OUSes contain a great many sub-products.
"""
def __init__(self, id):
self.id = id
self._products = []
def add_product(self, product):
"""
Add a product to this OUS.
:param product: the product to add
"""
self._products.append(product)
@property
def products(self):
return self._products
def deliver_to(self, deliverer: "ProductDeliverer"):
deliverer.deliver_ous(self)
class ProductDeliverer:
def __init__(self, destination: Destination):
self.destination = destination
def deliver_product(self, product):
"""
Primary user-interface for this class. Provide a product to be delivered.
Under the hood, engages the double dispatch mechanism to
effect typed delivery.
:param product: the product to deliver
:return:
"""
product.deliver_to(self)
def deliver_asdm(self, asdm: ASDM):
"""
        Deliver an ASDM.
        :param asdm: the ASDM to deliver
"""
# FIXME: example implementation; refine
        print(f'Delivering ASDM {asdm}')
self.deliver_directory(asdm)
def deliver_ous(self, ous: OUS):
"""
Deliver an OUS, which has products inside it.
:param ous: the OUS to deliver
"""
# FIXME: example implementation; refine
# The trick here is to basically "cd" into the OUS directory and then
# proceed as normal. The subdirectory decorator will ensure that the
# products have the same structure, but inside the OUS directory
# instead of the level above.
ous_destination = ProductDeliverer(SubdirectoryDecorator(self.destination, ous.id))
for product in ous.products:
product.deliver_to(ous_destination)
def deliver_directory(self, dir_product: DirectoryProduct):
raise NotImplementedError
def deliver_file(self, file_product: FileProduct):
raise NotImplementedError
def deliver_piperequest(self, ppr: PipeRequest):
"""
An example of how to deliver something specific, which is really just
a file under the hood.
:param ppr:
:return:
"""
print('Delivering PPR')
self.deliver_file(ppr)
def deliver_image(self, img: Image):
# similar to deliver_piperequest
raise NotImplementedError
def deliver_restore_script(self, restore_script: RestoreScript):
# similar to deliver_piperequest
raise NotImplementedError
def deliver_casa_command_log(self, log: CasaCommandLog):
# similar to deliver_piperequest
raise NotImplementedError
def deliver_calibration_tables(self, caltables: CalibrationTables):
# similar to deliver_piperequest
raise NotImplementedError
def deliver_flags(self, flags: Flags):
# similar to deliver_piperequest
raise NotImplementedError
def deliver_apply_commands(self, cmds: ApplyCommands):
# similar to deliver_piperequest
raise NotImplementedError
def deliver_weblog(self, weblog: Weblog):
# similar to deliver_piperequest
raise NotImplementedError
# -------------------------------------------------------------------------
#
# P R O D U C T F I N D I N G
#
# -------------------------------------------------------------------------
class ProductFinder(abc.ABC):
@abc.abstractmethod
def find_products(self) -> Iterator[SpooledProduct]:
pass
class PiperesultsProductFinder(ProductFinder):
def __init__(self, path: pathlib.Path):
self.path = path
def find_products(self) -> Iterator[SpooledProduct]:
raise NotImplementedError
class HeuristicProductFinder(ProductFinder):
def __init__(self, path: pathlib.Path):
self.path = path
def find_products(self) -> Iterator[SpooledProduct]:
raise NotImplementedError
# -------------------------------------------------------------------------
#
# C O M M A N D L I N E A R G U M E N T S
#
# -------------------------------------------------------------------------
class DeliverySettings:
def __init__(self, source: pathlib.Path, tar=False, local_destination=None):
self.source = source
self.tar = tar
self.local_destination = local_destination
def create_destination(self) -> Destination:
builder = DestinationBuilder()
# first handle the local destination argument
if self.local_destination:
builder.local(self.local_destination)
else:
builder.local("/lustre/aoc/whatever")
# then handle the tar argument
if self.tar:
builder.tar()
return builder.build()
@classmethod
def parse_commandline(cls, args=None) -> "DeliverySettings":
parser = argparse.ArgumentParser()
parser.add_argument('-l', '--local-destination', type=str, default=None,
help="Deliver to this local directory instead of the appropriate web root")
parser.add_argument('-t', '--tar', action='store_true', default=False, help='Archive the delivered items as a tar file')
parser.add_argument('source', type=pathlib.Path, metavar="SOURCE_DIRECTORY",
help="The directory where the products to be delivered are located")
ns = parser.parse_args(args)
return DeliverySettings(source=ns.source, tar=ns.tar, local_destination=ns.local_destination)
# -------------------------------------------------------------------------
#
# D E L I V E R Y
#
# -------------------------------------------------------------------------
class Delivery:
def __init__(self):
self.context = DeliveryContext()
def has_piperesults(self, path: pathlib.Path) -> bool:
raise NotImplementedError
def create_product_finder(self, source: pathlib.Path) -> ProductFinder:
"""
Based on the contents of the source/ folder, make the right flavor
of product finder
:param source: directory to examine
:return: a product finder
"""
# if there is a piperesults file, use that finder
if self.has_piperesults(source):
return PiperesultsProductFinder(source)
else:
return HeuristicProductFinder(source)
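    # has_piperesults is left unimplemented above; one plausible check, assuming
    # the manifest's filename contains "piperesults" (an assumption, not a
    # documented convention):
    #
    #     def has_piperesults(self, path: pathlib.Path) -> bool:
    #         return any(path.glob('*piperesults*'))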
def deliver(self, settings: DeliverySettings):
# make the destination
destination = settings.create_destination()
# find the products
finder = self.create_product_finder(settings.source)
# the ensuing probably needs some kind of reference to the options,
# so we know how to create the destination
# make the delivery system
deliverer = ProductDeliverer(destination)
# loop over the products and deliver them
for product in finder.find_products():
deliverer.deliver_product(product)
def main():
"""CLI entry point"""
settings = DeliverySettings.parse_commandline()
Delivery().deliver(settings)
if __name__ == '__main__':
main()
from abc import ABC, abstractmethod
from dataclasses import dataclass
from pathlib import Path
from threading import Thread
from typing import List, Iterator, Iterable, Dict, Type, Optional
import inspect
"""
Interfaces for the Workspace system live in this module.
Python doesn't really have interfaces, but it does have ABCs: abstract base classes.
The idea here is, pending input from SSA-5944, to at least document the interfaces as
I understand them, and hopefully this will be enough of a "model" that IntelliJ will
let me know if I misuse them.
"""
ProductLocator = str
CapabilityName = str
@dataclass
class ScienceProduct(ABC):
    """
    A science product from the archive.
    """
    product_locator: ProductLocator
@dataclass
class Capability(ABC):
    """
    A capability.
    """
    name: CapabilityName
    max_jobs: int
def create_request(self, locators: List[ProductLocator]):
"""
Create a capability request for this capability
:param locators: product locators for the new request
:return: a capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class ProductService(ABC):
"""
Locate products and realize them on disk (haha).
"""
@abstractmethod
def locate_product(self, product_locator: ProductLocator) -> Path:
"""
Locates a given product and produces a file path to it.
:param product_locator: the locator to this product
:return: a path to this product
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@dataclass
class CapabilityRequest:
"""
A particular capability request
"""
capability: Capability
locators: List[ProductLocator]
id: Optional[int]
parameters: List["Parameter"]
files: List[Path]
@property
def last_parameter(self) -> "Parameter":
return self.parameters[-1]
class CapabilityInfo(ABC):
"""
Interface to stored capability information.
"""
@abstractmethod
def lookup_capability(self, capability_name: str) -> Capability:
"""
Look up the definition of a capability.
:param capability_name: the name of the capability to find
:return: a capability
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def lookup_capability_request(self, capability_request_id: int) -> CapabilityRequest:
"""
Look up a particular request
:param capability_request_id: the request identifier
:return: a capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def save_request(self, request: CapabilityRequest) -> int:
"""
Save a capability request and return an integer identifier for it.
:param request: the request to save
:return: the request identifier
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class QueueRunner(Thread, ABC):
pass
class CapabilityQueue(ABC):
"""
Holds capability requests until they can be executed.
"""
@abstractmethod
def enqueue(self, request: CapabilityRequest):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityService(ABC):
"""
The capability service: clients access this to request capability runs
"""
@abstractmethod
def send_request(self, name: CapabilityName, locators: List[ProductLocator]) -> CapabilityRequest:
"""
Start a capability request with the given capability name and product locators.
:param name: the capability name to look things up with
:param locators: the products to start the capability with
:return: a new capability request
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def execute(self, request: CapabilityRequest) -> None:
"""
Begin executing a capability request
:param request: the request to execute
:return: None
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class ArchiveService(ABC):
"""
Abstracts services that are needed from the archive system.
"""
@abstractmethod
def lookup_product(self, locator: ProductLocator) -> ScienceProduct:
"""
Look up a science product by its locator
:param locator: science product locator for this product
:return: science product
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
FieldName = str
FieldLabel = str
class Parameter(ABC):
"""
Abstracts parameters needed for running capabilities.
"""
@staticmethod
def fields() -> Dict[FieldName, FieldLabel]:
raise NotImplementedError(f'Parameter.{inspect.stack()[0][3]}')
def json(self) -> Dict[str, str]:
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
def load(self, json: Dict[str, str]) -> None:
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityStep(ABC):
"""
A step in a capability sequence
"""
@abstractmethod
def execute_against(self, request: CapabilityRequest, responder: "CapabilityEngineResponder"):
"""
Execute this capability step
:return: None
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilityEngineResponder(ABC):
"""
Abstracts the callbacks for a capability engine
"""
@abstractmethod
def await_parameter(self, step: CapabilityStep, parameter_type: Type[Parameter]) -> Parameter:
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def await_product(self, step: CapabilityStep, product_locator: ProductLocator):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
@abstractmethod
def prepare_and_run_workflow(self, step: CapabilityStep, name: str, param: Parameter, files: List[Path]):
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class CapabilitySequence(ABC):
"""
Represents the sequence of events in a capability.
"""
pass
class CapabilityEngine(ABC):
"""
Executes a capability.
"""
@abstractmethod
def execute(self, request):
pass
class AwaitProduct(CapabilityStep, ABC):
"""
Wait for a product to become available.
"""
product: ProductLocator
def __init__(self, product: Optional[ProductLocator]=None):
self.product = product
def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
# if we have a product, await it
if self.product:
request.files.append(responder.await_product(self, self.product))
# if we do not, await the locators on the request itself
else:
for locator in request.locators:
request.files.append(responder.await_product(self, locator))
class AwaitParameter(CapabilityStep, ABC):
"""
Wait for a certain parameter to arrive (probably from the UI).
"""
parameter_type: Type[Parameter]
def __init__(self, parameter_type: Type[Parameter]):
self.parameter_type = parameter_type
def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
request.parameters.append(responder.await_parameter(self, self.parameter_type))
class PrepareAndRunWorkflow(CapabilityStep, ABC):
"""
Render templates and execute a workflow, awaiting its completion.
"""
workflow_name: str
def __init__(self, workflow_name: str):
self.workflow_name = workflow_name
def execute_against(self, request: CapabilityRequest, responder: CapabilityEngineResponder):
responder.prepare_and_run_workflow(self, self.workflow_name, request.last_parameter, request.files)
class WorkflowEvent(ABC):
"""
Represents an event on the workflow.
"""
pass
class WorkflowEventStream(ABC, Iterable[WorkflowEvent]):
"""
Represents an event stream from a workflow execution.
"""
@abstractmethod
def __iter__(self) -> Iterator[WorkflowEvent]:
"""
Get the events from the workflow event stream.
:return: all the events for this workflow execution
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class WorkflowService(ABC):
"""
Executes workflows; should be a freestanding service.
"""
@abstractmethod
def execute(self, workflow_name: str, argument: Dict, files: List[Path]) -> WorkflowEventStream:
"""
Execute this workflow against these files.
:param workflow_name: name of the workflow to run
:param argument: extra argument (a JSON object)
:param files: some extra files the workflow should consider
:return: a stream of events from this workflow
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
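# An illustrative call, mirroring the prototype test earlier in this commit:
#
#     workflow_service.execute('grep-uniq',
#                              {'search': 'username'},
#                              [Path('/home/casa/capo/nmtest.properties')])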
@dataclass
class Workflow(ABC):
name: str
dagman_template: str
tasks: List[str]
@abstractmethod
def render_templates(self, argument: Dict, files: List[Path]) -> Dict[str, str]:
"""
Render the templates associated with this workflow
:param argument: the workflow argument JSON
:param files: the files to be processed
:return: a list of rendered templates
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
class WorkflowInfo(ABC):
"""
Holds information about workflows.
"""
@abstractmethod
def lookup_workflow_definition(self, name: str) -> Workflow:
"""
Look up the workflow with this name.
:param name: Workflow name
:return: Workflow instance
"""
raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')