Commit c31d7009 authored by Janet Goldstein's avatar Janet Goldstein

Updated documentation on my contributions to the system (minus build system since I figure that's on its way out the door)
parent fc442f36
1 merge request: !2 SSA-6727 package tester rebased
Pipeline #241 failed
Showing with 610 additions and 121 deletions
@@ -63,4 +63,5 @@ build/pkgs
**/condor.log
services/capability/capability.log
services/capability/workflow.log
services/workflow/workflow.log
\ No newline at end of file
services/workflow/workflow.log
**/.coverage
\ No newline at end of file
steps:
- pydoc-markdown --build --site-dir $SITE_DIR
# .readthedocs.yml
# Read the Docs configuration file
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
# Required
version: 0.1
# Build documentation in the docs/ directory with Sphinx
sphinx:
configuration: docs/conf.py
# Optionally set the version of Python and requirements required to build your docs
version: 2
mkdocs: {} # tell readthedocs to use mkdocs
python:
version: 3.7
version: 3.8
install:
- requirements: docs/requirements.txt
\ No newline at end of file
- method: pip
path: .
- requirements: .readthedocs-requirements.txt
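Untangled, the post-change `.readthedocs.yml` plausibly reads as follows; treating the `sphinx`/`3.7`/`docs/requirements.txt` lines above as the removed side of the diff is an assumption:

```yaml
# .readthedocs.yml
# Read the Docs configuration file
# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details

# Required
version: 2

mkdocs: {} # tell readthedocs to use mkdocs

python:
  version: 3.8
  install:
    - method: pip
      path: .
    - requirements: .readthedocs-requirements.txt
```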
@@ -10,12 +10,31 @@ import sys
import tempfile
from pathlib import Path
sys.path.insert(0, str(Path('.').absolute()))
sys.path.insert(0, str(Path('..').absolute()))
# TODO: Some Fine Day: this duplicates same function in package tester.
# CAVEAT PROGRAMMOR: attempts to centralize it have resulted in tears.
def get_project_root() -> Path:
"""
Get the root of this project.
:return: Path to the project root directory
"""
my_path = Path(__file__)
path = my_path
while not path.name.endswith('data') and not path.name.endswith('code'):
path = path.parent
return path
import pytest
from pycapo import CapoConfig
# pylint: disable=C0115, C0116, C0200, R0902, R0903, R0914, R1721, W0212, W0613, W0621, W0703, W1203
sys.path.insert(0, str(get_project_root()))
from shared.workspaces.test.utilities import get_locations_report, \
get_test_data_dir
@@ -347,3 +366,4 @@ def confirm_retrieve_mode_copy(servers_report: dict) -> None:
for server in servers_report:
entry = servers_report[server]
assert entry['retrieve_method'].value == RetrievalMode.COPY.value
@@ -3,6 +3,12 @@
from pathlib import Path
import pytest
import sys
sys.path.insert(0, str(Path('.').absolute()))
from .df_pytest_utils import get_project_root
project_root = get_project_root()
sys.path.insert(0, str(project_root))
# pylint: disable=C0115, C0116, C0200, C0415, R0801, R0902, R0903, R0914, R1721, W0212, W0611, W0613, W0621, W0703, W1203
@@ -2,9 +2,9 @@
import os
import json
import pika
import pycapo
import sys
from channels.amqp_helpers import Channel, CapabilityEventChannel
def epilogue():
@@ -28,12 +28,12 @@ def epilogue():
password = capo.getstring('edu.nrao.archive.configuration.AmqpServer.password')
# connect
credentials = pika.PlainCredentials(username, password)
conn = pika.BlockingConnection(
pika.ConnectionParameters(hostname, credentials=credentials))
channel = conn.channel()
#credentials = pika.PlainCredentials(username, password)
#conn = pika.BlockingConnection(
# pika.ConnectionParameters(hostname, credentials=credentials))
#channel = conn.channel()
with Channel() as channel:
try:
# make the exchange we need
channel.exchange_declare('archive.job-status', 'topic', durable=True)
@@ -46,6 +46,3 @@ def epilogue():
# dispatch it
channel.basic_publish(exchange='archive.job-status', routing_key=job_id, body=message)
finally:
channel.close()
conn.close()
\ No newline at end of file
@@ -7,8 +7,9 @@ from setuptools import setup
VERSION = open('src/null/_version.py').readlines()[-1].split()[-1].strip("\"'")
README = Path('README.md').read_text()
tests_require = [
'pytest>=5.4,<6.0'
requires = [
'pytest>=5.4,<6.0',
'pytest-mock==3.3.1',
]
setup(
name='ssa-' + Path().absolute().name,
@@ -19,7 +20,7 @@ setup(
author_email='dms-ssa@nrao.edu',
url='TBD',
license="GPL",
tests_require=tests_require,
install_requires=requires,
keywords=[],
packages=['null'],
package_dir={'': 'src'},
""" Module for the null executable. Performs some very basic actions
and utilizes pymygdala's LogHandler for logging. """
"""
This is the null executable, a baseline test of the functionality of the Workspaces system.
"""
import os
import sys
@@ -10,20 +11,31 @@ import argparse
from ._version import ___version___ as version
_DESCRIPTION = """Workspaces null executable, a status capture test of the system. Version {}"""
_DESCRIPTION = """Workspaces null executable, a status capture test of the Workspaces system. Version {}"""
ERRORS = {
1: "ERROR: An error has been purposefully induced.",
2: "ERROR: Unable to breach the mainframe.",
3: "ERROR: Your standard run-of-the-mill error.",
4: "ERROR: Very special and specific heisenbug-caused error that will take several days and several programmers to debug.",
5: "ERROR: [insert hilarious error joke here]",
}
logger = logging.getLogger("null")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(stream=sys.stdout)
def change_to_error_handler():
logger.removeHandler(handler)
err_handler = logging.StreamHandler(stream=sys.stderr)
logger.addHandler(err_handler)
def print_error():
"""
Logs an error message to stderr.
"""
logger.removeHandler(handler)
err_handler = logging.StreamHandler(stream=sys.stderr)
logger.addHandler(err_handler)
change_to_error_handler()
logger.error("ERROR: This is an error.")
@@ -37,18 +49,20 @@ def print_greeting():
def exit_with_failure():
"""
Exits with status code -1
Exits with status code 1
"""
logger.error("Error purposefully induced. Exiting with status code -1...")
sys.exit(-1)
change_to_error_handler()
logger.error(ERRORS[1])
sys.exit(1)
def exit_randomly():
"""
Exits with a random error status code drawn from ERRORS
"""
status_code = random.randint(-50, 50)
logger.debug("Exiting with status code {}".format(status_code))
change_to_error_handler()
status_code = random.choice([*ERRORS.keys()])
logger.error("Exiting with status code {}: {}".format(status_code, ERRORS[status_code]))
sys.exit(status_code)
@@ -73,17 +87,18 @@ class Null:
"""
Null executable that executes null functionality based on arguments given
"""
def __init__(self, args: argparse.Namespace, verbose: bool):
self.args = args
if verbose:
logger.setLevel(logging.DEBUG)
self.args_to_funcs = {
'print-error': print_error,
'greeting': print_greeting,
'exit-fail': exit_with_failure,
'exit-random': exit_randomly,
'nap': take_nap,
'dump': dump_core
"print_error": print_error,
"greeting": print_greeting,
"exit_fail": exit_with_failure,
"exit_random": exit_randomly,
"nap": take_nap,
"dump": dump_core,
}
def execute(self):
@@ -98,34 +113,81 @@
def make_arg_parser() -> argparse.ArgumentParser:
"""
Creates an argparse arguments parser with appropriate options
:return: Said argument parser
"""
parser = argparse.ArgumentParser(description=_DESCRIPTION.format(version),
formatter_class=argparse.RawTextHelpFormatter)
options = parser.add_argument_group('options', 'settings for altering program behavior')
options.add_argument('-v', '--verbose', action='store_true',
required=False, dest='verbose', default=False,
help='allow the program the gift of speech')
parser = argparse.ArgumentParser(
description=_DESCRIPTION.format(version),
formatter_class=argparse.RawTextHelpFormatter,
)
options = parser.add_argument_group(
"options", "settings for altering program behavior"
)
options.add_argument(
"-v",
"--verbose",
action="store_true",
required=False,
dest="verbose",
default=False,
help="allow the program the gift of speech",
)
functions = parser.add_mutually_exclusive_group(required=False)
functions.add_argument('-pe', '--print-error', action='store_true',
required=False, dest='print-error', default=False,
help='print out aggressive message to stderr')
functions.add_argument('-g', '--greeting', action='store_true',
required=False, dest='greeting', default=False,
help='print out a friendly greeting to stdout')
functions.add_argument('-ef', '--exit-fail', action='store_true',
required=False, dest='exit-fail', default=False,
help='print error message and exit with status code -1')
functions.add_argument('-er', '--exit-random', action='store_true',
required=False, dest='exit-random', default=False,
help='print error message and exit with random status code within [-50, 50]')
functions.add_argument('-n', '--nap', action='store_true',
required=False, dest='nap', default=False,
help='take a short nap')
functions.add_argument('-d', '--dump', action='store_true',
required=False, dest='dump', default=False,
help='abort program and dump core')
functions.add_argument(
"-pe",
"--print-error",
action="store_true",
required=False,
dest="print_error",
default=False,
help="print out aggressive message to stderr",
)
functions.add_argument(
"-g",
"--greeting",
action="store_true",
required=False,
dest="greeting",
default=False,
help="print out a friendly greeting to stdout",
)
functions.add_argument(
"-ef",
"--exit-fail",
action="store_true",
required=False,
dest="exit_fail",
default=False,
help="print error message and exit with status code -1",
)
functions.add_argument(
"-er",
"--exit-random",
action="store_true",
required=False,
dest="exit_random",
default=False,
help="print error message and exit with random status code within [-50, 50]",
)
functions.add_argument(
"-n",
"--nap",
action="store_true",
required=False,
dest="nap",
default=False,
help="take a short nap",
)
functions.add_argument(
"-d",
"--dump",
action="store_true",
required=False,
dest="dump",
default=False,
help="abort program and dump core",
)
return parser
@@ -138,5 +200,5 @@ def main():
executable.execute()
if __name__ == '__main__':
if __name__ == "__main__":
main()
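A minimal sketch of exercising `Null` directly, mirroring the fixtures in the new test suite below (the `Namespace` field names match the renamed `dest` values):

```python
import argparse
from null.null import Null

args = argparse.Namespace(
    dump=False, exit_fail=False, exit_random=False,
    greeting=True, nap=False, print_error=False, verbose=False,
)
# Dispatches via args_to_funcs; "greeting" logs "Hello, world!" then "And goodbye, world..."
Null(args, True).execute()
```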
"""
Testing suite for the null executable.
"""
import pytest
import argparse
from null.null import Null
from pytest_mock import MockerFixture
from null.null import Null, ERRORS
@pytest.fixture()
def null():
null = Null(argparse.Namespace(), True)
return null
@pytest.fixture
def args():
return argparse.Namespace(
dump=False, exit_fail=False, exit_random=False, greeting=False, nap=False, print_error=False, verbose=False
)
def test_print_error(null, caplog):
null.print_error()
assert 'ERROR: This is an error.' in caplog.text
def test_print_error(args, caplog):
args.print_error = True
Null(args, True).execute()
assert "ERROR: This is an error." in caplog.text
def test_print_greeting(null, caplog):
null.print_greeting()
assert 'Hello, world!' in caplog.text
assert 'And goodbye, world...' in caplog.text
def test_print_greeting(args, caplog):
args.greeting = True
Null(args, True).execute()
assert "Hello, world!" in caplog.text
assert "And goodbye, world..." in caplog.text
def test_exit_with_failure(null, caplog):
with pytest.raises(SystemExit) as e:
null.exit_with_failure()
assert 'Error purposefully induced. Exiting with status code -1...' in caplog.text
assert e.value.code == -1
def test_take_nap(args, caplog, mocker):
mocker.patch("time.sleep", return_value=None)
args.nap = True
Null(args, True).execute()
assert "Going to sleep..." in caplog.text
assert "Waking up." in caplog.text
def test_exit_randomly(null, caplog):
def test_exit_randomly(args, caplog):
args.exit_random = True
with pytest.raises(SystemExit) as e:
null.exit_randomly()
assert 'Exiting with status code' in caplog.text
assert -50 <= e.value.code <= 50
Null(args, True).execute()
assert "Exiting with status code" in caplog.text
assert e.value.code in ERRORS.keys()
def test_take_nap(null, caplog):
null.take_nap()
assert 'Going to sleep...' in caplog.text
assert 'Waking up.' in caplog.text
def test_exit_with_failure(args, caplog):
args.exit_fail = True
with pytest.raises(SystemExit) as e:
Null(args, True).execute()
assert ERRORS[1] in caplog.text
assert e.value.code == 1
# tmpdir_eraser
```
usage: tmpdir_eraser [-h] [-p PATH_TO_DIRS]
Temporary directory eraser. Removes directories created by executing workflows that are no longer in use.
Defaults to checking $HOME/workspaces_tmp for temporary dirs.
optional arguments:
-h, --help show this help message and exit
-p PATH_TO_DIRS, --path-to-dirs PATH_TO_DIRS
path to Workspaces temp directories
```
\ No newline at end of file
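For example (the path here is illustrative, not a required location):

```
tmpdir_eraser --path-to-dirs /lustre/workspaces_tmp
```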
#!/usr/bin/python
# -*- coding: utf-8 -*-
from pathlib import Path
from setuptools import setup
VERSION = open('src/tmpdir_eraser/_version.py').readlines()[-1].split()[-1].strip("\"'")
README = Path('README.md').read_text()
requires = [
"sqlalchemy",
"ssa-workspaces",
"pycapo",
]
setup(
name='ssa-' + Path().absolute().name,
version=VERSION,
description='Eraser for temporary directories created by workflow executions.',
long_description=README,
author='NRAO SSA Team',
author_email='dms-ssa@nrao.edu',
url='TBD',
license="GPL",
install_requires=requires,
keywords=[],
packages=['tmpdir_eraser'],
package_dir={'': 'src'},
classifiers=[
'Programming Language :: Python :: 3.8'
],
entry_points={
'console_scripts': ['tmpdir_eraser = tmpdir_eraser.tmpdir_eraser:main']
},
)
""" Version information for this package, don't put anything else here. """
___version___ = '4.0.0a1.dev1'
import os
import sys
import shutil
import logging
import argparse
from pathlib import Path
from typing import Iterable, List, Optional
from pycapo import CapoConfig
from sqlalchemy import create_engine
from sqlalchemy.exc import ArgumentError
from sqlalchemy.orm import sessionmaker, Session
from workspaces.schema import WorkflowRequest
_DESCRIPTION = """
Temporary directory eraser. Removes directories created by executing workflows that are no longer in use.
Defaults to checking $HOME/workspaces_tmp for temporary dirs.
"""
logger = logging.getLogger("tmpdir_eraser")
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(stream=sys.stdout))
def get_database_url() -> str:
"""
Get URL for Workspaces database
:return: Database URL
"""
capo = CapoConfig().settings("metadataDatabase")
url = capo.jdbcUrl.replace("jdbc:", "").replace(
"://", f"://{capo.jdbcUsername}:{capo.jdbcPassword}@"
)
return url
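The JDBC-to-SQLAlchemy rewrite is easiest to see on a made-up example (URL and credentials here are invented, not real configuration):

```python
# Illustrative only: mimic get_database_url()'s string surgery on a fake JDBC URL
jdbc_url = "jdbc:postgresql://dbhost.example.com/archive"
url = jdbc_url.replace("jdbc:", "").replace("://", "://user:secret@")
assert url == "postgresql://user:secret@dbhost.example.com/archive"
```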
def connect_to_database(db_url: Optional[str] = None) -> Session:
"""
Connect to Workspaces database
:param db_url: URL of the database; defaults to the Capo-configured URL
:return: Database session
"""
# Resolve the default lazily so importing this module doesn't require Capo configuration
if db_url is None:
db_url = get_database_url()
try:
engine = create_engine(db_url)
except ArgumentError:
logger.error(f"ERROR: Invalid database URL {db_url}.")
sys.exit(1)
session = sessionmaker(bind=engine)
return session()
def query_for_tmpdirs(session: Session) -> List[str]:
"""
Query the database for tmpdirs that are still in use by their WorkflowRequests
:param session: Database session
:return: List of dirs still in use
"""
dirs = session.query(WorkflowRequest.results_dir).all()
session.close()
return dirs
def list_difference(l1: Iterable, l2: Iterable) -> Iterable:
"""
Return the contents of l1 with the contents of l2 removed from it
:param l1: List to subtract from
:param l2: List of items to subtract from l1
:return: Subtracted list
"""
return [item for item in l1 if item not in l2]
def get_unused_dirs(
path_to_tmpdirs: str = str(Path.home()) + "/workspaces_tmp",
) -> List[str]:
"""
Get dirs that are not used by any WorkflowRequests
:param path_to_tmpdirs: Path to temporary directories
:return: List of unused dirs
"""
all_dirs = os.listdir(path_to_tmpdirs)
used_dirs = [
d[0].replace(path_to_tmpdirs + "/", "")
for d in query_for_tmpdirs(connect_to_database())
]
# Remove used dirs from list of all dirs
unused_dirs = list_difference(all_dirs, used_dirs)
return [f"{path_to_tmpdirs}/{d}" for d in unused_dirs]
def erase_dirs(dirs_to_erase: List[str]):
"""
Erase given directories, operating-system-willing
:param dirs_to_erase: Directories to be erased
:raises OSError: If dir is not found or otherwise not able to be deleted
"""
for d in dirs_to_erase:
try:
shutil.rmtree(d, ignore_errors=False, onerror=None)
except OSError as e:
# Print error but continue trying to erase
logger.error(e)
def make_arg_parser():
parser = argparse.ArgumentParser(
"tmpdir_eraser",
description=_DESCRIPTION,
formatter_class=argparse.RawTextHelpFormatter,
)
parser.set_defaults(func=get_unused_dirs)
parser.add_argument(
"-p",
"--path-to-dirs",
nargs=1,
type=str,
required=False,
help="path to Workspaces temp directories",
)
return parser
def main():
args = make_arg_parser().parse_args()
if args.path_to_dirs:
dirs = get_unused_dirs(args.path_to_dirs[0])
else:
dirs = get_unused_dirs()
erase_dirs(dirs)
if __name__ == "__main__":
main()
import os
from typing import List
import pytest
from tmpdir_eraser.tmpdir_eraser import *
def mock_get_unused_dirs() -> List[str]:
"""
Function that mocks tmpdir_eraser.get_unused_dirs()
:return: Static list of throwaway directories
"""
dirs = ["test/test1", "test/test2", "test/test3"]
for d in dirs:
try:
os.makedirs(d)
except FileExistsError:
pass
return dirs
def test_erase_dirs():
"""
Tests that the tool correctly deletes directories
"""
dirs_to_erase = mock_get_unused_dirs()
erase_dirs(dirs_to_erase)
for d in dirs_to_erase:
assert not os.path.exists(d)
def test_erase_dirs_error(caplog):
"""
Tests that erase_dirs correctly errors out when attempting to erase a non-existent directory
"""
erase_dirs(["dir_that_doesnt_exist"])
assert "[Errno 2] No such file or directory" in caplog.text
def test_get_database_url():
"""
Tests that the database URL is well-formed
"""
assert all(s in get_database_url() for s in ["postgresql", "nrao.edu/archive"])
def test_query_for_tmpdirs():
"""
Smoke test: querying the database for tmpdirs executes without raising
"""
query_for_tmpdirs(connect_to_database())
def test_get_unused_dirs():
"""
Tests that the function finds all unused dirs
"""
dirs = mock_get_unused_dirs()
assert all(d in get_unused_dirs("test") for d in dirs)
erase_dirs(dirs)
def test_list_difference():
"""
Tests that list_difference correctly calculates the difference between two lists
"""
assert list_difference([1, 2, 3], [2, 3]) == [1]
assert list_difference([3], []) == [3]
assert list_difference([], [1, 2, 3]) == []
assert list_difference([1], [1, 2, 3]) == []
assert list_difference(["hello", "there"], [1, 2, 3]) == ["hello", "there"]
def test_connect_to_database(caplog):
"""
Tests that the DB connection is well-configured
"""
with pytest.raises(SystemExit):
connect_to_database("invalid_url")
assert ("Invalid database URL" in record for record in caplog.records)
# Vulture
Vulture is a fake HTCondor. It doesn't submit jobs to a cluster, instead it runs them locally.
Vulture is a fake HTCondor. It doesn't submit jobs to a cluster, instead it runs them locally. This tool is intended to bypass HTCondor for integration testing purposes.
This tool is intended to bypass HTCondor for integration testing purposes.
```
usage: vulture [-h] {dag,job,spy} ...
Tool to run HTCondor job specs and DAG workflows locally
positional arguments:
{dag,job,spy}
dag DAG operations
job Job operations
spy Spy on executions
optional arguments:
-h, --help show this help message and exit
```
\ No newline at end of file
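Based on `Job.parse` in `condorlite.py` below, a vulture-compatible job file is a set of `name = value` lines plus a bare command line, and an `executable` field is required (else `MalformedJobFile` is raised). A sketch, with invented values:

```
executable = /bin/echo
arguments = hello
log = ./condor.log
output = echo.out
error = echo.error

queue
```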
"""
Business logic for Vulture's mock logging, jobs, and dags
"""
import logging
import re
from contextlib import contextmanager
from pprint import pprint
import pendulum
import subprocess
from pathlib import Path
from typing import NamedTuple, List, Dict
from collections import namedtuple
import sys
from workspaces.schema import WorkflowEventType
@@ -13,17 +22,54 @@ class MockLogger:
Class representing a logger for Vulture that will simulate HTCondor logs
"""
def __init__(self, log_path: Path):
def __init__(self, log_path: Path, out: str, err: str):
self.out = out
self.err = err
self.logger = logging.getLogger("vulture")
self.logger.setLevel(logging.INFO)
self.logger.addHandler(logging.FileHandler(f"{log_path}", mode="a+"))
self.def_handler = logging.FileHandler(f"{log_path}", mode="a+")
self.logger.addHandler(self.def_handler)
@contextmanager
def switch_logger_file(self, log_file: str):
"""
Switches logger FileHandler to write to a new file; operates as a context manager
:param log_file: File name of the log you want to switch to
"""
new_handler = logging.FileHandler(log_file, mode="w+")
self.logger.removeHandler(self.def_handler)
self.logger.addHandler(new_handler)
try:
yield
finally:
# Restore the default handler even if logging inside the context raises
self.logger.removeHandler(new_handler)
self.logger.addHandler(self.def_handler)
def write_out(self, which_out: str, output: str):
"""
Writes output to an auxiliary log file (any file that is not the main condor.log)
:param which_out: Which log is to be written to (out/error)
:param output: Contents of the file
:raises ValueError: If which_out is not a supported log type
"""
if which_out == "out":
with self.switch_logger_file(self.out):
self.logger.info(output)
elif which_out == "err":
with self.switch_logger_file(self.err):
self.logger.info(output)
else:
# If adding new logging option, add to supported list
supported = ["out", "err"]
raise ValueError(f"Output '{which_out}' not supported. Use [{'|'.join(supported)}].")
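A usage sketch for `write_out` (file names and the import path are assumptions; the package layout only shows a relative `.condorlite` import):

```python
from pathlib import Path
from vulture.condorlite import MockLogger  # hypothetical import path

logger = MockLogger(Path("./condor.log"), out="null.out", err="null.error")
logger.write_out("out", "Hello, world!")             # written to null.out
logger.write_out("err", "ERROR: This is an error.")  # written to null.error
```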
def log_event(self, event_type: WorkflowEventType, event_body: str = None):
"""
Log a mock HTCondor event to the log file
:param event_type: Type of the event
:param event_body: Body of the event log (can be empty)
"""
header = self.create_header(event_type)
# Indent each line of the body
@@ -42,10 +88,13 @@
def create_header(event_type: WorkflowEventType) -> str:
"""
Create mock HTCondor event log header
:param event_type: Type of event being logged
:return: The header as a string
EXAMPLE:
000 (3983.000.000) 08/26 11:06:06 Job submitted from host: <10.64.1.178:9618?addrs=10.64.1.178-9618&noUDP&sock=4050180_7e37_3>
:param event_type: Type of event being logged
:raises ValueError: If event_type is not yet supported
:return: The header as a string
"""
mock_host = "<0.0.0.0:7777>"
event_descs = {
@@ -58,10 +107,18 @@
# TODO: Add calculable job number
job_num = f"({4242:04d}.000.000)"
timestamp = f"{pendulum.now().format('MM/DD HH:mm:ss')}"
event_desc = f"{event_descs[event_type]}"
if event_type in event_descs.keys():
event_desc = f"{event_descs[event_type]}"
else:
# If this gets triggered, add event type to event_descs dict above
raise ValueError(f"Event type {event_type} not supported.")
return " ".join([event_num, job_num, timestamp, event_desc])
class MalformedJobFile(ValueError):
pass
class Job:
"""
Class representing a faked HTCondor job
@@ -71,33 +128,56 @@
self.fields, self.command = self.parse(file)
self.write_log = write_log
if self.write_log:
self.logger = MockLogger(self.fields.get("log", "./condor.log"))
self.logger.log_event(
WorkflowEventType.SUBMITTED, f"Job node: {self.fields['executable']}"
)
if self.fields.get("executable"):
# Collect logging file names from job file if they exist; else give them sane defaults
exec_name = self.fields["executable"]
self.logger = MockLogger(
log_path=self.fields.get("log", "./condor.log"),
out=self.fields.get("output", f"{exec_name}.out"),
err=self.fields.get("error", f"{exec_name}.error"),
)
# Job is submitted, write an event to signify that
self.logger.log_event(
WorkflowEventType.SUBMITTED, f"Job node: {self.fields['executable']}"
)
else:
raise MalformedJobFile(f"Condor job file {file} malformed. No executable found in fields {self.fields}.")
def parse(self, file: Path) -> NamedTuple:
"""
Parse condor job file for relevant fields
TODO: Make parsing more robust
:param file: Path to job file
:return: NamedTuple of job data
"""
# Parse file looking for executable and arguments
JobData = namedtuple("JobData", ["field_data", "command"])
section_sep = "\n\n"
field_sep = "\n"
with open(file, "r") as f:
# Split file into data and action sections
data, command = f.read().split(section_sep)
fields = data.split(field_sep)
fields = []
command = ""
try:
with open(file, "r") as f:
# Collect "name = value" lines as fields; any other non-blank line is the command
for line in f.readlines():
if re.search(r" = ", line):
fields.append(line.rstrip())
elif line.strip():
command = line.rstrip()
except IOError:
print(f"Could not open/read '{file}'.")
sys.exit(1)
if len(fields) > 0:
field_dict = self.parse_fields(fields)
return JobData(field_dict, command)
else:
raise MalformedJobFile(f"Condor job file '{file}' is malformed. No fields found.")
return JobData(field_dict, command)
@staticmethod
def parse_fields(fields: List[str]) -> Dict[str, str]:
"""
Take in list of fields parsed from input file and return them as a dictionary
:param fields: List of fields
:return: Dictionary of fields
"""
@@ -108,21 +188,43 @@
return field_dict
def execute(self):
"""
Execute given process with given arguments
TODO: Buff return code processing
"""
result_strings = {
0: "Normal termination (return value 0)",
# 255 rather than -1: POSIX exit statuses are unsigned bytes, so exit(-1) is reported by subprocess as 255
255: "Error in execution (return value -1)",
}
capture_output = False
if self.write_log:
self.logger.log_event(WorkflowEventType.EXECUTING)
process = subprocess.run([self.fields["executable"], self.fields["arguments"]])
exec_results = f"{result_strings.get(process.returncode, f'Something weird happened (return value {process.returncode})')}"
capture_output = True
# Execute
if self.fields.get("executable"):
process = subprocess.run(
[self.fields["executable"], self.fields["arguments"]],
capture_output=capture_output,
text=True,
)
else:
raise MalformedJobFile(f"Condor job file malformed. No executable found in fields {self.fields}.")
exec_results = f"{result_strings.get(process.returncode, f'Something weird happened (return value {process.returncode})')}"
if self.write_log:
self.logger.log_event(WorkflowEventType.TERMINATED, exec_results)
self.logger.write_out("out", process.stdout)
self.logger.write_out("err", process.stderr)
class Dag:
"""
Class representing a faked HTCondor dag
TODO: Implement
"""
@staticmethod
def parse(file):
dag = Dag()
@@ -13,8 +13,8 @@ from .condorlite import Job
def execute_dag(ns: ap.Namespace):
"""
Execute the DAG in the given file.
:param file:
:return:
:param ns: Namespace containing command-line argument input
"""
for file in ns.file:
dag = Dag.parse(file)
@@ -24,8 +24,8 @@ def execute_dag(ns: ap.Namespace):
def execute_job(ns: ap.Namespace):
"""
Execute the job in the given file.
:param file:
:return:
:param ns: Namespace containing command-line argument input
"""
for file in ns.file:
job = Job(file)
@@ -33,6 +33,12 @@ def execute_job(ns: ap.Namespace):
def spy(ns: ap.Namespace):
"""
Listen for workflow events and print any found to stdout
:param ns: Namespace containing command-line argument input
"""
workflow_events.listen(lambda event: print(event))
parsing this file will fail
\ No newline at end of file