Skip to content
Snippets Groups Projects
Commit 76a28798 authored by Charlotte Hausman's avatar Charlotte Hausman
Browse files

fix for file transfer issue - on main

parent 8eae5d9d
No related branches found
No related tags found
1 merge request!810fix for file transfer issue - on main
Pipeline #4442 passed
Pipeline: workspaces

#4446

    ......@@ -15,11 +15,14 @@
    #
    # You should have received a copy of the GNU General Public License
    # along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
    """
    Classes and methods for laying the data location foundation for various types of CASA processing
    """
    import logging
    import os
    import shutil
    import sys
    import tarfile
    from distutils.dir_util import copy_tree
    ......@@ -29,7 +32,43 @@ from casa_envoy.interfaces import FoundationIF
    from casa_envoy.schema import AbstractTextFile
    class GeneralFoundation(FoundationIF):
    """
    Class to set up the general data foundation for anything running CASA.
    - Special attention to user provided file locations, must be placed in working directory.
    """
    def __init__(self, parameters: dict, metadata: AbstractTextFile):
    self.logger = logging.getLogger("casa_envoy")
    self.parent_path = parameters["parent_path"]
    self.working_dir = self.parent_path / "working"
    self.metadata = metadata
    def data_foundation(self):
    """
    All user provided files should be placed in the 'working' directory inorder for CASA to pick them up.
    :return:
    """
    self.logger.info("LAYING DATA FOUNDATION...")
    if "providedFiles" in self.metadata.content:
    provided_files = (
    self.metadata.content["providedFiles"].replace("[", "").replace("'", "").replace("]", "").split(",")
    )
    # move provided files from parent directory into working directory for CASA
    for file in provided_files:
    shutil.copy(file.strip(), self.working_dir)
    self.logger.info("DATA FOUNDATION COMPLETE!")
    class RestoreFoundation(FoundationIF):
    """
    Class to set up the data foundation for restores
    """
    def __init__(self, parameters: dict, metadata: AbstractTextFile):
    self.logger = logging.getLogger("casa_envoy")
    self.parameters = parameters
    ......@@ -42,6 +81,7 @@ class RestoreFoundation(FoundationIF):
    After download, all data is in rawdata and the calibrations tables are contained
    in a tar file. We need to extract all the calibration tables to the products directory
    for CASA processing.
    :return:
    """
    ......
    ......@@ -25,7 +25,7 @@ import sys
    from typing import Dict
    from casa_envoy.auditor import AuditDirectories, AuditFiles
    from casa_envoy.foundation import RestoreFoundation
    from casa_envoy.foundation import RestoreFoundation, GeneralFoundation
    from casa_envoy.interfaces import LauncherIF
    from casa_envoy.schema import AbstractTextFile
    ......@@ -143,6 +143,7 @@ class CalibrationLauncher(LauncherIF):
    def prepare_for_casa(self):
    # Ensure all data is in the required locations for CASA processing (This is not always rawdata!)
    GeneralFoundation(self.parameters, self.metadata).data_foundation()
    if self.parameters["product_type"] == "restore":
    RestoreFoundation(self.parameters, self.metadata).data_foundation()
    ......@@ -197,6 +198,8 @@ class ImagingLauncher(LauncherIF):
    if check_input:
    wf_name = self.metadata.content["workflowName"]
    GeneralFoundation(self.parameters, self.metadata).data_foundation()
    if "restore" in wf_name:
    RestoreFoundation(self.parameters, self.metadata).data_foundation()
    ......
    ......@@ -16,7 +16,8 @@
    # You should have received a copy of the GNU General Public License
    # along with Workspaces. If not, see <https://www.gnu.org/licenses/>.
    import os
    from unittest.mock import mock_open, patch
    from pathlib import Path
    from unittest.mock import patch
    from casa_envoy.launchers import CalibrationLauncher, CasaLauncher, ImagingLauncher
    ......@@ -28,6 +29,7 @@ cal_parameters = {
    "metadata": "test/input_files/test.json",
    "ppr": "test/input_files/PPR.xml",
    "product_type": "standard-cal",
    "parent_path": Path("I/am/a/path"),
    }
    img_parameters = {
    "useCasa": False,
    ......@@ -37,6 +39,7 @@ img_parameters = {
    "metadata": "test/input_files/image-metadata.json",
    "ppr": "test/input_files/cmsimage-PPR.xml",
    "product_type": "standard-img",
    "parent_path": Path("I/am/a/path"),
    }
    restore_parameters = {
    ......
    ......@@ -158,8 +158,8 @@ class ExecutionManager(ExecutionManagerIF):
    :param files:
    """
    logger.info("Submitting workflow request")
    transfer_files_string = self._get_transfer_files_property(files)
    args_list = {**workflow_args, "files_to_transfer": transfer_files_string}
    transfer_files_string, transfer_files_list = self._get_transfer_files_properties(files)
    args_list = {**workflow_args, "files_to_transfer": transfer_files_string, "provided_files": transfer_files_list}
    execution = self.capability_info.lookup_execution(execution_id)
    ......@@ -299,6 +299,14 @@ class ExecutionManager(ExecutionManagerIF):
    self._error_execution(execution)
    @staticmethod
    def _get_transfer_files_property(files: List[AbstractFile]):
    def _get_transfer_files_properties(files: List[AbstractFile]):
    name_list = []
    separator = ", ./"
    return separator + separator.join(file.filename for file in files)
    # needed for template rendering
    if files:
    for file in files:
    name_list.append(file.filename)
    return [separator + separator.join(file.filename for file in files), name_list]
    else:
    return ["", []]
    ......@@ -700,9 +700,9 @@ class WorkflowMessageHandler:
    subject = message["subject"]
    wf_req_id = subject["workflow_request_id"]
    wf_request = self.info.lookup_workflow_request(wf_req_id)
    subject["execution_wf_id"] = wf_request.argument["parent_wf_request_id"]
    if "ingest" in wf_request.workflow_name:
    subject["execution_wf_id"] = wf_request.argument["parent_wf_request_id"]
    logger.info("SENDING INGESTION FAILED MESSAGE!")
    ingestion_failed_msg = WorkflowMessageArchitect(previous_info=subject).compose_message("ingestion_failed")
    ......
    0% Loading or .
    You are about to add 0 people to the discussion. Proceed with caution.
    Finish editing this message first!
    Please register or to comment