Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
Commits on Source (6)
Showing
with 767 additions and 319 deletions
......@@ -49,7 +49,7 @@ build base image:
- docker build -t ${REGISTRY_URL}/ops/base:${PROJECT_NAME} -f Dockerfile.base .
- docker tag ${REGISTRY_URL}/ops/base:${PROJECT_NAME} ${REGISTRY_URL}/ops/base:${CI_COMMIT_SHORT_SHA}
rules:
- if: '$CI_PIPELINE_SOURCE == "merge_request_event" && $CI_COMMIT_MESSAGE =~ /\A(?i)-ops/'
- if: '$CI_PIPELINE_SOURCE == "merge_request_event"'
changes:
- Dockerfile.base
- docker.properties
......@@ -60,9 +60,8 @@ push base image:
script:
- echo "$HARBOR_PASSWORD" | docker login -u "$HARBOR_USER" --password-stdin $REGISTRY_URL
- docker push ${REGISTRY_URL}/ops/base:${PROJECT_NAME}
- docker push ${REGISTRY_URL}/ops/base:${CI_COMMIT_SHORT_SHA}
rules:
- if: '$CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH && $CI_COMMIT_MESSAGE =~ /\A(?i)-ops/'
- if: '$CI_COMMIT_BRANCH == $CI_DEFAULT_BRANCH'
changes:
- Dockerfile.base
- docker.properties
......
......@@ -7,8 +7,6 @@ import argparse
import pathlib
import secrets
from pycapo import CapoConfig
from .deliverer import DeliveryContextIF, Destination, DestinationBuilder
from .finder import HeuristicProductFinder, ProductFinder
......@@ -48,17 +46,6 @@ class DeliveryContext(DeliveryContextIF):
def __repr__(self):
return f"<DeliveryContext {self.__dict__}>"
def create_tempfile(self, prefix: str, suffix: str) -> pathlib.Path:
"""
Create a temporary file, using the given prefix and suffix.
:param prefix: prefix for the tempfile name
:param suffix: suffix for the tempfile name
:return: the path to the temporary file
"""
raise NotImplementedError
@property
def token(self) -> str:
"""
If a delivery only requires one token, just use this property
......@@ -75,6 +62,7 @@ class DeliveryContext(DeliveryContextIF):
def generate_token() -> str:
"""
Generates a random token suitable for use in paths and URLs.
:return: a random token
"""
return secrets.token_hex(16)
......@@ -88,15 +76,24 @@ class DeliveryContext(DeliveryContextIF):
def create_destination(self) -> Destination:
builder = DestinationBuilder(self)
# so the layer-cake has to be built in kind of reverse order because it's a stack
# at the bottom we want the physical destination, which is either local or web delivery
# first handle the local destination argument
if self.local_destination:
builder.local(self.local_destination)
else:
# FIXME: this is gross
settings = CapoConfig().settings("edu.nrao.archive.workflow.config.DeliverySettings")
builder.local(pathlib.Path(settings.nraoDownloadDirectory) / "anonymous" / self.token)
builder.web()
# always do a checksum at the end
builder.checksums()
# make a CURL file before that, but only if we're doing a web delivery
if not self.local_destination:
builder.fetchfile()
# then handle the tar argument
# tar goes here, so that we generate a checksum for the tar file
# and not for all the files in the tar file
if self.tar:
builder.tar()
......@@ -160,7 +157,7 @@ class DeliveryContext(DeliveryContextIF):
return DeliveryContext(
source=ns.source,
tar=ns.tar,
local_destination=ns.local_destination,
local_destination=pathlib.Path(ns.local_destination) if ns.local_destination else None,
use_piperesults=ns.use_piperesults,
rawdata=ns.rawdata,
)
......@@ -3,203 +3,19 @@
# D E S T I N A T I O N S Y S T E M
#
# -------------------------------------------------------------------------
import abc
import os
import pathlib
import shutil
from abc import ABC, abstractmethod
class DeliveryContextIF(ABC):
@abstractmethod
def create_tempfile(self, prefix: str, suffix: str) -> pathlib.Path:
pass
@abstractmethod
def token(self) -> str:
pass
class Destination(abc.ABC):
"""
Destinations are locations that files can be copied into. They might not
always be on a local disk; FTP or Globus could also be destinations.
The destination API is very simply, consisting just of adding files.
"""
def __init__(self, context: DeliveryContextIF, path: pathlib.Path):
self.context = context
self.path = path
@abc.abstractmethod
def add_file(self, file: pathlib.Path, relative_path: str):
pass
@abc.abstractmethod
def add_directory(self, directory: pathlib.Path, relative_path: str):
pass
@abc.abstractmethod
# For tar; may not be necessary
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
# ensure that if we are used as a context manager ('with' statement)
# that the destinations do get properly closed
self.close()
class DestinationDecorator(Destination):
def __init__(self, underlying: Destination):
self.underlying = underlying
super().__init__(underlying.context, underlying.path)
def add_file(self, file: pathlib.Path, relative_path: str):
self.underlying.add_file(file, relative_path)
def add_directory(self, directory: pathlib.Path, relative_path: str):
self.underlying.add_directory(directory, relative_path)
def close(self):
self.underlying.close()
class TarDecorator(DestinationDecorator):
"""
This decorator creates a local tar archive. Calls to add_file
are intercepted and instead the file contents are added to the
tar archive. When close() is called, we finalize the tar file
and place it in the delivery area.
"""
def __init__(self, context: DeliveryContextIF, underlying: Destination):
super().__init__(underlying)
self.context = context
def add_file(self, file: pathlib.Path, relative_path: str):
self.underlying.add_file(file, relative_path)
def add_directory(self, directory: pathlib.Path, relative_path: str):
self.underlying.add_directory(directory, relative_path)
def close(self):
self.underlying.close()
# I beleive this gives us the request id, but I may be wrong
# I also assume that request id is what we want, may not be true
tar_name = os.path.basename(os.path.normpath(self.context.source))
shutil.make_archive(
str(self.underlying.path) + "/" + tar_name, "gztar", self.underlying.path
)
# clear out the directory, leaving the tar
for filename in os.listdir(self.underlying.path):
file_path = os.path.join(self.underlying.path, filename)
try:
if filename != tar_name + ".tar.gz":
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
raise e
# print('Failed to delete %s. Reason: %s' % (file_path, e))
# class ChecksumDecorator(DestinationDecorator):
# """
# This decorator ensures that an MD5SUM file appears in the underlying
# destination after all the files are added, and the contents of that
# file are, as one would expect, the MD5SUMs of the files added to the
# destination, in the format that ``md5sum -c`` expects.
# """
#
# def __init__(self, context: "DeliveryContext", underlying: Destination):
# super().__init__(underlying)
# self.context = context
#
# def add_file(self, file: pathlib.Path, relative_path: str):
# self.underlying.add_file(file, relative_path)
#
# def add_directory(self, directory: pathlib.Path, relative_path: str):
# self.underlying.add_directory(directory, relative_path)
#
# def close(self):
# self.underlying.close()
# hasher = hashlib.md5()
# hasher = (fname, self.hash_bytestr_iter(self.file_as_blockiter(open(fname, 'rb')), hasher)) /
# for fname in glob.iglob(self.underlying.path + '**/**', recursive=True)
# checksum_file = open(self.underlying.path / "MD5SUMS", "w")
# checksum_file.write(hasher.hexdigest())
# checksum_file.close()
#
## the following is reported to be a memory efficient way to get md5 checksum
## https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file > post by Omnifarious
# def hash_bytestr_iter(bytesiter, hasher):
# for block in bytesiter:
# hasher.update(block)
# return hasher
#
# def file_as_blockiter(afile, blocksize=65536):
# with afile:
# block = afile.read(blocksize)
# while len(block) > 0:
# yield block
# block = afile.read(blocksize)
# class SubdirectoryDecorator(DestinationDecorator):
# def __init__(self, underlying: Destination, subdirectory: str):
# super().__init__(underlying)
# self.subdirectory = subdirectory
#
# def add_file(self, relative_path: str, file: pathlib.Path):
# self.underlying.add_file(self.subdirectory + "/" + relative_path, file)
class LocalDestination(Destination):
"""
LocalDestination is for delivering to a local directory on the filesystem.
"""
def add_file(self, file: pathlib.Path, relative_path: str):
"""
Copy contents of file to new file with path relative_path; creates directories if they don't exis
:param file: Source file whose contents are getting copied
:param relative_path: Relative path to new file in destination
"""
try:
# if don't care about file metadata we could use copy()
shutil.copy2(file, relative_path)
except shutil.Error as err:
raise err
def add_directory(self, directory: pathlib.Path, relative_path: str):
"""
Add each file in a directory to destination directory; creates directories if they don't exist
:param directory: Source directory whose files are getting copied
:param relative_path: Relative path to new directory location in destination
"""
try:
shutil.copytree(directory, str(self.path) + "/" + relative_path, dirs_exist_ok=True)
except shutil.Error as err:
raise err
def close(self):
"""
Nothing special actually needs to be done for local deliveries
when we close the destination.
"""
pass
from .destinations.interfaces import DestinationDecorator, DeliveryContextIF, Destination
from .destinations.tar import TarArchiver
from .destinations.local import LocalDestination
from .destinations.checksum import ChecksumDecorator
from .destinations.sharedweb import SharedWebDestination
from .destinations.fetchfile import FetchFileGenerator
class DestinationBuilder:
"""
To facilitate building a stack of destination and its decorators.
Facilitates building a stack of destination and its decorators.
"""
def __init__(self, context: DeliveryContextIF):
......@@ -207,17 +23,30 @@ class DestinationBuilder:
self._destination = None
def local(self, path: pathlib.Path):
"""Add a local destination with the given path"""
"""Base the delivery on a local destination with the given path"""
self._destination = LocalDestination(self.context, path)
return self
def web(self):
"""Base the delivery on a shared web destination"""
self._destination = SharedWebDestination(self.context)
return self
def checksums(self):
"Add a SHA1SUMS file to the delivery"
self._destination = ChecksumDecorator(self._destination)
return self
def fetchfile(self):
"""Add the fetch-all.sh script to the delivery."""
self._destination = FetchFileGenerator(self._destination)
return self
def tar(self):
"""Add the tar decorator to the destination"""
self._destination = TarDecorator(self.context, self._destination)
self._destination = TarArchiver(self.context, self._destination)
return self
def build(self):
"""Create the destination"""
# We *always* want checksums, so that goes on last here
# return ChecksumDecorator(self.context, self._destination)
return self._destination
import json
import pathlib
from typing import Dict
from .context import DeliveryContext
from .finder import HeuristicProductFinder, ProductFinder
......@@ -23,7 +25,7 @@ class Delivery:
else:
return HeuristicProductFinder(source)
def deliver(self, context: DeliveryContext):
def deliver(self, context: DeliveryContext) -> Dict:
"""
Primary entrypoint to delivery process
......@@ -36,20 +38,26 @@ class Delivery:
# loop over the products and deliver them
with context.create_destination() as destination:
for product in finder.find_products():
print(f"Delivering {product} to {destination}")
product.deliver_to(destination)
# the last thing we should do is return the URL to the delivered stuff.
# for a local delivery, this will be a file URL; otherwise it will be an http URL.
return destination.path
return destination.results()
def main(args=None):
"""CLI entry point"""
# parse the arguments
context = DeliveryContext.parse_commandline(args)
try:
print(Delivery().deliver(context))
except Exception as err:
print("ERROR: " + str(err))
# perform the delivery
delivery = Delivery().deliver(context)
# write the results to a file
with open("delivery.json", "w") as result_file:
# indent=2 causes the file to be pretty-printed for humans
json.dump(delivery, result_file, indent=2)
if __name__ == "__main__":
......
import hashlib
import pathlib
from .interfaces import DestinationDecorator, Destination
class ChecksumDecorator(DestinationDecorator):
    """
    Generate a SHA1SUMS file in the content root with an entry for every file that got delivered.
    """

    def __init__(self, underlying: Destination):
        super().__init__(underlying)
        # open the sums file up front so entries can be appended as files arrive
        self.sumsfile = underlying.create_file("SHA1SUMS")

    def add_file(self, file: pathlib.Path, relative_path: str):
        # deliver the file the normal way first
        super().add_file(file, relative_path)
        # then record its digest in the sums file
        entry = self.hash_file_line(file, relative_path)
        self.sumsfile.file().writelines([entry.encode("utf8")])

    def hash_file_line(self, file: pathlib.Path, relative_path: str):
        """
        Generate a line for the checksum file.

        :param file: file to hash
        :param relative_path: relative path to show in checksum file
        :return: a line like "<hash> <relative_path>"
        """
        return f"{self.hash_file(file)} {relative_path}\n"

    @staticmethod
    def hash_file(file: pathlib.Path):
        """
        Hash the supplied file.

        :param file: file to hash
        :return: string in SHA1SUMS format (hexdigest)
        """
        # NOTE: informal benchmarking showed this pure-Python loop hashing a 1 GB file
        # in ~1.2s versus ~1.8s for shelling out to "shasum" (warm cache, ~10 trials each),
        # so do not refactor this to a subprocess without re-measuring.
        digest = hashlib.sha1()
        with open(file, "rb") as stream:
            # read in fixed-size chunks so memory use stays flat regardless of file size
            for chunk in iter(lambda: stream.read(8192), b""):
                digest.update(chunk)
        return digest.hexdigest()  # printable str rather than raw bytes

    def close(self):
        # finalize the checksum file before the rest of the stack closes
        self.sumsfile.close()
        super().close()

    def __str__(self):
        return str(self.underlying) + " with SHA1SUM"
import pathlib
from .interfaces import DestinationDecorator, Destination
class FetchFileGenerator(DestinationDecorator):
    """
    Generates a shell script which can be used to fetch all the files related to a request.
    The script uses "wget".
    """

    def __init__(self, underlying: Destination):
        super().__init__(underlying)
        # open the script in the destination and emit the shebang header
        self.fetch_script = underlying.create_file("fetch-all.sh")
        self.fetch_script.file().writelines([b"#!/bin/sh\n", b"\n"])

    def add_file(self, file: pathlib.Path, relative_path: str):
        # deliver the file as usual
        super().add_file(file, relative_path)
        # then append a wget command for it, built from the delivery's base URL
        base_url = self.underlying.results()["url"]
        command = f"wget {base_url}/{relative_path}\n"
        self.fetch_script.file().writelines([command.encode("utf8")])

    def close(self):
        # finish the script before the rest of the stack closes
        self.fetch_script.close()
        super().close()

    def __str__(self):
        return str(self.underlying) + " with fetch-all.sh"
import pathlib
from abc import ABC, abstractmethod
from typing import BinaryIO, Dict
class DeliveryContextIF(ABC):
    """
    Interface to the delivery context: shared utility functions made available
    to destinations while a delivery is being processed.
    """

    @abstractmethod
    def token(self) -> str:
        """
        Returns a unique token for this delivery. Guaranteed to be the same across multiple calls.

        :return: a string token
        """
class DestinationTempFile(ABC):
    """
    A DestinationFile is a file that you can create in the destination. Initially a temporary file,
    it will be added to the destination when you are finished with it. This is a way to create files
    you can write to during delivery, which are still routed through the destination mechanism at the end.
    """

    @abstractmethod
    def file(self) -> BinaryIO:
        """
        Access the raw file for writing

        :return: a file for writing
        """

    @abstractmethod
    def filename(self) -> str:
        """
        Access the temporary path of this file during construction

        :return: the path to the temporary file
        """

    @abstractmethod
    def close(self):
        """
        Close the file (and add it to the destination at your desired path)
        """
class Destination(ABC):
    """
    Destinations are locations that files can be copied into. They might not
    always be on a local disk; FTP or Globus could also be destinations.

    The destination API is very simple, consisting just of adding files.
    """

    def __init__(self, context: DeliveryContextIF):
        # context provides shared delivery utilities (e.g. the delivery token)
        self.context = context

    @abstractmethod
    def add_file(self, file: pathlib.Path, relative_path: str):
        """
        Add a file to the destination at the given relative path.

        :param file: the file (whose contents we are delivering)
        :param relative_path: the relative path to that file (in the delivery root)
        """

    @abstractmethod
    def create_file(self, relative_path: str) -> DestinationTempFile:
        """
        Create a file in the destination. When the file is closed, it will be added
        to the destination via the add_file method at the specified relative path.

        :param relative_path: the relative path where the file should eventually be placed
        """

    def add_directory(self, directory: pathlib.Path, relative_path: str):
        """
        Add a directory and its contents recursively to the destination at the given path.

        Do not override this method! add_file must be called for every file that gets delivered.

        :param directory: the directory (whose files we will deliver)
        :param relative_path: the relative path to this directory (in the delivery root)
        """
        for child in directory.iterdir():
            target = relative_path + "/" + child.name
            if child.is_file():
                self.add_file(child, target)
            else:
                # non-files (subdirectories) recurse
                self.add_directory(child, target)

    def close(self):
        """
        Close the destination, signalling to this and possibly destinations in the stack
        that we are finished adding new files to the destination.
        """

    @abstractmethod
    def results(self) -> Dict:
        """
        Returns some result information, to be returned to the caller. Expected keys include:

        ``delivered_to``
            A filesystem location where the delivery placed files

        ``url``
            A URL to the delivery location, if it can be accessed via a browser

        :return: result information
        """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # when used as a context manager ('with' statement),
        # the destination is closed automatically on exit
        self.close()
class DestinationDecorator(Destination):
    """
    This is a useful base class for destinations that augment the functionality of other destinations.
    In general, if you add functionality to a destination, you should do it through the decorator facility
    and then make corresponding changes to the builder to support it.
    """

    def __init__(self, underlying: Destination):
        """
        Create this destination wrapping an underlying destination.

        :param underlying: the underlying destination that does real-er work
        """
        self.underlying = underlying
        super().__init__(underlying.context)

    def add_file(self, file: pathlib.Path, relative_path: str):
        """
        Forward a file to the underlying destination. Decorators usually intercept this,
        do their own work with the file, and then propagate the call downward.

        :param file: file to add to the destination
        :param relative_path: path to the file in the result
        """
        self.underlying.add_file(file, relative_path)

    def create_file(self, relative_path: str) -> DestinationTempFile:
        """
        Delegate temp-file creation to the underlying destination. Most decorators should
        leave this alone; if you override it, be sure to replicate the hack below.

        :param relative_path: path to the eventual home of this temporary file
        :return: a DestinationTempFile that can be written to
        """
        # HACK: only one class actually creates temp files right now. We re-point the
        # temp file's destination at *this* layer so that, when the temp file is closed
        # and added, it travels through every decorator in the stack instead of going
        # straight to the LocalDestination at the bottom of the layer cake.
        tmp = self.underlying.create_file(relative_path)
        tmp.destination = self
        return tmp

    def close(self):
        """
        Propagate close to the underlying destination. Decorators typically intercept this
        to finalize their own state (e.g. close DestinationTempFiles), but they MUST still
        forward the call to the underlying destination or the rest of delivery will fail.
        """
        self.underlying.close()

    def results(self) -> Dict:
        """
        Delegate results to the underlying destination. Override only if you need to
        adjust what is reported to the caller at the end of delivery.
        """
        return self.underlying.results()

    def __str__(self):
        return str(self.underlying)
import os
import shutil
import tempfile
from typing import BinaryIO, Dict
from .interfaces import Destination, DeliveryContextIF, DestinationTempFile
import pathlib
class LocalDestination(Destination):
    """
    LocalDestination is for delivering to a local directory on the filesystem.
    """

    def __init__(self, context: DeliveryContextIF, path: pathlib.Path):
        super().__init__(context)
        # root directory all relative paths are resolved against
        self.path = path

    def add_file(self, file: pathlib.Path, relative_path: str):
        """
        Copy contents of file to new file with path relative_path; creates directories if they don't exist

        :param file: Source file whose contents are getting copied
        :param relative_path: Relative path to new file in destination
        """
        # ensure the parent directory exists; exist_ok=True makes this race-free,
        # unlike checking .exists() first and then creating it
        (self.path / relative_path).parent.mkdir(parents=True, exist_ok=True)
        # now copy the file
        # if we don't care about file metadata we could use copy() instead
        shutil.copy2(file, self.path / relative_path)

    def create_file(self, relative_path: str) -> DestinationTempFile:
        """
        Creates a temporary file in the destination which will eventually be added at the supplied relative path.

        :param relative_path: the path to the eventual file
        :return: a DestinationTempFile you can write to
        """
        # the temporary file's prefix is just the final filename component
        prefix_name = relative_path.split("/")[-1]
        # make the named temporary file; delete=False because LocalDestinationTempFile
        # unlinks it itself after delivering it through the destination stack
        rawfile = tempfile.NamedTemporaryFile(prefix=prefix_name, dir=self.path, delete=False)
        # hand off to the LocalDestinationTempFile, which handles the rest
        return LocalDestinationTempFile(self, relative_path, rawfile)

    def results(self) -> Dict:
        """
        Report where the files were delivered.

        :return: a dict with the "delivered_to" filesystem location
        """
        # we could also supply a file:/// URL, which would be constructed like so:
        # "url": pathlib.Path(self.path.absolute()).as_uri()
        return {"delivered_to": str(self.path)}

    def __str__(self):
        return f"local destination {self.path}"
class LocalDestinationTempFile(DestinationTempFile):
    """
    Implements the DestinationTempFile functionality for local destinations. Presumably if we wind up with other
    kinds of destination (globus or streaming or something like that) we will need to greatly reconsider how to
    implement this.
    """

    def __init__(
        self,
        destination: LocalDestination,
        relative_path: str,
        tempfile: tempfile.NamedTemporaryFile,
    ):
        # destination to add the finished file to; decorators may re-point this
        self.destination = destination
        # where the caller ultimately wants the file to live
        self.relative_path = relative_path
        # the underlying named temporary file
        self.tempfile = tempfile

    def file(self) -> BinaryIO:
        # expose the raw temporary file for writing
        return self.tempfile

    def filename(self) -> str:
        # the temporary (not final) on-disk path of the file
        return self.tempfile.name

    def close(self):
        """
        Finalize the temp file: close it, route it through the destination at the
        requested relative path, and then remove the temporary copy.
        """
        # close first so all buffered content reaches disk
        self.tempfile.close()
        # the file is final now, so deliver it via the named temp path
        # under the relative path the caller originally requested
        self.destination.add_file(pathlib.Path(self.tempfile.name), self.relative_path)
        # the destination has its own copy, so the temporary is safe to remove
        os.unlink(self.tempfile.name)
import pathlib
from typing import Dict
from .interfaces import DestinationDecorator, DeliveryContextIF
from .local import LocalDestination
from pycapo import CapoConfig
class SharedWebDestination(DestinationDecorator):
    """
    Users who do not specify a local destination receive the standard shared web destination automatically.
    This is a location in a shared filesystem where a web service is configured to allow access to files.
    Otherwise, it behaves like a local destination.
    """

    def __init__(self, context: DeliveryContextIF):
        # Although this looks like a final destination from the outside, internally it is
        # a decorator around a LocalDestination whose path comes from Capo settings.
        capo = CapoConfig().settings("edu.nrao.archive.workflow.config.DeliverySettings")
        # eventually, we will need to plumb the username into here instead of anonymous
        username = "anonymous"
        # the delivery lands under <download dir>/<username>/<token>
        self.destination_dir = pathlib.Path(capo.nraoDownloadDirectory) / username / context.token()
        # LocalDestination won't create a directory, so we'll do that here
        self.destination_dir.mkdir(parents=True)
        # wrap the local destination that does the actual file handling
        super().__init__(LocalDestination(context, self.destination_dir))
        # precompute the download URL so results() can report it
        self.download_url = f"{capo.nraoDownloadUrl}/{username}/{context.token()}"

    def results(self) -> Dict:
        # report both the filesystem location and the browsable URL
        return {"delivered_to": str(self.destination_dir), "url": self.download_url}

    def __str__(self):
        return f"shared web root at {self.destination_dir}"
import pathlib
from .interfaces import DestinationDecorator, Destination
class SubdirectoryDecorator(DestinationDecorator):
    """
    A wrapper for making it easy to descend to subdirectories during delivery.
    """

    def __init__(self, underlying: Destination, subdirectory: str):
        # subdirectory: path segment prepended to every relative_path in add_file
        super().__init__(underlying)
        self.subdirectory = subdirectory

    def add_file(self, file: pathlib.Path, relative_path: str):
        # reroute the file into the configured subdirectory of the underlying destination
        # NOTE(review): create_file() is inherited unchanged, so temp files created through
        # this decorator are NOT placed under the subdirectory — confirm that is intended
        self.underlying.add_file(file, self.subdirectory + "/" + relative_path)
import pathlib
import tarfile
from .interfaces import DestinationDecorator, Destination, DeliveryContextIF
class TarArchiver(DestinationDecorator):
    """
    This decorator creates a local tar archive. Calls to add_file
    are intercepted and instead the file contents are added to the
    tar archive. When close() is called, we finalize the tar file
    and it is placed in the delivery area.
    """

    def __init__(self, context: DeliveryContextIF, underlying: Destination):
        super().__init__(underlying)
        self.context = context
        # the archive is created lazily, on the first add_file call
        self.archive, self.archive_tempfile = None, None

    def add_file(self, file: pathlib.Path, relative_path: str):
        """
        Add a file to the tar archive (instead of the underlying destination).

        :param file: file whose contents go into the archive
        :param relative_path: path of the file inside the archive
        """
        # ensure we have created the tar archive
        self.ensure_archive_created(relative_path)
        # add the file at the offset
        self.archive.add(file, relative_path)

    def ensure_archive_created(self, relative_path: str):
        """
        This method guarantees that we have an open archive for writing.
        If we do have an archive open, nothing happens; if we do not,
        the archive is opened with a name derived from the supplied relative_path.

        :param relative_path: path to the first file going into the archive
        """
        # if we don't have the archive property yet, we must create the archive now
        if not self.archive:
            # the filename we generate will be the path to the first thing we're delivering with ".tar" appended
            # in practice, this is going to be a directory name and it will work out nicely
            # in the future, we may need to revisit this practice if we want more control over the filenames
            archive_filename = relative_path.split("/")[0] + ".tar"
            # create the archive as a temporary file
            self.archive_tempfile = self.underlying.create_file(archive_filename)
            # open the file; mode "w:" means uncompressed streaming write
            self.archive = tarfile.open(mode="w:", fileobj=self.archive_tempfile.file())

    def close(self):
        # If no file was ever added, no archive was opened, so there is nothing to
        # finalize. Without this guard, delivering zero products would crash here
        # with an AttributeError on self.archive (which would still be None).
        if self.archive is not None:
            # the tarfile documentation is explicit that closing the archive does NOT close
            # the underlying file; therefore, both steps are necessary: the first to finalize
            # the archive, the second to flush and close the file
            self.archive.close()
            self.archive_tempfile.close()
        # now we can proceed
        self.underlying.close()

    def __str__(self):
        return "tar archive in " + str(self.underlying)
......@@ -12,8 +12,17 @@ from .products import SpooledProduct, ExecutionBlock
class ProductFinder(abc.ABC):
"""
Locates products for the delivery to deliver
"""
@abc.abstractmethod
def find_products(self) -> Iterator[SpooledProduct]:
"""
Find products and return an iterator of them
:return: Iterator of SpooledProducts
"""
pass
......
......@@ -45,10 +45,16 @@ class ExecutionBlock(SpooledProduct):
eventually.
"""
@property
def eb_name(self):
return self.path.absolute().name
def deliver_to(self, destination: Destination):
eb_name = self.path.absolute().name
# let's use our directory name as the relative path
destination.add_directory(self.path, eb_name)
destination.add_directory(self.path, self.eb_name)
def __str__(self):
return f"execution block {self.eb_name}"
# Future types of product that might be needed:
......
import filecmp
import pathlib
from os import path
import pytest
from delivery.deliverer import LocalDestination
def test_local_delivery_add_file(tmpdir):
"""Ensure that local delivery does something"""
@pytest.fixture
def dest_dir(tmpdir) -> pathlib.Path:
"""
Generate a destination directory.
:param tmpdir: the tmpdir fixture we depend on
:return: path to the destination directory
"""
# generate the destination directory
dest_dir = pathlib.Path(tmpdir) / "after"
dest_dir.mkdir()
return dest_dir
@pytest.fixture
def file_to_deliver(tmpdir) -> pathlib.Path:
"""
Generate some files in a test directory for delivery.
:param tmpdir: pytest fixture for the temporary directory
:return: a file in the test directory
"""
# rewrap the tmpdir as a pathlib.Path instance
tmpdir = pathlib.Path(tmpdir)
# generate the test file in prior
(tmpdir / "prior").mkdir()
file_name = "test.txt"
temp_file = tmpdir.mkdir("prior").join(file_name) # place a file into the source
temp_file = tmpdir / "prior" / file_name # place a file into the source
temp_file.write_text("content", encoding=None) # write something into that file
dest_dir = tmpdir.mkdir("after") # create a destination
# secretly generate a second file
temp_file2 = tmpdir / "prior" / "test2.txt" # make another file in source
temp_file2.write_text("more content", encoding=None) # add some content to the second file
return temp_file
def test_local_delivery_add_file(tmpdir, file_to_deliver: pathlib.Path, dest_dir: pathlib.Path):
"""
Ensure that local delivery works
"""
# generate the destination
local_dest = LocalDestination(None, dest_dir)
local_dest.add_file(temp_file, str(dest_dir / file_name))
# add the file to it
local_dest.add_file(file_to_deliver, file_to_deliver.name)
# see if the source file eneded up in the destination
assert path.exists(dest_dir / file_name)
assert (dest_dir / file_to_deliver.name).exists()
# see if the content of the file is intact
assert (dest_dir / file_name).read_text(encoding=None) == "content"
assert (dest_dir / file_to_deliver.name).read_text(encoding=None) == "content"
# see if the results are alright
results = local_dest.results()
assert len(results.keys()) == 1
assert results["delivered_to"] == str(dest_dir)
def test_local_delivery_add_directory(
    tmpdir: pathlib.Path, file_to_deliver: pathlib.Path, dest_dir: pathlib.Path
):
    """
    Ensure that local delivery copies an entire directory.

    :param tmpdir: pytest fixture for the temporary directory
    :param file_to_deliver: fixture supplying a file inside the source ("prior") directory
    :param dest_dir: fixture supplying the delivery destination directory
    """
    tmpdir = pathlib.Path(tmpdir)
    local_dest = LocalDestination(None, dest_dir)
    # deliver the whole "prior" directory that holds the fixture's files
    local_dest.add_directory(file_to_deliver.parent, file_to_deliver.parent.name)
    compare_dirs = filecmp.dircmp(file_to_deliver.parent, dest_dir / "prior")
    # see if the destination got all the files from source
    assert len(compare_dirs.left_only) == 0
    assert len(compare_dirs.right_only) == 0
    assert len(compare_dirs.funny_files) == 0
    # see if the results are alright
    results = local_dest.results()
    assert len(results.keys()) == 1
    assert results["delivered_to"] == str(dest_dir)
# Testing the CLI
import filecmp
import os
import json
import pathlib
import shutil
import tarfile
from unittest.mock import patch
......@@ -9,107 +10,143 @@ from delivery.context import DeliveryContext
from delivery.delivery import Delivery, main
def verify_extracted_directory(
    subdirectory: str,
    tar_path: pathlib.Path,
    extraction_target: pathlib.Path,
    original_data_path: str,
):
    """
    Verify that an extracted directory has the same contents as the supplied temporary directory.
    Useful for testing tar-related functionality

    :param subdirectory: subdirectory to look for inside extraction area
    :param tar_path: path to the tarfile to examine
    :param extraction_target: location to extract to
    :param original_data_path: location of the original files to compare to
    :return:
    """
    # is it actually a tar?
    assert tarfile.is_tarfile(tar_path)
    # let's unpack it
    shutil.unpack_archive(tar_path, extraction_target / "extracted")
    # did it output what we expect?
    assert (extraction_target / "extracted" / subdirectory).exists()
    # compare the extracted results with the source
    assert_directories_are_same(
        extraction_target / "extracted" / subdirectory, (original_data_path + subdirectory)
    )
def assert_directories_are_same(left, right):
    """
    Check that the contents of two directories are the same as far as we care

    :param left: path of the first directory to compare
    :param right: path of the second directory to compare
    :return:
    """
    comparison = filecmp.dircmp(left, right)
    # neither side may hold entries the other lacks, and nothing may be uncomparable
    for problem_entries in (comparison.left_only, comparison.right_only, comparison.funny_files):
        assert len(problem_entries) == 0
def test_local_rawdata_no_tar(tmpdir_factory):
    """
    Test that local delivery works without tar (the simplest case)

    :param tmpdir_factory: pytest fixture for creating temporary directories
    """
    temp_directory = str(tmpdir_factory.mktemp("test_basic_rawdata_no_tar"))
    test_data_path = "../../../../shared/workspaces/test/test_data/spool/724126739/"
    eb_name = "17A-109.sb33151331.eb33786546.57892.65940042824"
    # run the delivery CLI against the raw data
    main(["-r", "-l", temp_directory, test_data_path])
    # compare the source and destination
    assert_directories_are_same(temp_directory + "/" + eb_name, (test_data_path + eb_name))
    # ensure that we actually got a delivery file with the proper contents
    with open("delivery.json", "r") as delivery_results_file:
        results = json.load(delivery_results_file)
        assert len(results.keys()) == 1
        assert results["delivered_to"] == temp_directory
def test_local_rawdata_with_tar(tmpdir_factory):
    """
    Test that local delivery works with tar

    :param tmpdir_factory: pytest fixture for creating temporary directories
    """
    temp_directory = pathlib.Path(tmpdir_factory.mktemp("test_basic_rawdata_with_tar"))
    test_data_path = "../../../../shared/workspaces/test/test_data/spool/724126739/"
    main(["-r", "-t", "-l", str(temp_directory), test_data_path])
    eb_name = "17A-109.sb33151331.eb33786546.57892.65940042824"
    # the tar is named after the execution block
    tar_path = temp_directory / (eb_name + ".tar")
    # does a tar exist where we think
    assert tar_path.exists()
    # do we only have it and the SHA1SUMS (did cleanup work)
    assert len(list(temp_directory.iterdir())) == 2
    verify_extracted_directory(eb_name, tar_path, temp_directory, test_data_path)
    # ensure that we actually got a delivery file with the proper contents
    with open("delivery.json", "r") as delivery_results_file:
        results = json.load(delivery_results_file)
        assert len(results.keys()) == 1
        assert results["delivered_to"] == str(temp_directory)
# @pytest.mark.skip(reason="Test needs more dev time")
def test_web_rawdata_no_tar(tmpdir_factory):
    """
    Test that delivery works to a web destination without tar

    :param tmpdir_factory: pytest fixture for creating temporary directories
    """
    temp_directory = pathlib.Path(tmpdir_factory.mktemp("test_web_rawdata_no_tar"))
    test_data_path = "../../../../shared/workspaces/test/test_data/spool/724126739/"
    eb_name = "17A-109.sb33151331.eb33786546.57892.65940042824"
    test_context = DeliveryContext.parse_commandline(["-r", test_data_path])
    # fake out the capo settings the shared-web destination reads
    with patch("delivery.destinations.sharedweb.CapoConfig.settings") as mocked_capo_settings:
        mocked_capo_settings.return_value.nraoDownloadDirectory = str(temp_directory)
        mocked_capo_settings.return_value.nraoDownloadUrl = "http://testing"
        assert str(temp_directory) == mocked_capo_settings().nraoDownloadDirectory
        results = Delivery().deliver(test_context)
    # check the relationship between the delivery root and the URL;
    # str.lstrip("http://testing") would strip a character *set* (and could eat
    # leading path characters), so slice the prefix off instead
    url_path = results["url"][len("http://testing"):]
    assert str(temp_directory / url_path.lstrip("/")) == results["delivered_to"]
    actual_delivery_dir = pathlib.Path(results["delivered_to"])
    # compare the source and destination
    assert_directories_are_same(actual_delivery_dir / eb_name, f"{test_data_path}{eb_name}")
def test_web_rawdata_with_tar(tmpdir_factory):
    """
    Test that delivery works to a web destination with tar

    :param tmpdir_factory: pytest fixture for creating temporary directories
    """
    temp_directory = pathlib.Path(tmpdir_factory.mktemp("test_web_rawdata_with_tar"))
    test_data_path = "../../../../shared/workspaces/test/test_data/spool/724126739/"
    test_context = DeliveryContext.parse_commandline(["-r", "-t", test_data_path])
    # fake out the capo settings the shared-web destination reads
    with patch("delivery.destinations.sharedweb.CapoConfig.settings") as mocked_capo_settings:
        mocked_capo_settings.return_value.nraoDownloadDirectory = str(temp_directory)
        mocked_capo_settings.return_value.nraoDownloadUrl = "http://testing"
        assert str(temp_directory) == mocked_capo_settings().nraoDownloadDirectory
        results = Delivery().deliver(test_context)
    eb_name = "17A-109.sb33151331.eb33786546.57892.65940042824"
    # check the relationship between the delivery root and the URL;
    # str.lstrip("http://testing") would strip a character *set* (and could eat
    # leading path characters), so slice the prefix off instead
    url_path = results["url"][len("http://testing"):]
    assert str(temp_directory / url_path.lstrip("/")) == results["delivered_to"]
    actual_delivery_dir = pathlib.Path(results["delivered_to"])
    # does a tar exist where we think
    tar_path = actual_delivery_dir / (eb_name + ".tar")
    assert tar_path.exists()
    # three entries expected — presumably the tar plus bookkeeping files
    # (SHA1SUMS, delivery report); TODO confirm against the deliverer
    assert len(list(actual_delivery_dir.iterdir())) == 3
    verify_extracted_directory(eb_name, tar_path, temp_directory, test_data_path)
......@@ -20,10 +20,10 @@ server {
}
location ~ ^/dl/(?:(.*))/(?:(.*)) {
alias /tmp/workspaces_tmp/$1/$2;
alias /var/www/$1/$2;
autoindex on;
autoindex_exact_size off;
autoindex_format html;
autoindex_localtime on;
}
}
\ No newline at end of file
}
......@@ -11,6 +11,7 @@ services:
volumes:
- ./apps/web/ws-nginx.local.conf:/etc/nginx/conf.d/default.conf
- ./tmp/:/tmp/workspaces_tmp
- ./delivery_root:/var/www
db:
image: marconi.aoc.nrao.edu/ops/ci/db:workspaces
restart: always
......@@ -50,6 +51,7 @@ services:
- ./apps:/packages/apps
- ./testing:/packages/testing
- ./tmp/:/tmp/workspaces_tmp
- ./delivery_root:/var/www
capability:
build:
......
......@@ -14,6 +14,9 @@ edu.nrao.archive.configuration.AmqpServer.port = 5672
#
edu.nrao.archive.workspaces.CapabilitySettings.serviceUrl = http://capability:3457
edu.nrao.archive.workspaces.WorkflowSettings.serviceUrl = http://workflow:3456
edu.nrao.archive.workspaces.NotificationSettings.serviceUrl = http://notification:3458
edu.nrao.archive.workspaces.UISettings.serviceUrl = http://localhost:4444/workspaces
#
# Processing Settings
......@@ -23,7 +26,8 @@ edu.nrao.archive.workspaces.ProcessingSettings.useCondor = false
#
# Delivery settings
#
edu.nrao.archive.workflow.config.DeliverySettings.nraoDownloadDirectory = /root/downloads
edu.nrao.archive.workflow.config.DeliverySettings.nraoDownloadDirectory = /var/www
edu.nrao.archive.workflow.config.DeliverySettings.nraoDownloadUrl = http://localhost:4444/dl
#
# Data fetcher settings
......
......@@ -25,5 +25,9 @@ env_setup:
%: Makefile env_setup
@$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O)
# regenerate the API documentation
regenerate:
./regenerate.sh
clean:
rm -rf $(BUILDDIR)