Commit 8a4f1b84 authored by Charlotte Hausman

imaging template rendering

parent b6f2414e
1 merge request: !319 imaging template rendering
Pipeline #2078 failed
@@ -29,14 +29,21 @@ def arg_parser() -> argparse.ArgumentParser:
nargs=1,
action="store",
required=False,
help="deliver standard type capability directories to analyst QA area",
help="deliver standard calibration directories to analyst QA area",
)
parser.add_argument(
"--retrieve",
nargs=1,
action="store",
required=False,
help="return standard type capability directories to original parent directory",
help="return standard calibration directories to original parent directory",
)
parser.add_argument(
"--deliver-image",
nargs=1,
action="store",
required=False,
help="deliver standard image results to analyst image QA area",
)
return parser
@@ -48,6 +55,9 @@ def _get_settings(filename: str) -> Dict[str, str]:
weblog_cache_area = (
CapoConfig().settings("edu.nrao.archive.workspaces.DeliverySettings").cacheWeblogDirectory
)
image_qa_area = (
CapoConfig().settings("edu.nrao.archive.workspaces.DeliverySettings").standardImageDelivery
)
workspaces_lustre_root_dir = (
CapoConfig().settings("edu.nrao.archive.workspaces.ProcessingSettings").rootDirectory
)
@@ -60,6 +70,7 @@ def _get_settings(filename: str) -> Dict[str, str]:
return {
"qa_delivery_area": delivery_area,
"weblog_cache_area": weblog_cache_area,
"image_qa_area": image_qa_area,
"workspaces_lustre_root_dir": workspaces_lustre_root_dir,
"current_root_directory": current_root_directory,
"current_subdirectory": current_subdirectory,
@@ -74,13 +85,17 @@ def main():
conveyor = None
if args.deliver is not None:
action = "Delivery"
action = "Calibration Delivery"
settings = _get_settings(args.deliver[0])
conveyor = DeliveryConveyor(settings)
elif args.retrieve is not None:
action = "Retrieval"
action = "Calibration Retrieval"
settings = _get_settings(args.retrieve[0])
conveyor = RetrievalConveyor(settings)
elif args.deliver_image is not None:
action = "Image Delivery"
settings = _get_settings(args.deliver_image[0])
conveyor = DeliveryConveyor(settings, action)
conveyor.convey()
logger.info(f"Standard Calibration {action} is complete!")
logger.info(f"Standard {action} is complete!")
@@ -22,9 +22,15 @@ class WrestWorkflowMetadata:
Class for extracting metadata required for workflow execution
"""
-def __init__(self, connection: MDDBConnector, spl: str, fileset_id: str = None):
+def __init__(
+self, connection: MDDBConnector, spl: str = None, fileset_id: str = None, sdm_id: str = None
+):
self.logger = logging.getLogger("aat_wrest")
self.conn = connection
if fileset_id is not None:
self.fileset_id = fileset_id
if sdm_id is not None:
self.sdm_id = sdm_id
if not spl and fileset_id:
self.spl = self.wrest_obs_metadata_from_fileset_id(fileset_id)["spl"]
else:
@@ -77,6 +83,53 @@ class WrestWorkflowMetadata:
self.conn.close()
return make_json
def wrest_standard_image_info(self) -> json:
"""
Given an SDM id, returns the metadata required to run
the standard imaging workflow
:return:
"""
sql = f"""
SELECT science_product_locator as spl,
e.project_code as projectCode,
p.title as title,
e.starttime as startTime,
(a.firstname || ' ' || a.lastname) as observer,
telescope as telescope
FROM execution_blocks e
JOIN projects p on e.project_code = p.project_code
JOIN authors a on p.project_code = a.project_code
WHERE ngas_fileset_id = %(sdmId)s AND a.is_pi = true
"""
make_json = {}
try:
cursor = self.conn.cursor()
cursor.execute(sql, {"sdmId": self.sdm_id})
data = cursor.fetchall()
if data:
make_json = json.dumps(
{
"product_locator": data[0][0],
"projectCode": data[0][1],
"title": data[0][2],
"startTime": data[0][3],
"observer": data[0][4],
"telescope": data[0][5],
"created_at": str(
pendulum.now().in_timezone(TIME_ZONE).format(PENDULUM_FORMAT)
),
}
)
else:
self.logger.error(
f"ERROR: aat-wrest query returned no results!"
f" The database appears to be missing information for sdm id: {self.sdm_id}!"
)
finally:
self.conn.close()
return make_json
def wrest_obs_metadata_from_fileset_id(self, fileset_id: str) -> str:
"""
Given a fileset_id, query the Metadata DB and return the corresponding science_product_locator
@@ -121,6 +174,14 @@ def parser() -> argparse.ArgumentParser:
required=False,
help="Find workflow metadata for standard calibrations with provided product locator",
)
arg_parser.add_argument(
"-si",
"--stdimg",
nargs=1,
action="store",
required=False,
help="Find workflow metadata for standard CMS imaging with provided SDM id",
)
arg_parser.add_argument(
"-obs",
"--observation",
@@ -134,9 +195,11 @@
def determine_wrester(connection: MDDBConnector, args: argparse.Namespace):
if args.stdcals:
-data = WrestWorkflowMetadata(connection, args.stdcals[0]).wrest_standard_cal_info()
+data = WrestWorkflowMetadata(connection, spl=args.stdcals[0]).wrest_standard_cal_info()
if args.stdimg:
data = WrestWorkflowMetadata(connection, sdm_id=args.stdimg[0]).wrest_standard_image_info()
if args.observation:
-data = ObservationWrester(connection, args.observation[0]).wrest_observation_info()
+data = ObservationWrester(connection, spl=args.observation[0]).wrest_observation_info()
print(data)
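With the new -si option, the imaging wrester can be exercised end to end; _render_with_metadata in the workflow service (further down in this commit) invokes it exactly this way through a subprocess. A hypothetical driver, with the SDM id borrowed from the test fixture below and assuming aat_wrest is on PATH with a reachable metadata database:

import subprocess

# Hypothetical invocation mirroring WorkflowService._render_with_metadata.
completed = subprocess.run(
    ["aat_wrest", "-si", "brain_000.58099.67095825232"],
    capture_output=True,
    text=True,
)
print(completed.stdout)  # JSON metadata, or an "ERROR..." string if the sdm id is unknown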
@@ -17,18 +17,30 @@ logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
args = argparse.Namespace(stdcals="uid://evla/calibration/143a3ef9-21fb-46d8-8312-91de54b0cc49")
result = (
args_cal = argparse.Namespace(
stdcals="uid://evla/execblock/125dac14-c53b-4489-9c43-7d6e2739ec32", stdimg=None
)
result_cal = (
'{"sdmId": "17B-197.sb34663512.eb34806505.58108.78427738426", "projectCode": "17B-197", '
'"title": "Radio-Active CoWS: Extended FIRST Sources in MaDCoWS Clusters", '
'"startTime": 58108.7842916667, "observer": "Anthony Gonzalez", "telescope": "EVLA",'
' "created_at": ' + str(pendulum.now().in_timezone("UTC").format("YYYY-MM-DDTHH:mm:ss")) + "}"
' "created_at": 2021-01-01T00:00:00}'
)
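# Note: created_at is frozen in these expected strings (no pendulum.now()) so the
# assertions below cannot race the wall clock.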
args_image = argparse.Namespace(stdcals=None, stdimg="brain_000.58099.67095825232")
result_img = (
'{"product_locator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",'
' "projectCode": "Operations", "title": "", "startTime": 58099.6710792824, '
'"observer": "VLA Operations", "telescope": "EVLA", "created_at": "2021-06-30T20:18:21"}'
)
def mock_wrester(args: argparse.Namespace) -> WrestWorkflowMetadata:
with patch("psycopg2.connect") as mock_connect:
-return WrestWorkflowMetadata(connection=mock_connect, spl=args.stdcals)
+if args.stdcals is not None:
+return WrestWorkflowMetadata(connection=mock_connect, spl=args.stdcals)
+if args.stdimg is not None:
+return WrestWorkflowMetadata(connection=mock_connect, sdm_id=args.stdimg)
@pytest.fixture
@@ -44,7 +56,8 @@ def fake_obs_metadata() -> List[Tuple[str]]:
]
-wrester = mock_wrester(args)
+cal_wrester = mock_wrester(args_cal)
+img_wrester = mock_wrester(args_image)
class TestAatWrest:
@@ -59,9 +72,9 @@
)
assert wrester_no_spl.spl == "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba"
@patch("json.dumps", MagicMock(return_value=result))
@patch("json.dumps", MagicMock(return_value=result_cal))
def test_wrest_standard_cal_info(self):
-wrester.conn.cursor.return_value.fetchall.return_value = [
+cal_wrester.conn.cursor.return_value.fetchall.return_value = [
"17B-197.sb34663512.eb34806505.58108.78427738426",
"17B-197",
"Radio-Active CoWS: Extended FIRST Sources in MaDCoWS Clusters",
@@ -70,23 +83,41 @@
"EVLA",
str(pendulum.now().in_timezone("UTC").format("YYYY-MM-DDTHH:mm:ss")),
]
-assert args.stdcals == "uid://evla/calibration/143a3ef9-21fb-46d8-8312-91de54b0cc49"
-value = wrester.wrest_standard_cal_info()
+assert args_cal.stdcals == "uid://evla/execblock/125dac14-c53b-4489-9c43-7d6e2739ec32"
+value = cal_wrester.wrest_standard_cal_info()
assert (
value
== '{"sdmId": "17B-197.sb34663512.eb34806505.58108.78427738426", "projectCode": "17B-197", '
'"title": "Radio-Active CoWS: Extended FIRST Sources in MaDCoWS Clusters", '
'"startTime": 58108.7842916667, "observer": "Anthony Gonzalez", "telescope": "EVLA",'
' "created_at": '
+ str(pendulum.now().in_timezone("UTC").format("YYYY-MM-DDTHH:mm:ss"))
+ "}"
' "created_at": 2021-01-01T00:00:00}'
)
@patch("json.dumps", MagicMock(return_value=result_img))
def test_wrest_standard_image_info(self):
img_wrester.conn.cursor.return_value.fetchall.return_value = [
"uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"Operations",
"",
58099.6710792824,
"VLA Operations",
"EVLA",
"2021-06-30T20:18:21",
]
assert args_image.stdimg == "brain_000.58099.67095825232"
value = img_wrester.wrest_standard_image_info()
assert (
value
== '{"product_locator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",'
' "projectCode": "Operations", "title": "", "startTime": 58099.6710792824, '
'"observer": "VLA Operations", "telescope": "EVLA", "created_at": "2021-06-30T20:18:21"}'
)
def test_wrest_obs_metadata_from_fileset_id(self, fake_obs_metadata: List[Tuple[str]]):
-wrester.conn.cursor.return_value.__enter__.return_value.fetchall.return_value = (
+cal_wrester.conn.cursor.return_value.__enter__.return_value.fetchall.return_value = (
fake_obs_metadata
)
-metadata = wrester.wrest_obs_metadata_from_fileset_id(
+metadata = cal_wrester.wrest_obs_metadata_from_fileset_id(
"17B-197.sb34663512.eb34806505.58108.78427738426"
)
"""correcting std cms imaging templates
Revision ID: 6231bb30d95d
Revises: a832fcb74a8c
Create Date: 2021-06-30 11:04:35.628129
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "6231bb30d95d"
down_revision = "a832fcb74a8c"
branch_labels = None
depends_on = None
condor_content = """executable = std_cms_imaging.sh
arguments = metadata.json PPR.xml
output = imaging.out
error = imaging.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
CMS_PATH = {{cms_path}}
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/conveyor, ./PPR.xml, ./metadata.json, $(CMS_PATH)/{{sdmId}}.ms
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
script_content = """#!/bin/sh
export HOME=$TMPDIR
./framework.sh -d .
chmod 770 .
mv {{sdmId}}.ms ./rawdata
./casa_envoy --standard-cmsimage $1 $2
./conveyor --deliver-image $1
"""
metadata_content = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_cms_imaging",
"systemId": "{{request_id}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}",
"calibrationSourceDirectory":"{{cms_path}}",
"cmsName":"{{sdmId}}.ms"
}
"""
def upgrade():
op.execute(
f"""
UPDATE capabilities
SET capability_steps='prepare-and-run-workflow std_cms_imaging\nawait-workflow\nawait-qa'
WHERE capability_name='std_cms_imaging'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{condor_content}' WHERE filename='std_cms_imaging.condor'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{script_content}' WHERE filename='std_cms_imaging.sh'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{metadata_content}' WHERE filename='metadata.json' AND workflow_name='std_cms_imaging'
"""
)
def downgrade():
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_condor_content}' WHERE filename='std_cms_imaging.condor'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_script_content}' WHERE filename='std_cms_imaging.sh'
"""
)
op.execute(
f"""
UPDATE workflow_templates
SET content=E'{old_metadata_content}' WHERE filename='metadata.json' AND workflow_name='std_cms_imaging'
"""
)
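Note that downgrade() refers to old_condor_content and friends before they are defined in this file; that is fine in Python because module-level names are resolved when a function is called, not when it is defined:

def demo():
    # Looked up in the module globals at call time, so a later definition works.
    return defined_later

defined_later = "resolved at call time"
print(demo())  # resolved at call time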
old_condor_content = """executable = std_cms_imaging.sh
arguments = metadata.json PPR.xml
output = imaging.out
error = imaging.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
CMS_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/spool/{{relative_path}}/working
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher,
$(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/conveyor, ./PPR.xml, ./metadata.json, $(CMS_PATH)/{{cms_name}}
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
old_script_content = """#!/bin/sh
export HOME=$TMPDIR
./framework.sh -d .
chmod 770 .
mv {{cms_name}} ./rawdata
./casa_envoy --standard-cmsimage $1 $2
./conveyor --deliver-image $1
"""
old_metadata_content = """{
"fileSetIds": "{{sdmId}}",
"workflowName": "std_cms_imaging",
"systemId": "{{request_id}}",
"productLocator": "{{product_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
"calibrationSourceDirectory":"{{cal_directory}}"
}
"""
@@ -9,7 +9,7 @@ from unittest.mock import patch
import pytest
-from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile
+from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile, Workflow
from workspaces.workflow.services.workflow_info import WorkflowInfo
from workspaces.workflow.services.workflow_service import WorkflowService
@@ -20,6 +20,29 @@ Tests for WorkflowService
"""
files = [
WorkflowRequestFile(
workflow_request_id=-1,
filename="null.condor",
content=b"executable = null\n"
+ b"arguments = {{arguments}}\n"
+ b"error = null.err\n"
+ b"log = condor.log\n\n\n"
+ b"queue",
),
WorkflowRequestFile(
workflow_request_id=-1,
filename="null.dag",
content=b"JOB null null.condor",
),
WorkflowRequestFile(
workflow_request_id=-1,
filename="null.sh",
content=b"#!/bin/sh\n\n" + b"null $*",
),
]
@pytest.mark.usefixtures("mock_workflow_info", "mock_workflow_service", "test_router")
class TestWorkflowService:
"""Our test class"""
@@ -67,34 +90,16 @@
@patch("pathlib.Path.write_bytes")
@patch("os.chown")
-def test_prepare_files_for_condor(self, mock_chown, mock_write, mock_workflow_service: WorkflowService):
+def test_prepare_files_for_condor(
+self, mock_chown, mock_write, mock_workflow_service: WorkflowService
+):
"""
Make sure files are prepared for Condor.
:param mock_workflow_service:
:return:
"""
-files = [
-WorkflowRequestFile(
-workflow_request_id=1,
-filename="null.condor",
-content=b"executable = null\n"
-+ b"arguments = {{arguments}}\n"
-+ b"error = null.err\n"
-+ b"log = condor.log\n\n\n"
-+ b"queue",
-),
-WorkflowRequestFile(
-workflow_request_id=1,
-filename="null.dag",
-content=b"JOB null null.condor",
-),
-WorkflowRequestFile(
-workflow_request_id=1,
-filename="null.sh",
-content=b"#!/bin/sh\n\n" + b"null $*",
-),
-]
with patch("os.mkdir") as make_dir:
temp = mock_workflow_service._make_temp_directory()
path = mock_workflow_service._prepare_files_for_condor(files, temp)
@@ -131,3 +136,37 @@
mock_workflow_service.on_workflow_event(**message)
mock_update.assert_called_once()
assert request.state == "Complete"
@patch("workspaces.workflow.schema.Workflow.render_templates")
@patch("messaging.router.Router.send_message")
@patch("json.loads")
@patch("subprocess.Popen")
def test_render_with_metadata(
self,
mock_subprocess,
mock_loads,
mock_router,
mock_render,
mock_workflow_service: WorkflowService,
mock_workflow_requests: List[WorkflowRequest],
):
definition = Workflow(workflow_name="std_calibration")
request = mock_workflow_requests[1]
tempdir = pathlib.Path("/lustre/aoc/cluster/pipeline/docker/workspaces/spool/tmp1234abcd")
mock_workflow_service._render_with_metadata(request, tempdir, definition)
assert mock_subprocess.call_count == 1
assert mock_loads.call_count == 1
assert mock_router.call_count == 0
assert mock_render.call_count == 1
@patch("workspaces.workflow.services.workflow_info.WorkflowInfo.save_file")
def test_determine_usable_files(
self,
mock_save,
mock_workflow_requests: List[WorkflowRequest],
mock_workflow_service: WorkflowService,
):
request = mock_workflow_requests[0]
usable = mock_workflow_service._determine_usable_files(request, files)
assert len(usable) == 3
@@ -174,7 +174,9 @@ class CapabilityService(CapabilityServiceIF):
logger.info(f"RECEIVED CAPABILITY MESSAGE: {message}")
subject = message["subject"]
-execution = self.capability_info.lookup_execution_by_workflow_request_id(subject["workflow_request_id"])
+execution = self.capability_info.lookup_execution_by_workflow_request_id(
+subject["workflow_request_id"]
+)
self.notify.notify_qa_ready(execution.capability_request)
# sending step complete notification until QA is properly implemented
@@ -172,62 +172,12 @@ class WorkflowService(WorkflowServiceIF):
# render all the templates
if request.argument["need_project_metadata"] is True:
-with subprocess.Popen(
-["aat_wrest", "-sc", request.argument["product_locator"]],
-stdout=subprocess.PIPE,
-) as wf_metadata:
-wf_json = wf_metadata.communicate()[0]
-if "ERROR" not in wf_json.decode():
-merged_args = {
-**json.loads(wf_json),
-**request.argument,
-"relative_path": str(temp_folder).split("/")[-1],
-"root_directory": self.processing_settings.rootDirectory,
-"ramInGb": self.processing_settings.ramInGb,
-}
-templated_files = definition.render_templates(merged_args)
-else:
-logger.error(wf_json.decode())
-logger.info("SENDING WORKFLOW FAIL MESSAGE!")
-failed_msg = {
-"service": "workflow",
-"routing_key": "workflow",
-"subject": request,
-"type": "workflow-failed",
-}
-self.message_router.send_message(**failed_msg)
-return request
+templated_files = self._render_with_metadata(request, temp_folder, definition)
else:
templated_files = definition.render_templates(request.argument)
# 3. Combine the templated with the request files.
-# Override templates if user supplied file has same name and is a valid input file
-usable_templates = []
-usable_files = []
-forbidden_templates = self._get_forbidden_templates_list(request.workflow_name)
-for template in templated_files:
-# check if there is not a newer version supplied, add the template
-if template.filename not in request.files:
-usable_templates.append(template)
-# check if a newer version was supplied and is not in the list of restricted
-# access templates
-if template.filename in request.files and template.filename in forbidden_templates:
-usable_templates.append(template)
-logger.info(
-f"Cannot append user file {request.files[template.filename]} to "
-f"workflow request #{request.workflow_request_id}. "
-f"Users are not allowed to override required system templates."
-)
-# check that supplied files are not in conflict with the sorted templates
-for file in request.files:
-if file.filename not in usable_templates:
-usable_files.append(file)
-for file in usable_templates:
-self.info.save_file(request=request, filename=file.filename, content=file.content)
-workflow_files = usable_templates + usable_files
+workflow_files = self._determine_usable_files(request, templated_files)
# 4. prepare files for condor execution
if not request.results_dir:
@@ -266,6 +216,78 @@
logger.info("Settled on temp folder %s", temp_folder)
return temp_folder
def _render_with_metadata(
self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF
):
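# Pick the wrester flag from the workflow name: standard calibration
# workflows wrest metadata by product locator (-sc), standard imaging
# workflows by SDM id (-si); any other workflow skips wresting.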
name = wf_request.workflow_name
if "calibration" in name:
wrest_type = "-sc"
argument = wf_request.argument["product_locator"]
elif "imaging" in name:
wrest_type = "-si"
argument = wf_request.argument["sdmId"]
else:
logger.info(f"No wrester found for workflow {name}. Does it actually require metadata?")
return wf_request
logger.info(f" workflow {name} has wrest option: {wrest_type}")
with subprocess.Popen(
["aat_wrest", wrest_type, argument],
stdout=subprocess.PIPE,
) as wf_metadata:
wf_json = wf_metadata.communicate()[0]
if "ERROR" not in wf_json.decode():
merged_args = {
**json.loads(wf_json),
**wf_request.argument,
"relative_path": str(tempdir).split("/")[-1],
"root_directory": self.processing_settings.rootDirectory,
"ramInGb": self.processing_settings.ramInGb,
}
return wf_definition.render_templates(merged_args)
else:
logger.error(wf_json.decode())
logger.info("SENDING WORKFLOW FAIL MESSAGE!")
failed_msg = {
"service": "workflow",
"routing_key": "workflow",
"subject": wf_request,
"type": "workflow-failed",
}
self.message_router.send_message(**failed_msg)
return wf_request
def _determine_usable_files(
self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]
):
# Override templates if user supplied file has same name and is a valid input file
usable_templates = []
usable_files = []
forbidden_templates = self._get_forbidden_templates_list(request.workflow_name)
for template in templated_files:
# check if there is not a newer version supplied, add the template
if template.filename not in request.files:
usable_templates.append(template)
# check if a newer version was supplied and is not in the list of restricted
# access templates
if template.filename in request.files and template.filename in forbidden_templates:
usable_templates.append(template)
logger.info(
f"Cannot append user file {request.files[template.filename]} to "
f"workflow request #{request.workflow_request_id}. "
f"Users are not allowed to override required system templates."
)
# check that supplied files are not in conflict with the sorted templates
for file in request.files:
if file.filename not in usable_templates:
usable_files.append(file)
for file in usable_templates:
self.info.save_file(request=request, filename=file.filename, content=file.content)
return usable_templates + usable_files
@staticmethod
def _prepare_files_for_condor(files: List[WorkflowRequestFile], temp_folder: Path) -> Path:
"""
@@ -228,6 +228,21 @@ def mock_workflow_requests(
add_data_to_db_mock(mock_workflow_info, r1)
r2 = WorkflowRequest(
workflow_request_id=-2,
workflow_name="std_calibration",
argument={
"product_locator": "uid://evla/execblock/ec082e65-452d-4fec-ad88-f5b4af1f9e36",
"user_email": "chausman@nrao.edu",
},
state="Created",
created_at=datetime(2021, 1, 1),
updated_at=datetime(2021, 1, 1),
)
requests.append(r2)
add_data_to_db_mock(mock_workflow_info, r2)
return requests