diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py
index 9747a57c9879ea16afff72d30891818fe4693093..81515c907130c3ec32c9509568c7446959afbe76 100644
--- a/services/workflow/workflow/server.py
+++ b/services/workflow/workflow/server.py
@@ -226,7 +226,7 @@ class WorkflowFilesRestService:
             self.request.matchdict["filename"],
             self.request.context,
         )
-        file = self.request.info.save_file(
+        file = self.request.workflows.attach_file_to_request(
             request=self.request.context,
             filename=self.request.matchdict["filename"],
             content=self.request.body,
diff --git a/shared/workspaces/workspaces/capability/services/capability_engine.py b/shared/workspaces/workspaces/capability/services/capability_engine.py
index c1b92c68048fc6a7b843cec049255fa61f9e10e4..a6f060bfd289220b9c704199c726a74aff5f330a 100644
--- a/shared/workspaces/workspaces/capability/services/capability_engine.py
+++ b/shared/workspaces/workspaces/capability/services/capability_engine.py
@@ -79,7 +79,7 @@ class CapabilityEngine(CapabilityEngineIF):
         )
 
         for file in files:
-            response = self.workflow_service.attach_file_to_request(workflow_request, file)
+            response = self.workflow_service.attach_file_to_request(workflow_request, file.filename, file.content)
             logger.info(f"Added file {file.filename} to workflow request #{workflow_request.workflow_request_id}")
 
         execution = self.capability_info.lookup_execution(execution_id)
diff --git a/shared/workspaces/workspaces/capability/services/capability_info.py b/shared/workspaces/workspaces/capability/services/capability_info.py
index 262042a18dc29f4c22cf5ee2ab3c986283668e19..3bec2363cb7ff0733c72f473f2e55c397d9b85cb 100644
--- a/shared/workspaces/workspaces/capability/services/capability_info.py
+++ b/shared/workspaces/workspaces/capability/services/capability_info.py
@@ -125,13 +125,15 @@ class CapabilityInfo(CapabilityInfoIF):
         :return: new CapabilityVersion
         """
         request = self.lookup_capability_request(capability_request_id)
+        current_version_files = request.current_version.files
         version = CapabilityVersion(
             capability_request_id=capability_request_id,
             version_number=len(request.versions) + 1,
             parameters=request.parameters,
-            request=request
+            request=request,
         )
         self.save_entity(version)
+        self.copy_previous_version_files(version=version, previous_version_files=current_version_files)
         return version
 
     def create_execution(self, request: CapabilityRequest) -> CapabilityExecution:
@@ -232,6 +234,14 @@ class CapabilityInfo(CapabilityInfoIF):
         self.session.flush()
         return vf
 
+    def copy_previous_version_files(self,
+                                    version: CapabilityVersionIF,
+                                    previous_version_files: List[CapabilityVersionFile]):
+        for file in previous_version_files:
+            self.save_version_file(version=version,
+                                   filename=file.filename,
+                                   content=file.content)
+
     def delete_request_versions(self, request_id: int) -> int:
         """
         Deleting all versions of a capability request. This is executed as preparation for deleting the request itself
diff --git a/shared/workspaces/workspaces/workflow/services/interfaces.py b/shared/workspaces/workspaces/workflow/services/interfaces.py
index b25537f2710e0db9d7881d3224068f60a610275b..001bb1502e2a3ba442c3783a845514bd49935027 100644
--- a/shared/workspaces/workspaces/workflow/services/interfaces.py
+++ b/shared/workspaces/workspaces/workflow/services/interfaces.py
@@ -26,7 +26,7 @@ class WorkflowServiceIF(ABC):
         pass
 
     @abstractmethod
-    def attach_file_to_request(self, request: WorkflowRequestIF, file: AbstractFile):
+    def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content:bytes):
         pass
 
     @abstractmethod
diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py
index 53588ae9d596e0725a577f1ea579dcd8541c3fd7..4595483978453d9b01f8af0c0881dcf436bcb108 100644
--- a/shared/workspaces/workspaces/workflow/services/workflow_service.py
+++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py
@@ -38,17 +38,17 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
         )
         return result.json()
 
-    def attach_file_to_request(self, request: WorkflowRequestIF, file: AbstractFile):
-       result = requests.put(
+    def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes):
+        result = requests.put(
             f"{self.url}/workflows/{request.workflow_name}"
             f"/requests/{request.workflow_request_id}"
-            f"/files/{file.filename}",
-            data=file.content,
+            f"/files/{filename}",
+            data=content,
         )
-       return result
+        return result
 
     def create_workflow_request(
-        self, workflow: Union[str, WorkflowIF], argument: Dict
+            self, workflow: Union[str, WorkflowIF], argument: Dict
     ) -> WorkflowRequestIF:
         # 1. Handle the type ambiguity with the workflow argument
         workflow_name = workflow if type(workflow) == type(str()) else workflow.workflow_name
@@ -82,8 +82,12 @@ class WorkflowService(WorkflowServiceIF):
             CapoConfig().settings("edu.nrao.archive.workspaces.ProcessingSettings").rootDirectory
         )
 
-    def attach_file_to_request(self, request: WorkflowRequestIF, file: AbstractFile):
-        self.info.save_file(request=request, filename=file.filename, content=file.content)
+    def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes):
+        forbidden = self._get_forbidden_templates_list(request.workflow_name)
+        if filename not in forbidden:
+            self.info.save_file(request=request, filename=filename, content=content)
+        else:
+            logger.info(f"{filename} is a protected file name.")
 
     def create_workflow_request(self, workflow, argument) -> WorkflowRequestIF:
         # return self.info.save_request(WorkflowRequest(workflow_name=workflow, argument=argument))
@@ -127,11 +131,31 @@ class WorkflowService(WorkflowServiceIF):
         else:
             templated_files = definition.render_templates(request.argument)
 
-        for file in templated_files:
+        # 3. Combine the templated with the request files.
+        # Override templates if user supplied file has same name and is a valid input file
+        usable_templates = []
+        usable_files = []
+        forbidden_templates = self._get_forbidden_templates_list(request.workflow_name)
+        for template in templated_files:
+            # check if there is not a newer version supplied, add the template
+            if template.filename not in request.files:
+                usable_templates.append(template)
+            # check if a newer version was supplied and is not in the list of restricted access templates
+            if template.filename in request.files and template.filename in forbidden_templates:
+                usable_templates.append(template)
+                logger.info(f"Cannot append user file {request.files[template.filename]} to "
+                            f"workflow request #{request.workflow_request_id}. "
+                            f"Users are not allowed to override required system templates.")
+
+        # check that supplied files are not in conflict with the sorted templates
+        for file in request.files:
+            if file.filename not in usable_templates:
+                usable_files.append(file)
+
+        for file in usable_templates:
             self.info.save_file(request=request, filename=file.filename, content=file.content)
 
-        # 3. Combine the templated with the request files
-        workflow_files = templated_files + request.files
+        workflow_files = usable_templates + usable_files
 
         # 4. prepare files for condor execution
         if not request.results_dir:
@@ -254,6 +278,14 @@ class WorkflowService(WorkflowServiceIF):
                     return Path(logfile_name.strip())
         return Path(f"{jobfile_name.stem}.log")
 
+    @staticmethod
+    def _get_forbidden_templates_list(workflow_name: str):
+        return [
+            workflow_name + ".sh",
+            workflow_name + ".condor",
+            workflow_name + ".dag",
+        ]
+
     @on_message(service="workflow", type="workflow-complete")
     def propagate_delivery(self, **message: Dict):
         # look up the workflow request to get the path