diff --git a/schema/versions/990c5cd70082_add_dag_ified_null_workflow.py b/schema/versions/990c5cd70082_add_dag_ified_null_workflow.py
new file mode 100644
index 0000000000000000000000000000000000000000..eeb6ca85214963fc5ab98d766c06519b1c0d582b
--- /dev/null
+++ b/schema/versions/990c5cd70082_add_dag_ified_null_workflow.py
@@ -0,0 +1,73 @@
+"""add DAG-ified null workflow
+
+Revision ID: 990c5cd70082
+Revises: 257537f99abc
+Create Date: 2021-09-14 15:12:31.154288
+
+"""
+from alembic import op
+
+# revision identifiers, used by Alembic.
+revision = "990c5cd70082"
+down_revision = "257537f99abc"
+branch_labels = None
+depends_on = None
+
+null_dag_script = """#!/bin/sh
+
+./null $*
+"""
+null_dag_job_submit = """executable = null_dag.sh
+arguments = {}
+
+output = null_dag.$(jobname).out
+error = null_dag.$(jobname).err
+log = null_dag.$(jobname).log
+
+SBIN_PATH = /lustre/aoc/cluster/pipeline/$ENV(CAPO_PROFILE)/workspaces/sbin
+should_transfer_files = yes
+transfer_input_files = $(SBIN_PATH)/null
+
+queue
+"""
+null_dag_dag_submit = """JOB GREET null_dag_greeting.condor
+VARS GREET jobname="$(JOB)"
+
+JOB NAP null_dag_nap.condor
+VARS NAP jobname="$(JOB)"
+
+JOB VBOSE_GREET null_dag_verbose_greeting.condor
+VARS VBOSE_GREET jobname="$(JOB)"
+
+PARENT GREET CHILD NAP
+PARENT NAP CHILD VBOSE_GREET
+"""
+
+
+def upgrade():
+    op.execute("INSERT INTO workflows (workflow_name) VALUES ('null_dag')")
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag.sh', E'{null_dag_script}')"
+    )
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag_greeting.condor', E'{null_dag_job_submit.format('--greeting')}')"
+    )
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag_nap.condor', E'{null_dag_job_submit.format('--verbose --nap')}')"
+    )
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag_verbose_greeting.condor', E'{null_dag_job_submit.format('--verbose --greeting')}')"
+    )
+    op.execute(
+        "INSERT INTO workflow_templates (workflow_name, filename, content) "
+        f"VALUES ('null_dag', 'null_dag.dag', E'{null_dag_dag_submit}')"
+    )
+
+
+def downgrade():
+    op.execute("DELETE FROM workflows WHERE workflow_name = 'null_dag'")
+    op.execute("DELETE FROM workflow_templates WHERE workflow_name = 'null_dag'")
diff --git a/shared/workspaces/test/test_workflow_service.py b/shared/workspaces/test/test_workflow_service.py
index 7da509017ca54e6d8cf700d2402a6950861c6447..d05100095d7d398d2f26edfd5fc8d0eaa351618b 100644
--- a/shared/workspaces/test/test_workflow_service.py
+++ b/shared/workspaces/test/test_workflow_service.py
@@ -1,7 +1,9 @@
 """ Tests for the workflow service """
 
 import json
+import os
 import pathlib
+import shutil
 from typing import List
 
 # pylint: disable=E0401, R0201, W0212
@@ -133,6 +135,7 @@ class TestWorkflowService:
             },
             "type": "workflow-complete",
         }
+
         assert request.state == "Ready"
 
         with patch("workspaces.workflow.schema.WorkflowRequest.update_status") as mock_update:
@@ -175,6 +178,85 @@ class TestWorkflowService:
         usable = mock_workflow_service._determine_usable_files(request, files)
         assert len(usable) == 3
 
+    def test_execute_prepared(self, mock_workflow_service: WorkflowService, tmp_path: pathlib.Path):
+        # Create fake folder and DAG file
+        fake_folder = tmp_path / "test"
+        fake_folder.mkdir()
+        fake_dagfile = fake_folder / "test.dag"
+        fake_dagfile.touch()
+
+        orig_submit_dag = mock_workflow_service._submit_dag
+        mock_workflow_service._submit_dag = MagicMock()
+
+        # Test DAG submission
+        mock_workflow_service._execute_prepared(fake_folder)
+        mock_workflow_service._submit_dag.assert_called_with(fake_folder, fake_dagfile)
+
+        # DAG cleanup
+        mock_workflow_service._submit_dag = orig_submit_dag
+        os.remove(fake_dagfile)
+
+        # Create fake submit file
+        fake_jobfile = fake_folder / "test.condor"
+        fake_jobfile.touch()
+
+        orig_submit_job = mock_workflow_service._submit_job
+        mock_workflow_service._submit_job = MagicMock()
+
+        # Test job submission
+        mock_workflow_service._execute_prepared(fake_folder)
+        mock_workflow_service._submit_job.assert_called_with(fake_folder, fake_jobfile)
+
+        # Cleanup
+        mock_workflow_service._submit_job = orig_submit_job
+        shutil.rmtree(tmp_path)
+
+    def test_submit_job(self, mock_workflow_service: WorkflowService):
+        fake_folder = pathlib.Path("fake_folder")
+        fake_jobfile = pathlib.Path("fake.submit")
+        fake_jobfile.touch()
+        with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
+            log_path = mock_workflow_service._submit_job(fake_folder, fake_jobfile)
+            mock_sp_run.assert_called_with(
+                ["condor_submit", fake_jobfile.name],
+                cwd=str(fake_folder.absolute()),
+                preexec_fn=mock_workflow_service._switch_to_submituser,
+            )
+            assert log_path.name == "fake.log"
+
+        os.remove(fake_jobfile)
+
+    def test_submit_dag(self, mock_workflow_service: WorkflowService):
+        fake_folder = pathlib.Path("fake_folder")
+        fake_dagfile = pathlib.Path("fake.dag")
+        with patch("workspaces.workflow.services.workflow_service.subprocess.run") as mock_sp_run:
+            log_path = mock_workflow_service._submit_dag(fake_folder, fake_dagfile)
+            mock_sp_run.assert_called_with(
+                ["condor_submit_dag", "-f", fake_dagfile.name],
+                cwd=str(fake_folder.absolute()),
+                preexec_fn=mock_workflow_service._switch_to_submituser,
+            )
+            assert log_path.name == "fake.dag.dagman.log"
+
+    def test_get_jobfile_log(self, tmp_path: pathlib.Path, mock_workflow_service: WorkflowService):
+        with open(tmp_path / "fake.submit", "w") as fake_jobfile:
+            # Test with file that doesn't containe "log = " entry
+            log_path = mock_workflow_service._get_job_logfile_name(tmp_path / "fake.submit")
+            assert log_path.name == "fake.log"
+            # Write "log = " entry
+            fake_jobfile.write("log = test.log")
+
+        # Test with same file that now contains "log = " entry
+        log_path = mock_workflow_service._get_job_logfile_name(tmp_path / "fake.submit")
+        assert log_path.name == "test.log"
+
+        # Remove tmp_path just in case
+        shutil.rmtree(tmp_path)
+
+    def test_get_dagfile_log(self, mock_workflow_service: WorkflowService):
+        log_path = mock_workflow_service._get_dag_logfile_name(pathlib.Path("fake.dag"))
+        assert log_path.name == "fake.dag.dagman.log"
+
     @pytest.mark.skip("test broken")
     @patch("workspaces.workflow.services.workflow_service.Router")
     def test_send_carta_url_to_aat(self, mock_router: MagicMock, mock_workflow_service: WorkflowService):
diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py
index 803170b4567e0e0183701509127b09c8ed9b09aa..dceb755b99bee9896d27af15a0c77a46c18a2fb6 100644
--- a/shared/workspaces/workspaces/workflow/services/workflow_service.py
+++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py
@@ -19,6 +19,7 @@ from messaging.router import Router, on_message
 from pycapo import CapoConfig
 from requests import Response
 
+from workspaces.system.services.remote_processing_service import CapoInjector
 from workspaces.workflow.enum import ArchiveWorkflows, WorkflowRequestState
 from workspaces.workflow.message_architect import (
     ArchiveMessageArchitect,
@@ -27,7 +28,6 @@ from workspaces.workflow.message_architect import (
 from workspaces.workflow.schema import WorkflowRequest, WorkflowRequestFile
 from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF
 from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF
-from workspaces.system.services.remote_processing_service import CapoInjector
 
 
 logger = logging.getLogger(__name__)
@@ -47,8 +47,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
         :return: the response as JSON
         """
         response = requests.post(
-            f"{self.url}/workflows/{request.workflow_name}"
-            f"/requests/{request.workflow_request_id}/submit"
+            f"{self.url}/workflows/{request.workflow_name}" f"/requests/{request.workflow_request_id}/submit"
         )
         logger.info(
             "Got result %s with type %s and body %s",
@@ -58,9 +57,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
         )
         return response.json()
 
-    def attach_file_to_request(
-        self, request: WorkflowRequestIF, filename: str, content: bytes
-    ) -> Response:
+    def attach_file_to_request(self, request: WorkflowRequestIF, filename: str, content: bytes) -> Response:
         """
         Add a file to this workflow request.
 
@@ -86,14 +83,10 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
         :return: dict containing file content
         """
 
-        response = requests.get(
-            f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}"
-        )
+        response = requests.get(f"{self.url}/workflows/{name}/requests/{request_id}/files/{filename}")
         return response.content.decode()
 
-    def create_workflow_request(
-        self, workflow: Union[str, WorkflowIF], argument: Dict
-    ) -> WorkflowRequestIF:
+    def create_workflow_request(self, workflow: Union[str, WorkflowIF], argument: Dict) -> WorkflowRequestIF:
         """
         Create a workflow request using the supplied arguments.
 
@@ -129,9 +122,7 @@ class WorkflowServiceRESTClient(WorkflowServiceIF):
         :param request: completed workflow request to ingest
         :return:
         """
-        requests.post(
-            f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest"
-        )
+        requests.post(f"{self.url}/workflows/{request.workflow_name}/requests/{request.workflow_request_id}/ingest")
 
 
 class WorkflowService(WorkflowServiceIF):
@@ -163,10 +154,7 @@ class WorkflowService(WorkflowServiceIF):
         """
         forbidden = self._get_forbidden_templates_list(request.workflow_name)
         if filename not in forbidden:
-            if (
-                ArchiveWorkflows.is_archive_wf(request.workflow_name)
-                and filename == "metadata.json"
-            ):
+            if ArchiveWorkflows.is_archive_wf(request.workflow_name) and filename == "metadata.json":
                 content_dict = json.loads(content.decode())
                 content_dict["workflowName"] = request.workflow_name
                 content_dict["data_location"] = request.argument["data_location"]
@@ -199,9 +187,7 @@ class WorkflowService(WorkflowServiceIF):
         :param workflow_request_id: ID of carta workflow that wants to send this message
         :param carta_url: JSON blob with CARTA URL
         """
-        logger.info(
-            f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!"
-        )
+        logger.info(f"SENDING CARTA MESSAGE to AAT Request Handler for request #{workflow_request_id}!")
         wf_request = self.info.lookup_workflow_request(workflow_request_id)
         routing_key = f"ws-workflow.carta-instance-ready.{workflow_request_id}"
         carta_url_msg = ArchiveMessageArchitect(
@@ -242,11 +228,7 @@ class WorkflowService(WorkflowServiceIF):
 
         # create a temporary directory if processing directory is not supplied,
         # needs to exist before template rendering
-        temp_folder = (
-            self._make_temp_directory(request)
-            if not request.results_dir
-            else Path(request.results_dir)
-        )
+        temp_folder = self._make_temp_directory(request) if not request.results_dir else Path(request.results_dir)
 
         # if remote is true, create a capo subspace file in the request's directory
         if remote:
@@ -326,9 +308,7 @@ class WorkflowService(WorkflowServiceIF):
                 )
                 request.argument["ramInGb"] = self.processing_settings.ramInGb
 
-    def _render_with_metadata(
-        self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF
-    ):
+    def _render_with_metadata(self, wf_request: WorkflowRequestIF, tempdir: Path, wf_definition: WorkflowIF):
         name = wf_request.workflow_name
         if "calibration" in name:
             wrest_type = "-sc"
@@ -348,9 +328,7 @@ class WorkflowService(WorkflowServiceIF):
             argument2 = []
         elif "ingest" in name:
             wrest_type = "-aux"
-            parent_req = self.info.lookup_workflow_request(
-                int(wf_request.argument["parent_wf_request_id"])
-            )
+            parent_req = self.info.lookup_workflow_request(int(wf_request.argument["parent_wf_request_id"]))
             eb = (
                 parent_req.argument["product_locator"]
                 if "product_locator" in parent_req.argument
@@ -381,15 +359,11 @@ class WorkflowService(WorkflowServiceIF):
             else:
                 logger.error(wf_json.decode())
                 logger.info("SENDING WORKFLOW FAIL MESSAGE!")
-                failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message(
-                    "workflow_failed"
-                )
+                failed_msg = WorkflowMessageArchitect(request=wf_request).compose_message("workflow_failed")
                 self.messenger.send_message(**failed_msg)
                 return wf_request
 
-    def _determine_usable_files(
-        self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]
-    ):
+    def _determine_usable_files(self, request: WorkflowRequestIF, templated_files: List[WorkflowRequestFile]):
         # Override templates if user supplied file has same name and is a valid input file
         usable_templates = []
         usable_files = []
@@ -447,21 +421,56 @@ class WorkflowService(WorkflowServiceIF):
         logger.info("executing on folder %s", folder)
 
         # some file in here should end in .dag; that file is our dagman input
-        # dagman = list(folder.glob("*.dag"))[0]
-        # logger.info("dagman file %s exists.", dagman)
+        dag_files = list(folder.glob("*.dag"))
+        if dag_files:
+            logger.info("dagman file %s exists.", dag_files[0])
+            return self._submit_dag(folder, dag_files[0])
+        else:
+            job_file = list(folder.glob("*.condor"))[0]
+            logger.info("condor file %s exists.", job_file)
+            return self._submit_job(folder, job_file)
+
+    def _submit_job(self, folder: Path, job_file: Path) -> Path:
+        """
+        Submit job file to HTCondor
+
+        :param folder: Folder to execute workflow in
+        :param job_file: Path to job submit file
+        :return: PAth to workflow log file
+        """
+        # ensure the log file exists
+        logfile = self._get_job_logfile_name(job_file)
+        logger.info("log file %s exists.", logfile)
+        logfile.touch()
+
+        # submit
+        logger.info("submitting job to condor...")
+        subprocess.run(
+            ["condor_submit", str(job_file)],
+            cwd=str(folder.absolute()),
+            preexec_fn=self._switch_to_submituser,
+        )
+
+        # return the logfile
+        return logfile
 
-        condor = list(folder.glob("*.condor"))[0]
-        logger.info("condor file %s exists.", condor)
+    def _submit_dag(self, folder: Path, dag_file: Path) -> Path:
+        """
+        Submit DAG file to HTCondor
 
+        :param folder: Folder to execute workflow in
+        :param dag_file: Path to DAG submit file
+        :return: Path to workflow log file
+        """
         # ensure the log file exists
-        logfile = self._get_logfile_name(condor)
+        logfile = self._get_dag_logfile_name(dag_file)
         logger.info("log file %s exists.", logfile)
         logfile.touch()
 
         # submit
-        logger.info("submitting to condor...")
+        logger.info("submitting DAG to condor...")
         subprocess.run(
-            ["condor_submit", str(condor)],
+            ["condor_submit_dag", "-f", str(dag_file)],
             cwd=str(folder.absolute()),
             preexec_fn=self._switch_to_submituser,
         )
@@ -483,7 +492,7 @@ class WorkflowService(WorkflowServiceIF):
         os.setuid(submituser_uid)
 
     @staticmethod
-    def _get_logfile_name(jobfile_name: Path) -> Path:
+    def _get_job_logfile_name(jobfile_name: Path) -> Path:
         """
         Read HTCondor job file and get the log file name, if it exists
 
@@ -497,6 +506,16 @@ class WorkflowService(WorkflowServiceIF):
                     return Path(logfile_name.strip())
         return Path(f"{jobfile_name.stem}.log")
 
+    @staticmethod
+    def _get_dag_logfile_name(dag_file: Path) -> Path:
+        """
+        Return path to DAG log file, which will always be {dag-file-name}.dagman.log
+
+        :param dag_file: Path to workflow DAG file
+        :return: Path to workflow DAG log
+        """
+        return Path(str(dag_file) + ".dagman.log")
+
     @staticmethod
     def _get_forbidden_templates_list(workflow_name: str):
         return [
@@ -569,9 +588,9 @@ class WorkflowMessageHandler:
             logger.info("SENDING INGESTION COMPLETE MESSAGE!")
             subject["execution_wf_id"] = wf_request.argument["parent_wf_request_id"]
 
-            ingestion_complete_msg = WorkflowMessageArchitect(
-                previous_info=subject
-            ).compose_message("ingestion_complete")
+            ingestion_complete_msg = WorkflowMessageArchitect(previous_info=subject).compose_message(
+                "ingestion_complete"
+            )
             self.messenger.send_message(**ingestion_complete_msg)
 
     @on_message(service="workflow")
@@ -594,9 +613,7 @@ class WorkflowMessageHandler:
 
                 if htcondor_job_id := int(message["condor_metadata"]["condor_job_id"]):
                     # Workflow has corresponding condor job ID
-                    logger.info(
-                        f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!"
-                    )
+                    logger.info(f"Workflow request has an HTCondor job ID of {htcondor_job_id}. Setting DB column!")
                     request.htcondor_job_id = htcondor_job_id
             elif message["type"] == "workflow-complete":
                 status = WorkflowRequestState.Complete.name
@@ -631,8 +648,7 @@ class WorkflowMessageHandler:
             except Exception as exc:
                 transaction.abort()
                 logger.error(
-                    f"Failed to update status on workflow request "
-                    f"{request.workflow_request_id} to {status}: {exc}"
+                    f"Failed to update status on workflow request " f"{request.workflow_request_id} to {status}: {exc}"
                 )
         else:
             logger.warning(f"Message {message} does not concern a workflow request. Ignoring.")
@@ -642,24 +658,19 @@ class WorkflowMessageHandler:
         wf_id = subject["workflow_request_id"]
         wf_request = self.info.lookup_workflow_request(wf_id)
 
-        if (
-            wf_request.workflow_name == ArchiveWorkflows.CARTA.value
-            and wf_request.argument["notify_ready"] is False
-        ):
-            logger.info(
-                f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!"
-            )
+        if wf_request.workflow_name == ArchiveWorkflows.CARTA.value and wf_request.argument["notify_ready"] is False:
+            logger.info(f"SENDING FAILED CARTA MESSAGE to AAT Request Handler for request #{wf_id}!")
             routing_key = f"ws-workflow.carta-instance-ready.{wf_id}"
-            carta_url_msg = ArchiveMessageArchitect(
-                routing_key=routing_key, request=wf_request
-            ).compose_message("carta_failed")
+            carta_url_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
+                "carta_failed"
+            )
             self.archive_messenger.send_message(**carta_url_msg)
         if wf_request.workflow_name == ArchiveWorkflows.SECI.value:
             logger.info(f"SENDING FAILED SECI MESSAGE to VLASS Manager for request #{wf_id}!")
             routing_key = f"ws-workflow.seci.{wf_id}"
-            seci_msg = ArchiveMessageArchitect(
-                routing_key=routing_key, request=wf_request
-            ).compose_message("seci_failed")
+            seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
+                "seci_failed"
+            )
             self.archive_messenger.send_message(**seci_msg)
 
     def send_archive_complete_event(self, **message: Dict):
@@ -670,9 +681,9 @@ class WorkflowMessageHandler:
         if wf_request.workflow_name == ArchiveWorkflows.SECI.value:
             logger.info(f"SENDING SECI COMPLETE MESSAGE to VLASS Manager for request #{wf_id}!")
             routing_key = f"ws-workflow.seci.{wf_id}"
-            seci_msg = ArchiveMessageArchitect(
-                routing_key=routing_key, request=wf_request
-            ).compose_message("seci_complete")
+            seci_msg = ArchiveMessageArchitect(routing_key=routing_key, request=wf_request).compose_message(
+                "seci_complete"
+            )
             self.archive_messenger.send_message(**seci_msg)
 
     def clean_remote_workflow(self, request: WorkflowRequestIF):