Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • ssa/workspaces
1 result
Show changes
"""
Tests for aat_wrest.observation_wrester
"""
import argparse
from enum import Enum
from unittest.mock import patch, MagicMock
# pylint: disable=E0401, E0402
import json
import pytest
from aat_wrest.observation_wrester import WrestObservationMetadata
class Keys(Enum):
"""The things we're reporting"""
PROJECT = "projectCode"
OBS_TIME = "timeObserved"
QUEUE_TIME = "timeInQueue"
PROJECT = Keys.PROJECT.value
OBS_TIME = Keys.OBS_TIME.value
QUEUE_TIME = Keys.QUEUE_TIME.value
_17A_109_SPL = "uid://evla/execblock/5c71ade0-d035-4fd5-a36f-0389e34db0e5"
_17A_109_ARGS = argparse.Namespace(spl=_17A_109_SPL)
_17A_109_EXPECTED = {
PROJECT: "17A-109",
OBS_TIME: "0 days, 0 hours, 4 minutes, 34 seconds",
QUEUE_TIME: "1125 days, 11 hours, 43 minutes, 58 seconds",
}
args = argparse.Namespace(observation="17B-197.sb34663512.eb34806505.58108.78427738426")
result_obs = (
'{"spl": "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba", '
'"bands": "Ka", '
'"array_config": "D", '
'"obs_start_time": "1234567", '
'"obs_end_time": "123598898" }'
)
def mock_wrester(args: argparse.Namespace) -> WrestObservationMetadata:
"""
Pretend to get the information from the MDDB.
:param args:
:return:
"""
with patch("psycopg2.connect") as conn:
return WrestObservationMetadata(conn, args.observation)
wrester = mock_wrester(args)
class TestWrestObservationInfo:
@patch("json.dumps", MagicMock(return_value=result_obs))
def test_wrest_observation_info(self):
wrester.conn.cursor.return_value.fetchall.return_value = [
"uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba",
"Ka",
"D",
"1234567",
"123598898",
]
assert args.observation == "17B-197.sb34663512.eb34806505.58108.78427738426"
metadata = wrester.wrest_observation_info()
assert (
metadata == '{"spl": "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba", '
'"bands": "Ka", '
'"array_config": "D", '
'"obs_start_time": "1234567", '
'"obs_end_time": "123598898" }'
)
@pytest.mark.skip("Dates are broken. Method superseded by wrest_observation_info")
@patch("json.dumps", MagicMock(return_value=_17A_109_EXPECTED))
def test_gets_expected_observation_info(self):
"""
Does ObservationWrester wrest the expected data from the MDDB?
:return:
"""
wrester = mock_wrester(_17A_109_ARGS)
assert wrester.spl == _17A_109_SPL
actual = wrester.wrest_observation_time_info()
assert actual == _17A_109_EXPECTED
@pytest.mark.skip("... Dates are broken... Method superseded by wrest_observation_info")
def test_handles_evla_exec_block(self):
"""
Confirm we get what we expect when observing time is minimal.
:return:
"""
spl = "uid://evla/execblock/91c685b6-4527-44b1-9f91-3904e1125817"
args = argparse.Namespace(spl=spl)
expected = {
PROJECT: "19A-440",
OBS_TIME: "0 days, 0 hours, 0 minutes, 4 seconds",
QUEUE_TIME: "627 days, 22 hours, 43 minutes, 6 seconds",
}
with patch("json.dumps", MagicMock(return_value=expected)):
wrester = mock_wrester(args)
actual = wrester.wrest_observation_time_info()
assert actual[PROJECT] == expected[PROJECT]
obs_parts = actual[OBS_TIME].split(", ")
seconds_part = obs_parts[3].split(" ")
assert int(seconds_part[0]) == 4
""" WS-351: As a DA I want to see a list of capability requests per capability / WS-368 """
import argparse
import json
import logging
import os
import sys
from enum import Enum
from unittest.mock import patch, MagicMock
# pylint: disable=E0401, E0402
import pytest
from aat_wrest.observation_wrester import ObservationWrester
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler(sys.stdout))
class Keys(Enum):
"""The things we're reporting"""
PROJECT = "projectCode"
OBS_TIME = "timeObserved"
QUEUE_TIME = "timeInQueue"
PROJECT = Keys.PROJECT.value
OBS_TIME = Keys.OBS_TIME.value
QUEUE_TIME = Keys.QUEUE_TIME.value
_17A_109_SPL = "uid://evla/execblock/5c71ade0-d035-4fd5-a36f-0389e34db0e5"
_17A_109_ARGS = argparse.Namespace(spl=_17A_109_SPL)
_17A_109_EXPECTED = {
PROJECT: "17A-109",
OBS_TIME: "0 days, 0 hours, 4 minutes, 34 seconds",
QUEUE_TIME: "1125 days, 11 hours, 43 minutes, 58 seconds",
}
def mock_wrester(args: argparse.Namespace) -> ObservationWrester:
"""
Pretend to get the information from the MDDB.
:param args:
:return:
"""
with patch("psycopg2.connect") as conn:
return ObservationWrester(conn, args.spl)
@pytest.mark.skip("... Dates are broken...")
@patch("json.dumps", MagicMock(return_value=_17A_109_EXPECTED))
def test_gets_expected_observation_info():
"""
Does ObservationWrester wrest the expected data from the MDDB?
:return:
"""
wrester = mock_wrester(_17A_109_ARGS)
assert wrester.spl == _17A_109_SPL
actual = wrester.wrest_observation_info()
assert actual == _17A_109_EXPECTED
@pytest.mark.skip("actually hits the metadata database")
def test_really_pulls_observation_info():
"""
Does ObservationWrester wrest the expected data from the MDDB 4 realz?
We're always going to skip this test, but let's keep it here
for degubbing porpoises.
:return:
"""
wrester = mock_wrester(_17A_109_ARGS)
json_str = ObservationWrester(wrester.conn, _17A_109_SPL).wrest_observation_info()
obsv_dict = json.loads(json_str)
assert obsv_dict[PROJECT] == _17A_109_EXPECTED[PROJECT]
time_obsvd = obsv_dict[OBS_TIME]
assert time_obsvd == "0 days, 0 hours, 4 minutes, 34 seconds"
time_in_queue = obsv_dict[QUEUE_TIME]
parts = time_in_queue.split(", ")
days_part = parts[0].split(" ")
assert int(days_part[0]) > 1124
@pytest.mark.skip("... Dates are broken...")
def test_handles_evla_exec_block():
"""
Confirm we get what we expect when observing time is minimal.
:return:
"""
spl = "uid://evla/execblock/91c685b6-4527-44b1-9f91-3904e1125817"
args = argparse.Namespace(spl=spl)
expected = {
PROJECT: "19A-440",
OBS_TIME: "0 days, 0 hours, 0 minutes, 4 seconds",
QUEUE_TIME: "627 days, 22 hours, 43 minutes, 6 seconds",
}
with patch("json.dumps", MagicMock(return_value=expected)):
wrester = mock_wrester(args)
actual = wrester.wrest_observation_info()
assert actual[PROJECT] == expected[PROJECT]
obs_parts = actual[OBS_TIME].split(", ")
seconds_part = obs_parts[3].split(" ")
assert int(seconds_part[0]) == 4
@pytest.mark.skip("... VLBA isn't calibratable...")
def test_handles_vlba_observation():
"""
Confirm we get what we expect when observing time is at high end.
:return:
"""
spl = "uid://evla/execblock/9fbb86eb-ba14-42e7-8fd7-5c45aff36725"
expected = {
PROJECT: "BM468",
OBS_TIME: "0 days, 11 hours, 59 minutes, 57 seconds",
QUEUE_TIME: "933 days, 11 hours, 48 minutes, 12 seconds",
}
with patch("json.dumps", MagicMock(return_value=expected)):
wrester = mock_wrester(argparse.Namespace(spl=spl))
actual = wrester.wrest_observation_info()
assert actual[PROJECT] == expected[PROJECT]
obs_parts = actual[OBS_TIME].split(", ")
hours_part = obs_parts[1].split(" ")
assert int(hours_part[0]) == 11
@pytest.mark.skip("... ALMA isn't calibratable...")
def test_handles_alma():
"""
Can we report a pending ALMA observation?
:return:
"""
spl = "uid://alma/execblock/a4154db4-a834-4020-896a-6fd4613bf986"
expected = {
PROJECT: "2019.1.01635.S",
OBS_TIME: "0 days, 1 hours, 57 minutes, 4 seconds",
QUEUE_TIME: "517 days, 2 hours, 5 minutes, 15 seconds",
}
with patch("json.dumps", MagicMock(return_value=expected)):
wrester = mock_wrester(argparse.Namespace(spl=spl))
actual = wrester.wrest_observation_info()
assert actual[PROJECT] == expected[PROJECT]
time_parts = actual[QUEUE_TIME].split(", ")
days_part = time_parts[0].split(" ")
assert int(days_part[0]) >= 517
"""add restore capabilities
Revision ID: 7200d0d19938
Revises: a7c2b4682aae
Create Date: 2021-07-13 09:51:45.729067
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "7200d0d19938"
down_revision = "a7c2b4682aae"
branch_labels = None
depends_on = None
condor_content = """executable = restore_cms.sh
arguments = {{product_locator}} {{cal_product_locator}} {{request_id}} metadata.json PPR.xml
output = restore.out
error = restore.err
log = condor.log
SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
should_transfer_files = yes
transfer_input_files = $(SBIN_PATH)/.matplotlib, $(SBIN_PATH)/pycapo, $(SBIN_PATH)/framework.sh, $(SBIN_PATH)/productfetcher, $(SBIN_PATH)/casa_envoy, $(SBIN_PATH)/vela, $(SBIN_PATH)/deliver, ./PPR.xml, ./metadata.json
transfer_output_files = working, rawdata, products
request_memory = {{ramInGb}}
getenv = True
environment = "CAPO_PATH=/home/casa/capo"
queue
"""
script_content = """#!/bin/sh
export HOME=$TMPDIR
set -o errexit
./framework.sh -d .
chmod 770 .
cd rawdata/
../productfetcher --product-locator $1 $2
cd ../
./casa_envoy --restore -c $4 $5
cp ./working/{{sdmId}}.ms ./products/
./deliver -p ./products --prefix $3
"""
metadata_content = """{
"fileSetIds": ["{{sdmId}}", "{{calSdmId}}"]
"workflowName": "std_calibration",
"systemId": "{{request_id}}",
"creationTime": "{{created_at}}",
"productLocator": "{{product_locator}}",
"calProductLocator": "{{cal_locator}}",
"projectMetadata": {
"projectCode": "{{projectCode}}",
"title": "{{title}}",
"startTime": "{{startTime}}",
"observer": "{{observer}}"
},
"destinationDirectory": "{{root_directory}}/{{relative_path}}"
}
"""
ppr_content = """<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ns2:SciPipeRequest xmlns:ns2="Common/pipelinescience/SciPipeRequest">
<ProjectSummary>
<ProposalCode>VLA/null</ProposalCode>
<Observatory>NRAO</Observatory>
<Telescope>VLA</Telescope>
<ProcessingSite>Socorro</ProcessingSite>
<Operator>vlapipe</Operator>
<Mode>SCIENCE</Mode>
<Version>NGRH-ALMA-10_8</Version>
<CreationTime>{{created_at}}</CreationTime>
</ProjectSummary>
<ProjectStructure>TBD</ProjectStructure>
<ProcessingRequests>
<RootDirectory>{{root_directory}}</RootDirectory>
<ProcessingRequest>
<ProcessingIntents>
<Intents>
<Keyword>VLA_INTERFEROMETRY_STANDARD_OBSERVING_MODE</Keyword>
<Value>Undefined</Value>
</Intents>
</ProcessingIntents>
<ProcessingProcedure>
<ProcedureTitle>Workspaces Restore</ProcedureTitle>
<ProcessingCommand>
<Command>hifv_restoredata</Command>
<ParameterSet/>
</ProcessingCommand>
<ProcessingCommand>
<Command>hifv_statwt</Command>
<ParameterSet/>
</ProcessingCommand>
</ProcessingProcedure>
<DataSet>
<RelativePath>{{relative_path}}</RelativePath>
<SdmIdentifier>{{sdmId}}</SdmIdentifier>
<DataType>asdm</DataType>
</DataSet>
</ProcessingRequest>0
</ProcessingRequests>
<ResultsProcessing>
<ArchiveResults>false</ArchiveResults>
<CleanUpDisk>false</CleanUpDisk>
<UpdateProjectLifeCycle>false</UpdateProjectLifeCycle>
<NotifyOperatorWhenDone>false</NotifyOperatorWhenDone>
<SDMall>false</SDMall>
<SDMonly>false</SDMonly>
<PipelineOperatorAddress>Unknown</PipelineOperatorAddress>
</ResultsProcessing>
</ns2:SciPipeRequest>
"""
def upgrade():
restore_steps = """prepare-and-run-workflow restore_cms
await-workflow
await-qa
"""
op.execute(
f"""
INSERT INTO capabilities (capability_name, capability_steps, max_jobs)
VALUES ('restore_cms', '{restore_steps}', 20)
"""
)
op.execute(
"""
INSERT INTO workflows (workflow_name) VALUES ('restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('restore_cms.condor', E'{condor_content}', 'restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('restore_cms.sh', E'{script_content}', 'restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('metadata.json', E'{metadata_content}', 'restore_cms')
"""
)
op.execute(
f"""
INSERT INTO workflow_templates (filename, content, workflow_name)
VALUES ('PPR.xml', E'{ppr_content}', 'restore_cms')
"""
)
def downgrade():
op.execute(
"""
DELETE FROM capabilities WHERE capability_name = 'restore_cms'
"""
)
op.execute(
"""
DELETE FROM workflows WHERE workflow_name = 'restore_cms'
"""
)
op.execute(
"""
DELETE FROM workflow_templates WHERE workflow_name = 'restore_cms'
"""
)
"""empty message
Revision ID: a70e3e5d5bf3
Revises: 0bca0d8b3564, 7200d0d19938
Create Date: 2021-07-15 10:22:22.651390
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'a70e3e5d5bf3'
down_revision = ('0bca0d8b3564', '7200d0d19938')
branch_labels = None
depends_on = None
def upgrade():
pass
def downgrade():
pass
from typing import Dict from typing import Dict
from unittest.mock import patch from unittest.mock import patch
import json
import pytest import pytest
from workspaces.products.services.archive_service import ArchiveService from workspaces.products.services.archive_service import ArchiveService
@pytest.fixture(scope="module") @pytest.fixture(scope="module")
def fake_metadata() -> Dict[str, str]: def fake_metadata() -> json:
return { return {
"spl": "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba", "spl": "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba",
"bands": "Ka", "bands": "Ka",
...@@ -46,7 +47,7 @@ class TestArchiveService: ...@@ -46,7 +47,7 @@ class TestArchiveService:
expected_spl = "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba" expected_spl = "uid://evla/execblock/8fbfb54b-d141-42fe-b079-609339a69cba"
with patch( with patch(
"workspaces.products.services.archive_service.WrestWorkflowMetadata.wrest_obs_metadata_from_fileset_id", "workspaces.products.services.archive_service.WrestObservationMetadata.wrest_observation_info",
) as mock_wrest_obs_metadata: ) as mock_wrest_obs_metadata:
mock_wrest_obs_metadata.return_value = fake_metadata mock_wrest_obs_metadata.return_value = fake_metadata
with patch("workspaces.products.services.archive_service.Router"): with patch("workspaces.products.services.archive_service.Router"):
......
...@@ -2,7 +2,8 @@ import logging ...@@ -2,7 +2,8 @@ import logging
from typing import Dict from typing import Dict
import requests import requests
from aat_wrest.metadata_wrester import WrestWorkflowMetadata from aat_wrest.observation_wrester import WrestObservationMetadata
from aat_wrest.utilities import MDDBConnector from aat_wrest.utilities import MDDBConnector
from messaging.router import Router, on_message from messaging.router import Router, on_message
from pycapo import CapoConfig from pycapo import CapoConfig
...@@ -36,12 +37,12 @@ class ArchiveService(ArchiveServiceIF): ...@@ -36,12 +37,12 @@ class ArchiveService(ArchiveServiceIF):
logger.info(f"ingestion-complete event: {logdata}") logger.info(f"ingestion-complete event: {logdata}")
execblock_id = logdata["execblock_id"] execblock_id = logdata["execblock_id"]
fileset_id = logdata["fileset_id"] sdm_id = logdata["fileset_id"]
project_code = logdata["project_code"] project_code = logdata["project_code"]
wrester = WrestWorkflowMetadata(MDDBConnector(), spl=None, fileset_id=fileset_id) wrester = WrestObservationMetadata(MDDBConnector(), sdm_id=sdm_id)
metadata = wrester.wrest_obs_metadata_from_fileset_id(fileset_id) metadata = wrester.wrest_observation_info()
metadata["fileset_id"] = fileset_id metadata["sdm_id"] = sdm_id
spl = metadata["spl"] spl = metadata["spl"]
del metadata["spl"] del metadata["spl"]
...@@ -53,7 +54,7 @@ class ArchiveService(ArchiveServiceIF): ...@@ -53,7 +54,7 @@ class ArchiveService(ArchiveServiceIF):
type(response), type(response),
response.content, response.content,
) )
return spl, execblock_id, fileset_id, project_code return spl, execblock_id, sdm_id, project_code
@staticmethod @staticmethod
def _create_std_calibration_request(spl: str, metadata: Dict[str, str]) -> Response: def _create_std_calibration_request(spl: str, metadata: Dict[str, str]) -> Response:
...@@ -62,7 +63,7 @@ class ArchiveService(ArchiveServiceIF): ...@@ -62,7 +63,7 @@ class ArchiveService(ArchiveServiceIF):
:param spl: Science product locator of observation :param spl: Science product locator of observation
:param metadata: Dict with the following information inside: :param metadata: Dict with the following information inside:
- fileset_id: File Set ID - sdm_id: SDM ID (sometimes called fileset id)
- bands: Radio bands for observation - bands: Radio bands for observation
- array_config: Configuration of array telescopes - array_config: Configuration of array telescopes
- obs_start_time: Start time of observation - obs_start_time: Start time of observation
......
...@@ -235,6 +235,9 @@ class WorkflowService(WorkflowServiceIF): ...@@ -235,6 +235,9 @@ class WorkflowService(WorkflowServiceIF):
if "calibration" in name: if "calibration" in name:
wrest_type = "-sc" wrest_type = "-sc"
argument = wf_request.argument["product_locator"] argument = wf_request.argument["product_locator"]
elif "restore" in name:
wrest_type = "-r"
argument = [wf_request.argument["product_locator"], wf_request.argument["cal_locator"]]
elif "imaging" in name: elif "imaging" in name:
wrest_type = "-si" wrest_type = "-si"
argument = wf_request.argument["sdmId"] argument = wf_request.argument["sdmId"]
......