Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
"""
Tests for aat_wrest.observation_wrester
"""
import argparse
from enum import Enum
from unittest.mock import patch, MagicMock
# pylint: disable=E0401, E0402
import json
import pytest
from aat_wrest.observation_wrester import WrestObservationMetadata
class Keys(Enum):
    """Names of the metadata fields we report for an observation."""

    PROJECT = "projectCode"      # project code, e.g. "17A-109"
    OBS_TIME = "timeObserved"    # human-readable observing duration
    QUEUE_TIME = "timeInQueue"   # human-readable time spent in the queue
# Convenience aliases for the enum values, used as dict keys below.
PROJECT = Keys.PROJECT.value
OBS_TIME = Keys.OBS_TIME.value
QUEUE_TIME = Keys.QUEUE_TIME.value

# Science product locator and expected wrested metadata for project 17A-109.
_17A_109_SPL = "uid://evla/execblock/5c71ade0-d035-4fd5-a36f-0389e34db0e5"
_17A_109_ARGS = argparse.Namespace(spl=_17A_109_SPL)
_17A_109_EXPECTED = {
    PROJECT: "17A-109",
    OBS_TIME: "0 days, 0 hours, 4 minutes, 34 seconds",
    QUEUE_TIME: "1125 days, 11 hours, 43 minutes, 58 seconds",
}

# Observation id and canned JSON result used by TestWrestObservationInfo.
args = argparse.Namespace(observation="17B-197.sb34663512.eb34806505.58108.78427738426")
result_obs = (
    '{"spl": "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba", '
    '"bands": "Ka", '
    '"array_config": "D", '
    '"obs_start_time": "1234567", '
    '"obs_end_time": "123598898" }'
)
def mock_wrester(args: argparse.Namespace) -> WrestObservationMetadata:
    """
    Build a WrestObservationMetadata backed by a mocked psycopg2 connection,
    so no real metadata database (MDDB) is contacted.

    :param args: namespace carrying the observation identifier
    :return: wrester wired to the mocked connection
    """
    with patch("psycopg2.connect") as mocked_connect:
        return WrestObservationMetadata(mocked_connect, args.observation)


# Module-level wrester shared by the tests below.
wrester = mock_wrester(args)
class TestWrestObservationInfo:
    """Tests for WrestObservationMetadata against a mocked MDDB connection."""

    @patch("json.dumps", MagicMock(return_value=result_obs))
    def test_wrest_observation_info(self):
        """wrest_observation_info() should return the canned JSON metadata."""
        # Make the mocked cursor hand back the canned row values.
        wrester.conn.cursor.return_value.fetchall.return_value = [
            "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba",
            "Ka",
            "D",
            "1234567",
            "123598898",
        ]
        assert args.observation == "17B-197.sb34663512.eb34806505.58108.78427738426"
        metadata = wrester.wrest_observation_info()
        assert (
            metadata == '{"spl": "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba", '
            '"bands": "Ka", '
            '"array_config": "D", '
            '"obs_start_time": "1234567", '
            '"obs_end_time": "123598898" }'
        )

    @pytest.mark.skip("Dates are broken. Method superseded by wrest_observation_info")
    @patch("json.dumps", MagicMock(return_value=_17A_109_EXPECTED))
    def test_gets_expected_observation_info(self):
        """
        Does ObservationWrester wrest the expected data from the MDDB?

        :return:
        """
        # NOTE(review): mock_wrester reads args.observation, but _17A_109_ARGS
        # carries only `spl` -- confirm before un-skipping this test.
        wrester = mock_wrester(_17A_109_ARGS)
        assert wrester.spl == _17A_109_SPL
        actual = wrester.wrest_observation_time_info()
        assert actual == _17A_109_EXPECTED

    @pytest.mark.skip("... Dates are broken... Method superseded by wrest_observation_info")
    def test_handles_evla_exec_block(self):
        """
        Confirm we get what we expect when observing time is minimal.

        :return:
        """
        spl = "uid://evla/execblock/91c685b6-4527-44b1-9f91-3904e1125817"
        # Shadows the module-level `args` on purpose for this test only.
        args = argparse.Namespace(spl=spl)
        expected = {
            PROJECT: "19A-440",
            OBS_TIME: "0 days, 0 hours, 0 minutes, 4 seconds",
            QUEUE_TIME: "627 days, 22 hours, 43 minutes, 6 seconds",
        }
        with patch("json.dumps", MagicMock(return_value=expected)):
            wrester = mock_wrester(args)
            actual = wrester.wrest_observation_time_info()
            assert actual[PROJECT] == expected[PROJECT]
            # OBS_TIME reads "D days, H hours, M minutes, S seconds";
            # the fourth comma-separated segment holds the seconds.
            obs_parts = actual[OBS_TIME].split(", ")
            seconds_part = obs_parts[3].split(" ")
            assert int(seconds_part[0]) == 4
""" WS-351: As a DA I want to see a list of capability requests per capability / WS-368 """
import argparse
import json
import logging
import os
import sys
from enum import Enum
from unittest.mock import patch, MagicMock
# pylint: disable=E0401, E0402
import pytest
from aat_wrest.observation_wrester import ObservationWrester
# Module logger emitting INFO and above to stdout.
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
class Keys(Enum):
    """Names of the metadata fields we report for an observation."""

    PROJECT = "projectCode"      # project code, e.g. "17A-109"
    OBS_TIME = "timeObserved"    # human-readable observing duration
    QUEUE_TIME = "timeInQueue"   # human-readable time spent in the queue
# Convenience aliases for the enum values, used as dict keys below.
PROJECT = Keys.PROJECT.value
OBS_TIME = Keys.OBS_TIME.value
QUEUE_TIME = Keys.QUEUE_TIME.value

# Science product locator and expected wrested metadata for project 17A-109.
_17A_109_SPL = "uid://evla/execblock/5c71ade0-d035-4fd5-a36f-0389e34db0e5"
_17A_109_ARGS = argparse.Namespace(spl=_17A_109_SPL)
_17A_109_EXPECTED = {
    PROJECT: "17A-109",
    OBS_TIME: "0 days, 0 hours, 4 minutes, 34 seconds",
    QUEUE_TIME: "1125 days, 11 hours, 43 minutes, 58 seconds",
}
def mock_wrester(args: argparse.Namespace) -> ObservationWrester:
    """
    Build an ObservationWrester over a mocked psycopg2 connection so that
    no real metadata database (MDDB) is contacted.

    :param args: namespace carrying the science product locator (`spl`)
    :return: wrester wired to the mocked connection
    """
    with patch("psycopg2.connect") as fake_connect:
        return ObservationWrester(fake_connect, args.spl)
@pytest.mark.skip("... Dates are broken...")
@patch("json.dumps", MagicMock(return_value=_17A_109_EXPECTED))
def test_gets_expected_observation_info():
    """
    Does ObservationWrester wrest the expected data from the MDDB?

    :return:
    """
    wrester = mock_wrester(_17A_109_ARGS)
    assert wrester.spl == _17A_109_SPL
    actual = wrester.wrest_observation_info()
    assert actual == _17A_109_EXPECTED
@pytest.mark.skip("actually hits the metadata database")
def test_really_pulls_observation_info():
    """
    Does ObservationWrester wrest the expected data from the MDDB 4 realz?

    We're always going to skip this test, but let's keep it here
    for debugging purposes.

    :return:
    """
    wrester = mock_wrester(_17A_109_ARGS)
    # NOTE(review): constructing a second ObservationWrester here looks
    # redundant -- `wrester` itself could be used; kept because this test
    # is permanently skipped.
    json_str = ObservationWrester(wrester.conn, _17A_109_SPL).wrest_observation_info()
    obsv_dict = json.loads(json_str)
    assert obsv_dict[PROJECT] == _17A_109_EXPECTED[PROJECT]
    time_obsvd = obsv_dict[OBS_TIME]
    assert time_obsvd == "0 days, 0 hours, 4 minutes, 34 seconds"
    # Queue time keeps growing, so only assert a lower bound on the days.
    time_in_queue = obsv_dict[QUEUE_TIME]
    parts = time_in_queue.split(", ")
    days_part = parts[0].split(" ")
    assert int(days_part[0]) > 1124
@pytest.mark.skip("... Dates are broken...")
def test_handles_evla_exec_block():
    """
    Confirm we get what we expect when observing time is minimal.

    :return:
    """
    locator = "uid://evla/execblock/91c685b6-4527-44b1-9f91-3904e1125817"
    expected = {
        PROJECT: "19A-440",
        OBS_TIME: "0 days, 0 hours, 0 minutes, 4 seconds",
        QUEUE_TIME: "627 days, 22 hours, 43 minutes, 6 seconds",
    }
    with patch("json.dumps", MagicMock(return_value=expected)):
        subject = mock_wrester(argparse.Namespace(spl=locator))
        actual = subject.wrest_observation_info()
        assert actual[PROJECT] == expected[PROJECT]
        # OBS_TIME reads "D days, H hours, M minutes, S seconds";
        # the fourth comma-separated segment holds the seconds.
        seconds_field = actual[OBS_TIME].split(", ")[3]
        assert int(seconds_field.split(" ")[0]) == 4
@pytest.mark.skip("... VLBA isn't calibratable...")
def test_handles_vlba_observation():
    """
    Confirm we get what we expect when observing time is at high end.

    :return:
    """
    # NOTE(review): the locator says "evla" even though BM468 is described as
    # a VLBA observation -- confirm the locator if this test is ever revived.
    spl = "uid://evla/execblock/9fbb86eb-ba14-42e7-8fd7-5c45aff36725"
    expected = {
        PROJECT: "BM468",
        OBS_TIME: "0 days, 11 hours, 59 minutes, 57 seconds",
        QUEUE_TIME: "933 days, 11 hours, 48 minutes, 12 seconds",
    }
    with patch("json.dumps", MagicMock(return_value=expected)):
        wrester = mock_wrester(argparse.Namespace(spl=spl))
        actual = wrester.wrest_observation_info()
        assert actual[PROJECT] == expected[PROJECT]
        # OBS_TIME reads "D days, H hours, ..."; the second comma-separated
        # segment holds the hours.
        obs_parts = actual[OBS_TIME].split(", ")
        hours_part = obs_parts[1].split(" ")
        assert int(hours_part[0]) == 11
@pytest.mark.skip("... ALMA isn't calibratable...")
def test_handles_alma():
    """
    Can we report a pending ALMA observation?

    :return:
    """
    locator = "uid://alma/execblock/a4154db4-a834-4020-896a-6fd4613bf986"
    expected = {
        PROJECT: "2019.1.01635.S",
        OBS_TIME: "0 days, 1 hours, 57 minutes, 4 seconds",
        QUEUE_TIME: "517 days, 2 hours, 5 minutes, 15 seconds",
    }
    with patch("json.dumps", MagicMock(return_value=expected)):
        subject = mock_wrester(argparse.Namespace(spl=locator))
        actual = subject.wrest_observation_info()
        assert actual[PROJECT] == expected[PROJECT]
        # QUEUE_TIME reads "D days, ..."; only assert a lower bound on days.
        days_text = actual[QUEUE_TIME].split(", ")[0].split(" ")[0]
        assert int(days_text) >= 517
"""add restore capabilities
Revision ID: 7200d0d19938
Revises: a7c2b4682aae
Create Date: 2021-07-13 09:51:45.729067
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "7200d0d19938"        # this migration's ID
down_revision = "a7c2b4682aae"   # migration this one builds on
branch_labels = None
depends_on = None
# HTCondor submit description for the restore_cms workflow; the {{...}}
# tokens are placeholders filled in by the workflow templating machinery.
condor_content = """executable = restore_cms.sh
arguments = {{product_locator}} {{cal_product_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
# Shell script the Condor job executes: fetch products, run the CASA restore,
# copy the measurement set into products/, and deliver. $1/$2 are the product
# and calibration locators, $3 the request id, $4/$5 metadata.json and PPR.xml.
script_content = """#!/bin/sh
export HOME=$TMPDIR
set -o errexit
./framework.sh -d .
chmod 770 .
cd rawdata/
../productfetcher --product-locator $1 $2
cd ../
./casa_envoy --restore -c $4 $5
cp ./working/{{sdmId}}.ms ./products/
./deliver -p ./products --prefix $3
"""
# metadata.json template handed to casa_envoy; the {{...}} tokens are
# placeholders filled in by the workflow templating machinery.
# FIX: added the missing comma after the "fileSetIds" entry -- without it the
# rendered template is not valid JSON and json.loads on it would fail.
metadata_content = """{
"fileSetIds": ["{{sdmId}}", "{{calSdmId}}"],
"workflowName": "std_calibration",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"calProductLocator": "{{cal_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
}
"""
# PPR.xml template describing the CASA restore pipeline run; the {{...}}
# tokens are placeholders filled in by the workflow templating machinery.
# FIX: removed a stray "0" that followed </ProcessingRequest> -- it made the
# rendered template malformed XML.
ppr_content = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ns2:SciPipeRequest xmlns:ns2="Common/pipelinescience/SciPipeRequest">
<ProjectSummary>
<ProposalCode>VLA/null</ProposalCode>
<Observatory>NRAO</Observatory>
<Telescope>VLA</Telescope>
<ProcessingSite>Socorro</ProcessingSite>
<Operator>vlapipe</Operator>
<Mode>SCIENCE</Mode>
<Version>NGRH-ALMA-10_8</Version>
<CreationTime>{{created_at}}</CreationTime>
</ProjectSummary>
<ProjectStructure>TBD</ProjectStructure>
<ProcessingRequests>
<RootDirectory>{{root_directory}}</RootDirectory>
<ProcessingRequest>
<ProcessingIntents>
<Intents>
<Keyword>VLA_INTERFEROMETRY_STANDARD_OBSERVING_MODE</Keyword>
<Value>Undefined</Value>
</Intents>
</ProcessingIntents>
<ProcessingProcedure>
<ProcedureTitle>Workspaces Restore</ProcedureTitle>
<ProcessingCommand>
<Command>hifv_restoredata</Command>
<ParameterSet/>
</ProcessingCommand>
<ProcessingCommand>
<Command>hifv_statwt</Command>
<ParameterSet/>
</ProcessingCommand>
</ProcessingProcedure>
<DataSet>
<RelativePath>{{relative_path}}</RelativePath>
<SdmIdentifier>{{sdmId}}</SdmIdentifier>
<DataType>asdm</DataType>
</DataSet>
</ProcessingRequest>
</ProcessingRequests>
<ResultsProcessing>
<ArchiveResults>false</ArchiveResults>
<CleanUpDisk>false</CleanUpDisk>
<UpdateProjectLifeCycle>false</UpdateProjectLifeCycle>
<NotifyOperatorWhenDone>false</NotifyOperatorWhenDone>
<SDMall>false</SDMall>
<SDMonly>false</SDMonly>
<PipelineOperatorAddress>Unknown</PipelineOperatorAddress>
</ResultsProcessing>
</ns2:SciPipeRequest>
"""
def upgrade():
    """Register the restore_cms capability, its workflow, and its templates."""
    restore_steps = """prepare-and-run-workflow restore_cms
await-workflow
await-qa
"""
    op.execute(
        f"""
INSERT INTO capabilities (capability_name, capability_steps, max_jobs)
VALUES ('restore_cms', '{restore_steps}', 20)
"""
    )
    op.execute(
        """
INSERT INTO workflows (workflow_name) VALUES ('restore_cms')
"""
    )
    # One row per template file belonging to the restore_cms workflow;
    # E'...' keeps PostgreSQL escape-string semantics for the embedded text.
    for template_name, template_body in (
        ("restore_cms.condor", condor_content),
        ("restore_cms.sh", script_content),
        ("metadata.json", metadata_content),
        ("PPR.xml", ppr_content),
    ):
        op.execute(
            f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('{template_name}', E'{template_body}', 'restore_cms')
"""
        )
def downgrade():
    """Remove everything the upgrade added for restore_cms."""
    for statement in (
        """
DELETE FROM capabilities WHERE capability_name = 'restore_cms'
""",
        """
DELETE FROM workflows WHERE workflow_name = 'restore_cms'
""",
        """
DELETE FROM workflow_templates WHERE workflow_name = 'restore_cms'
""",
    ):
        op.execute(statement)
"""empty message
Revision ID: a70e3e5d5bf3
Revises: 0bca0d8b3564, 7200d0d19938
Create Date: 2021-07-15 10:22:22.651390
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a70e3e5d5bf3'
# Merge point: this revision joins the 0bca0d8b3564 and 7200d0d19938 branches.
down_revision = ('0bca0d8b3564', '7200d0d19938')
branch_labels = None
depends_on = None
def upgrade():
    """Merge migration: no schema changes to apply."""
    pass
def downgrade():
    """Merge migration: nothing to undo."""
    pass
from typing import Dict
from unittest.mock import patch
import json
import pytest
from workspaces.products.services.archive_service import ArchiveService
@pytest.fixture(scope="module")
def fake_metadata() -> Dict[str, str]:
def fake_metadata() -> json:
return {
"spl": "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba",
"bands": "Ka",
......@@ -46,7 +47,7 @@ class TestArchiveService:
expected_spl = "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba"
with patch(
"workspaces.products.services.archive_service.WrestWorkflowMetadata.wrest_obs_metadata_from_fileset_id",
"workspaces.products.services.archive_service.WrestObservationMetadata.wrest_observation_info",
) as mock_wrest_obs_metadata:
mock_wrest_obs_metadata.return_value = fake_metadata
with patch("workspaces.products.services.archive_service.Router"):
......
......@@ -2,7 +2,8 @@ import logging
from typing import Dict
import requests
from aat_wrest.metadata_wrester import WrestWorkflowMetadata
from aat_wrest.observation_wrester import WrestObservationMetadata
from aat_wrest.utilities import MDDBConnector
from messaging.router import Router, on_message
from pycapo import CapoConfig
......@@ -36,12 +37,12 @@ class ArchiveService(ArchiveServiceIF):
logger.info(f"ingestion-complete event: {logdata}")
execblock_id = logdata["execblock_id"]
fileset_id = logdata["fileset_id"]
sdm_id = logdata["fileset_id"]
project_code = logdata["project_code"]
wrester = WrestWorkflowMetadata(MDDBConnector(), spl=None, fileset_id=fileset_id)
metadata = wrester.wrest_obs_metadata_from_fileset_id(fileset_id)
metadata["fileset_id"] = fileset_id
wrester = WrestObservationMetadata(MDDBConnector(), sdm_id=sdm_id)
metadata = wrester.wrest_observation_info()
metadata["sdm_id"] = sdm_id
spl = metadata["spl"]
del metadata["spl"]
......@@ -53,7 +54,7 @@ class ArchiveService(ArchiveServiceIF):
type(response),
response.content,
)
return spl, execblock_id, fileset_id, project_code
return spl, execblock_id, sdm_id, project_code
@staticmethod
def _create_std_calibration_request(spl: str, metadata: Dict[str, str]) -> Response:
......@@ -62,7 +63,7 @@ class ArchiveService(ArchiveServiceIF):
:param spl: Science product locator of observation
:param metadata: Dict with the following information inside:
- fileset_id: File Set ID
- sdm_id: SDM ID (sometimes called fileset id)
- bands: Radio bands for observation
- array_config: Configuration of array telescopes
- obs_start_time: Start time of observation
......
......@@ -235,6 +235,9 @@ class WorkflowService(WorkflowServiceIF):
if "calibration" in name:
wrest_type = "-sc"
argument = wf_request.argument["product_locator"]
elif "restore" in name:
wrest_type = "-r"
argument = [wf_request.argument["product_locator"], wf_request.argument["cal_locator"]]
elif "imaging" in name:
wrest_type = "-si"
argument = wf_request.argument["sdmId"]
......