Skip to content
Snippets Groups Projects
Commit 7e0c087c authored by Daniel Lyons's avatar Daniel Lyons
Browse files

Considerable rework here:

 - New DestinationTempFile for writing tempfiles into the destination which eventually get added
 - All of the close() methods are now streaming
 - Implemented ChecksumDecorator
 - Implemented FetchFile decorator
parent 8fff450b
No related branches found
No related tags found
No related merge requests found
......@@ -58,7 +58,6 @@ class DeliveryContext(DeliveryContextIF):
"""
raise NotImplementedError
@property
def token(self) -> str:
"""
If a delivery only requires one token, just use this property
......@@ -88,15 +87,24 @@ class DeliveryContext(DeliveryContextIF):
def create_destination(self) -> Destination:
builder = DestinationBuilder(self)
# so the layer-cake has to be built in kind of reverse order because it's a stack
# at the bottom we want the physical destination, which is either local or web delivery
# first handle the local destination argument
if self.local_destination:
builder.local(self.local_destination)
else:
# FIXME: this is gross
settings = CapoConfig().settings("edu.nrao.archive.workflow.config.DeliverySettings")
builder.local(pathlib.Path(settings.nraoDownloadDirectory) / "anonymous" / self.token)
builder.web()
# always do a checksum at the end
builder.checksums()
# make a CURL file before that, but only if we're doing a web delivery
if not self.local_destination:
builder.curlfile()
# then handle the tar argument
# tar goes here, so that we generate a checksum for the tar file
# and not for all the files in the tar file
if self.tar:
builder.tar()
......@@ -160,7 +168,7 @@ class DeliveryContext(DeliveryContextIF):
return DeliveryContext(
source=ns.source,
tar=ns.tar,
local_destination=ns.local_destination,
local_destination=pathlib.Path(ns.local_destination) if ns.local_destination else None,
use_piperesults=ns.use_piperesults,
rawdata=ns.rawdata,
)
......@@ -4,10 +4,17 @@
#
# -------------------------------------------------------------------------
import abc
import hashlib
import os
import pathlib
import shutil
import tarfile
import tempfile
from abc import ABC, abstractmethod
from types import TracebackType
from typing import BinaryIO, Optional, Type, Iterator, AnyStr, Iterable
from pycapo import CapoConfig
class DeliveryContextIF(ABC):
......@@ -20,6 +27,37 @@ class DeliveryContextIF(ABC):
pass
class DestinationTempFile(ABC):
    """
    A DestinationTempFile is a file that you can create in the destination.

    It starts out as a temporary file that can be written to during delivery;
    once you are finished with it, it is added to the destination, so it is
    still routed through the destination mechanism at the end.
    """

    @abstractmethod
    def close(self):
        """
        Close the file (and add it to the destination at your desired path)
        """

    @abstractmethod
    def file(self) -> BinaryIO:
        """
        Access the raw file for writing

        :return: a file for writing
        """

    @abstractmethod
    def filename(self) -> str:
        """
        Access the temporary path of this file during construction

        :return: the path to the temporary file
        """
class Destination(abc.ABC):
"""
Destinations are locations that files can be copied into. They might not
......@@ -28,21 +66,54 @@ class Destination(abc.ABC):
The destination API is very simple, consisting just of adding files.
"""
def __init__(self, context: DeliveryContextIF, path: pathlib.Path):
def __init__(self, context: DeliveryContextIF):
self.context = context
self.path = path
@abc.abstractmethod
def add_file(self, file: pathlib.Path, relative_path: str):
"""
Add a file to the destination at the given relative path.
:param file: the file (whose contents we are delivering)
:param relative_path: the relative path to that file (in the delivery root)
"""
pass
@abc.abstractmethod
def create_file(self, relative_path: str) -> DestinationTempFile:
"""
Create a file in the destination. When the file is closed, it will be added
to the destination via the add_file method at the specified relative path.
:param relative_path: the relative path where the file should eventually be placed
"""
pass
def add_directory(self, directory: pathlib.Path, relative_path: str):
"""
Add a directory and its contents recursively to the destination at the given path.
Do not override this method! add_file must be called for every file that gets delivered.
:param directory: the directory (whose files we will deliver)
:param relative_path: the relative path to this directory (in the delivery root)
"""
for entry in directory.iterdir():
if entry.is_file():
self.add_file(entry, relative_path + "/" + entry.name)
else:
self.add_directory(entry, relative_path + "/" + entry.name)
def close(self):
"""
Close the destination, signalling to this and possibly destinations in the stack
that we are finished adding new files to the destination.
"""
pass
@abc.abstractmethod
# For tar; may not be necessary
def close(self):
def result_url(self) -> str:
"""
Returns a URL to the results.
:return: URL pointing to the results
"""
pass
def __enter__(self):
......@@ -57,19 +128,67 @@ class Destination(abc.ABC):
class DestinationDecorator(Destination):
    """
    Base class for destinations that wrap another destination.

    Each operation is forwarded to the wrapped ``underlying`` destination;
    subclasses override individual methods to layer extra behaviour on top.
    """

    def __init__(self, underlying: Destination):
        """
        :param underlying: the destination being wrapped; its context is reused
        """
        self.underlying = underlying
        # Destination.__init__ only takes the context, so initialise exactly once
        super().__init__(underlying.context)

    def add_file(self, file: pathlib.Path, relative_path: str):
        """
        Forward the file to the underlying destination.

        :param file: the file (whose contents we are delivering)
        :param relative_path: the relative path to that file (in the delivery root)
        """
        self.underlying.add_file(file, relative_path)

    # NOTE: add_directory is intentionally not overridden here. The inherited
    # Destination.add_directory walks the directory and calls self.add_file for
    # every entry, so each layer of the stack sees every delivered file.
    # Forwarding the whole directory to the underlying destination would skip
    # this decorator's (and every subclass's) add_file.

    def create_file(self, relative_path: str) -> DestinationTempFile:
        """
        Create a temporary file via the underlying destination and re-point it at this layer.

        :param relative_path: the relative path where the file should eventually be placed
        :return: the temporary file, with its ``destination`` attribute set to this decorator
        """
        # so this is a bit of a hack, but there's only one thing that makes temporary files
        # if we don't have the following lines of code, adding the tempfile at the end of processing
        # goes directly to the LocalDestination rather than passing through all the layers of the layer cake
        temporary_file = self.underlying.create_file(relative_path)
        temporary_file.destination = self
        return temporary_file

    def close(self):
        """
        Close the underlying destination.
        """
        self.underlying.close()

    def result_url(self) -> str:
        """
        :return: the result URL reported by the underlying destination
        """
        return self.underlying.result_url()
class ChecksumDecorator(DestinationDecorator):
    """
    Generate a SHA1SUMS file in the content root with an entry for every file that got delivered.
    """

    def __init__(self, underlying: Destination):
        super().__init__(underlying)
        # the sums file is created on the wrapped destination
        self.sumsfile = underlying.create_file("SHA1SUMS")

    def add_file(self, file: pathlib.Path, relative_path: str):
        """
        Deliver the file as usual, then append its checksum line to the sums file.

        :param file: the file being delivered
        :param relative_path: the relative path recorded next to the hash
        """
        # normal delivery first
        super().add_file(file, relative_path)

        # then record the hash
        sums_stream = self.sumsfile.file()
        encoded_line = self.hash_file_line(file, relative_path).encode("utf8")
        sums_stream.writelines((encoded_line,))

    def hash_file_line(self, file: pathlib.Path, relative_path: str):
        """
        Build one line of the checksum file.

        :param file: file to hash
        :param relative_path: relative path to show in checksum file
        :return: a line like "<hash> <relative_path>"
        """
        digest = self.hash_file(file)
        return f"{digest} {relative_path}\n"

    @staticmethod
    def hash_file(file: pathlib.Path):
        """
        Compute the SHA-1 of a file, reading it in 8192-byte chunks.

        :param file: file to hash
        :return: the hex digest as a printable str
        """
        with open(file, "rb") as stream:
            hasher = hashlib.sha1()
            # keep reading until read() hands back an empty bytes object
            for block in iter(lambda: stream.read(8192), b""):
                hasher.update(block)
        return hasher.hexdigest()

    def close(self):
        """
        Finish the sums file, then close the rest of the stack.
        """
        self.sumsfile.close()
        super().close()
class TarDecorator(DestinationDecorator):
class TarArchiver(DestinationDecorator):
"""
This decorator creates a local tar archive. Calls to add_file
are intercepted and instead the file contents are added to the
......@@ -80,75 +199,28 @@ class TarDecorator(DestinationDecorator):
def __init__(self, context: DeliveryContextIF, underlying: Destination):
super().__init__(underlying)
self.context = context
self.archive, self.archive_tempfile = None, None
def add_file(self, file: pathlib.Path, relative_path: str):
self.underlying.add_file(file, relative_path)
# ensure we have created the tar archive
self.ensure_archive_created(relative_path)
def add_directory(self, directory: pathlib.Path, relative_path: str):
self.underlying.add_directory(directory, relative_path)
# add the file at the offset
self.archive.add(file, relative_path)
def ensure_archive_created(self, relative_path: str):
# if we don't have the archive property yet, we must create the archive now
if not self.archive:
self.archive_tempfile = self.underlying.create_file(
relative_path.split("/")[0] + ".tar"
)
self.archive = tarfile.open(mode="w:", fileobj=self.archive_tempfile.file())
def close(self):
self.archive.close()
self.archive_tempfile.close()
self.underlying.close()
# I believe this gives us the request id, but I may be wrong
# I also assume that request id is what we want, may not be true
tar_name = os.path.basename(os.path.normpath(self.context.source))
shutil.make_archive(
str(self.underlying.path) + "/" + tar_name, "gztar", self.underlying.path
)
# clear out the directory, leaving the tar
for filename in os.listdir(self.underlying.path):
file_path = os.path.join(self.underlying.path, filename)
try:
if filename != tar_name + ".tar.gz":
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
raise e
# print('Failed to delete %s. Reason: %s' % (file_path, e))
# class ChecksumDecorator(DestinationDecorator):
# """
# This decorator ensures that an MD5SUM file appears in the underlying
# destination after all the files are added, and the contents of that
# file are, as one would expect, the MD5SUMs of the files added to the
# destination, in the format that ``md5sum -c`` expects.
# """
#
# def __init__(self, context: "DeliveryContext", underlying: Destination):
# super().__init__(underlying)
# self.context = context
#
# def add_file(self, file: pathlib.Path, relative_path: str):
# self.underlying.add_file(file, relative_path)
#
# def add_directory(self, directory: pathlib.Path, relative_path: str):
# self.underlying.add_directory(directory, relative_path)
#
# def close(self):
# self.underlying.close()
# hasher = hashlib.md5()
# hasher = (fname, self.hash_bytestr_iter(self.file_as_blockiter(open(fname, 'rb')), hasher)) /
# for fname in glob.iglob(self.underlying.path + '**/**', recursive=True)
# checksum_file = open(self.underlying.path / "MD5SUMS", "w")
# checksum_file.write(hasher.hexdigest())
# checksum_file.close()
#
## the following is reported to be a memory efficient way to get md5 checksum
## https://stackoverflow.com/questions/3431825/generating-an-md5-checksum-of-a-file > post by Omnifarious
# def hash_bytestr_iter(bytesiter, hasher):
# for block in bytesiter:
# hasher.update(block)
# return hasher
#
# def file_as_blockiter(afile, blocksize=65536):
# with afile:
# block = afile.read(blocksize)
# while len(block) > 0:
# yield block
# block = afile.read(blocksize)
# class SubdirectoryDecorator(DestinationDecorator):
# def __init__(self, underlying: Destination, subdirectory: str):
......@@ -164,37 +236,91 @@ class LocalDestination(Destination):
LocalDestination is for delivering to a local directory on the filesystem.
"""
def __init__(self, context: DeliveryContextIF, path: pathlib.Path):
    """
    Create a local destination.

    :param context: the delivery context, handed on to Destination
    :param path: the local directory, kept as ``self.path``
    """
    super().__init__(context)
    self.path = path
def add_file(self, file: pathlib.Path, relative_path: str):
    """
    Copy contents of file to new file with path relative_path; creates directories if they don't exist

    :param file: Source file whose contents are getting copied
    :param relative_path: Relative path to new file in destination
    """
    # always resolve against the destination root, never the working directory
    target = self.path / relative_path

    # ensure that the directory exists; exist_ok avoids a check-then-create race
    target.parent.mkdir(parents=True, exist_ok=True)

    # now copy the file
    # if we don't care about file metadata we could use copy() instead
    shutil.copy2(file, target)
def create_file(self, relative_path: str) -> DestinationTempFile:
    """
    Make a temporary file inside the local destination directory.

    The temporary file's name is prefixed with the last component of
    relative_path, and it is not deleted automatically when closed.

    :param relative_path: the relative path where the file should eventually be placed
    :return: a LocalDestinationTempFile wrapping the new temporary file
    """
    leaf_name = relative_path.split("/")[-1]
    handle = tempfile.NamedTemporaryFile(prefix=leaf_name, dir=self.path, delete=False)
    return LocalDestinationTempFile(self, relative_path, handle)
def add_directory(self, directory: pathlib.Path, relative_path: str):
def result_url(self) -> str:
    """
    Return a file:/// URL for this location

    :return: the absolute form of ``self.path`` rendered as a file URI
    """
    # absolute() already yields a Path, so as_uri() can be called on it directly
    return self.path.absolute().as_uri()
class LocalDestinationTempFile(DestinationTempFile):
    """
    A DestinationTempFile backed by a named temporary file on the local filesystem.
    """

    def __init__(
        self,
        destination: LocalDestination,
        relative_path: str,
        tempfile: tempfile.NamedTemporaryFile,
    ):
        """
        :param destination: the destination whose add_file receives the finished file
        :param relative_path: the relative path the file is added at on close
        :param tempfile: the open named temporary file to write into
        """
        self.destination = destination
        self.relative_path = relative_path
        self.tempfile = tempfile

    def close(self):
        """
        Close the temporary file, add it to the destination at ``relative_path``,
        and remove the temporary copy.
        """
        self.tempfile.close()
        try:
            self.destination.add_file(pathlib.Path(self.tempfile.name), self.relative_path)
        finally:
            # remove the scratch file even when add_file fails, so nothing is left behind
            os.unlink(self.tempfile.name)

    def file(self) -> BinaryIO:
        """
        :return: the temporary file object, for writing
        """
        return self.tempfile

    def filename(self) -> str:
        """
        :return: the name of the temporary file
        """
        return self.tempfile.name
class WgetFileGenerator(DestinationDecorator):
    """
    Write a ``fetch-all.sh`` script containing one ``wget`` line for every file added.
    """

    def __init__(self, underlying: Destination):
        super().__init__(underlying)
        self.fetch_script = underlying.create_file("fetch-all.sh")
        # shebang followed by a blank line
        header_lines = [b"#!/bin/sh\n", b"\n"]
        self.fetch_script.file().writelines(header_lines)

    def add_file(self, file: pathlib.Path, relative_path: str):
        """
        Deliver the file as usual, then append a wget line for it to the script.

        :param file: the file being delivered
        :param relative_path: the relative path appended to the result URL
        """
        super().add_file(file, relative_path)
        script_stream = self.fetch_script.file()
        command = f"wget {self.underlying.result_url()}/{relative_path}\n"
        script_stream.writelines([command.encode("utf8")])

    def close(self):
        """
        Finish the script, then close the rest of the stack.
        """
        self.fetch_script.close()
        super().close()
class SharedWebDestination(DestinationDecorator):
    """
    Deliver into the download directory named in the CAPO delivery settings and
    report the matching download URL as the result.
    """

    def __init__(self, context: DeliveryContextIF):
        """
        :param context: the delivery context; its ``token`` names the per-delivery directory
        """
        settings = CapoConfig().settings("edu.nrao.archive.workflow.config.DeliverySettings")
        # token is a property, so read it as an attribute; calling it raises TypeError
        ld = LocalDestination(
            context,
            pathlib.Path(settings.nraoDownloadDirectory) / "anonymous" / context.token,
        )
        super().__init__(ld)
        self.download_url = f"{settings.nraoDownloadUrl}/anonymous/{context.token}"

    def result_url(self) -> str:
        """
        :return: the download URL built at construction time
        """
        return self.download_url
class DestinationBuilder:
......@@ -211,9 +337,21 @@ class DestinationBuilder:
self._destination = LocalDestination(self.context, path)
return self
def web(self):
    """Replace the current destination with a SharedWebDestination; returns the builder."""
    self._destination = SharedWebDestination(self.context)
    return self
def checksums(self):
    """Wrap the current destination in a ChecksumDecorator; returns the builder."""
    self._destination = ChecksumDecorator(self._destination)
    return self
def curlfile(self):
    """Wrap the current destination in a WgetFileGenerator; returns the builder."""
    # NOTE(review): despite the method name, the generated script uses wget, not curl
    self._destination = WgetFileGenerator(self._destination)
    return self
def tar(self):
"""Add the tar decorator to the destination"""
self._destination = TarDecorator(self.context, self._destination)
self._destination = TarArchiver(self.context, self._destination)
return self
def build(self):
......
......@@ -40,16 +40,13 @@ class Delivery:
# the last thing we should do is return the URL to the delivered stuff.
# for a local delivery, this will be a file URL; otherwise it will be an http URL.
return destination.path
return destination.result_url()
def main(args=None):
"""CLI entry point"""
context = DeliveryContext.parse_commandline(args)
try:
print(Delivery().deliver(context))
except Exception as err:
print("ERROR: " + str(err))
print(Delivery().deliver(context))
if __name__ == "__main__":
......
import filecmp
import pathlib
from os import path
import pytest
from delivery.deliverer import LocalDestination
def test_local_delivery_add_file(tmpdir):
"""Ensure that local delivery does something"""
@pytest.fixture
def dest_dir(tmpdir) -> pathlib.Path:
    """Create an ``after`` directory under tmpdir and return it as the delivery target."""
    target = pathlib.Path(tmpdir).joinpath("after")
    target.mkdir()
    return target
@pytest.fixture
def file_to_deliver(tmpdir) -> pathlib.Path:
    """
    Create a ``prior`` source directory holding two files and return the first.

    :return: path to ``prior/test.txt``, whose content is "content"
    """
    # rewrap the tmpdir as a pathlib.Path instance
    tmpdir = pathlib.Path(tmpdir)

    # generate the test file in prior
    source_dir = tmpdir / "prior"
    source_dir.mkdir()
    file_name = "test.txt"
    temp_file = source_dir / file_name  # place a file into the source
    temp_file.write_text("content", encoding=None)  # write something into that file

    # secretly generate a second file
    temp_file2 = source_dir / "test2.txt"  # make another file in source
    temp_file2.write_text("more content", encoding=None)  # add some content to the second file

    return temp_file
def test_local_delivery_add_file(tmpdir, file_to_deliver: pathlib.Path, dest_dir: pathlib.Path):
    """
    Ensure that local delivery works
    """
    # generate the destination
    local_dest = LocalDestination(None, dest_dir)

    # add the file to it
    local_dest.add_file(file_to_deliver, file_to_deliver.name)

    delivered = dest_dir / file_to_deliver.name

    # see if the source file ended up in the destination
    assert delivered.exists()

    # see if the content of the file is intact
    assert delivered.read_text(encoding=None) == "content"
def test_local_delivery_add_directory(tmpdir):
def test_local_delivery_add_directory(
tmpdir: pathlib.Path, file_to_deliver: pathlib.Path, dest_dir: pathlib.Path
):
"""Ensure that local delivery does something"""
source_dir = tmpdir.mkdir("prior") # create a source
temp_file = source_dir.join("test.txt") # add a file to the source dir
temp_file.write_text("content", encoding=None) # add some content to the file
temp_file2 = source_dir.join("test2.txt") # make another file in source
temp_file2.write_text("more content", encoding=None) # add some content to the second file
local_dest = LocalDestination(None, str(tmpdir))
local_dest.add_directory(source_dir, "after") # destination is defined here
compare_dirs = filecmp.dircmp(source_dir, tmpdir / "after")
tmpdir = pathlib.Path(tmpdir)
local_dest = LocalDestination(None, dest_dir)
local_dest.add_directory(file_to_deliver.parent, file_to_deliver.parent.name)
compare_dirs = filecmp.dircmp(file_to_deliver.parent, dest_dir / "prior")
# see if the destination got all the files from source
assert (
len(compare_dirs.left_only) == 0
......
......@@ -19,15 +19,11 @@ def test_local_rawdata_no_tar(tmpdir_factory):
eb_name = "17A-109.sb33151331.eb33786546.57892.65940042824"
main(["-r", "-l", temp_directory, test_data_path])
# compare the source and destination
compare_dirs = filecmp.dircmp(
temp_directory + "/" + eb_name, (test_data_path + eb_name)
)
compare_dirs = filecmp.dircmp(temp_directory + "/" + eb_name, (test_data_path + eb_name))
# did the comparison report they are the same
assert (
len(compare_dirs.left_only) == 0
and len(compare_dirs.right_only) == 0
and len(compare_dirs.funny_files) == 0
)
assert len(compare_dirs.left_only) == 0
assert len(compare_dirs.right_only) == 0
assert len(compare_dirs.funny_files) == 0
def test_local_rawdata_with_tar(tmpdir_factory):
......@@ -35,27 +31,31 @@ def test_local_rawdata_with_tar(tmpdir_factory):
test_data_path = "../../../../shared/workspaces/test/test_data/spool/724126739/"
main(["-r", "-t", "-l", temp_directory, test_data_path])
eb_name = "17A-109.sb33151331.eb33786546.57892.65940042824"
tar_path = temp_directory + "/724126739.tar.gz"
tar_path = temp_directory + "/17A-109.sb33151331.eb33786546.57892.65940042824.tar"
# does a tar exist where we think
assert os.path.exists(tar_path)
# is it the only thing there (did cleanup work)
assert len(os.listdir(temp_directory)) == 1
# do we only have it and the SHA1SUMS
assert len(os.listdir(temp_directory)) == 2
# is it actually a tar
assert tarfile.is_tarfile(tar_path)
# lets unpack it
shutil.unpack_archive(tar_path, temp_directory + "/extracted")
# did it output what we expect
assert os.path.exists(temp_directory + "/extracted/" + eb_name)
# compare the extracted results with the source
compare_dirs = filecmp.dircmp(
temp_directory + "/extracted/" + eb_name, (test_data_path + eb_name)
)
# is the source and extracted the same
assert (
len(compare_dirs.left_only) == 0
and len(compare_dirs.right_only) == 0
and len(compare_dirs.funny_files) == 0
)
assert len(compare_dirs.left_only) == 0
assert len(compare_dirs.right_only) == 0
assert len(compare_dirs.funny_files) == 0
# @pytest.mark.skip(reason="Test needs more dev time")
......@@ -72,15 +72,11 @@ def test_web_rawdata_no_tar(tmpdir_factory):
assert temp_directory == mocked_capo_settings().nraoDownloadDirectory
destination_path = Delivery().deliver(test_context)
# compare the source and destination
compare_dirs = filecmp.dircmp(
destination_path / eb_name, f"{test_data_path}{eb_name}"
)
compare_dirs = filecmp.dircmp(destination_path / eb_name, f"{test_data_path}{eb_name}")
# did the comparison report they are the same
assert (
len(compare_dirs.left_only) == 0
and len(compare_dirs.right_only) == 0
and len(compare_dirs.funny_files) == 0
)
assert len(compare_dirs.left_only) == 0
assert len(compare_dirs.right_only) == 0
assert len(compare_dirs.funny_files) == 0
def test_web_rawdata_with_tar(tmpdir_factory):
......@@ -108,8 +104,6 @@ def test_web_rawdata_with_tar(tmpdir_factory):
temp_directory + "/extracted/" + eb_name, (test_data_path + eb_name)
)
# is the source and extracted the same
assert (
len(compare_dirs.left_only) == 0
and len(compare_dirs.right_only) == 0
and len(compare_dirs.funny_files) == 0
)
assert len(compare_dirs.left_only) == 0
assert len(compare_dirs.right_only) == 0
assert len(compare_dirs.funny_files) == 0
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment