diff --git a/services/workflow/src/workflow/server.py b/services/workflow/src/workflow/server.py
index f4c8c0147d0ecae33fc3cfb0abaf5cd2ebbc8b22..8ffb9ff5de2e0143334ff241b4d1b55bec6e1e62 100644
--- a/services/workflow/src/workflow/server.py
+++ b/services/workflow/src/workflow/server.py
@@ -1,7 +1,10 @@
 import inspect
+import stat
+import subprocess
 from abc import ABC, abstractmethod
 from dataclasses import dataclass
 from pathlib import Path
+from tempfile import mkdtemp
 from typing import List, Dict
 
 import sqlalchemy as sa
@@ -43,7 +46,7 @@ To consider:
 # ---------------------------------------------------------
 
 
-class WorkflowService(ABC):
+class WorkflowServiceIF(ABC):
     """
     Executes workflows; should be a freestanding service.
     """
@@ -60,7 +63,7 @@ class WorkflowService(ABC):
         raise NotImplementedError(f'{self.__class__.__name__}.{inspect.stack()[0][3]}')
 
 
-class WorkflowInfo(ABC):
+class WorkflowInfoIF(ABC):
     """
     Holds information about workflows.
     """
@@ -152,10 +155,87 @@ class WorkflowRequestFile(Base):
         return AbstractFile(self.filename, self.content)
 
 
-WORKFLOWS = [{'id': 1, 'name': 'foo', 'files': {'file.txt': {'id': 1, 'name': 'file.txt', 'content': 'Hello, world!'}}}]
+# ---------------------------------------------------------
+#
+#    I N N E R    S E R V I C E   L A Y E R
+#
+# ---------------------------------------------------------
+
+
+class WorkflowService(WorkflowServiceIF):
+    def execute(self, workflow_name: str, argument: Dict, files: List[Path]):
+        """
+        Here's the implementation from wksp0 that does this:
+
+            # 1. look up the workflow info for this workflow name
+            info = self.db.lookup_workflow_definition(workflow_name)
+
+            # 2. render the templates to files
+            contents = info.render_templates(argument, files)
+
+            # 3. serialize the templated files
+            temp_folder = self._prepare_files_for_condor(contents)
+
+            # 4. execute condor and get the log file
+            log_file = self._execute_prepared(temp_folder)
+
+            # 5. start reading the logs
+            return HTCondorWorkflowEventStream(log_file)
+
+        Of these, step 5 is going to have to be the most different, because
+        that's launching the new CLI wf_monitor program that Nathan wrote.
+        The other steps are more similar than different, though figuring out
+        where to put the temp files will be interesting.
+        """
+        raise NotImplementedError
+
+    @staticmethod
+    def _prepare_files_for_condor(files: Dict[str, str]) -> Path:
+        """
+        Place the files for Condor into a new temp directory and return the directory.
+
+        :param files:  a dictionary of filename -> content
+        :return:       the Path of the newly created temporary directory
+        """
+        # 1. create a temporary directory
+        temp_folder = Path(mkdtemp(dir=str(Path.home() / "tmp")))
+
+        # 2. spool each of the temp files to it
+        for name, content in files.items():
+            (temp_folder / name).write_text(content)
+
+        # 3. make any scripts in there executable
+        for file in temp_folder.glob('*.sh'):
+            file.chmod(file.stat().st_mode | stat.S_IEXEC)
+
+        # finished, return folder
+        return temp_folder
+
+    @staticmethod
+    def _execute_prepared(folder: Path) -> Path:
+        """
+        Execute HTCondor using the named folder as the source of the files.
+
+        :param folder:  the path to the folder to execute
+        :return:        the path to the log file
+        """
+        print(f'executing on folder {folder}')
+
+        # some file in here should end in .dag; that file is our dagman input
+        dagman = list(folder.glob('*.dag'))[0]
+
+        # ensure the log file exists
+        logfile = folder / 'condor.log'
+        logfile.touch()
+
+        # submit
+        subprocess.run(['condor_submit_dag', str(dagman)], cwd=str(folder.absolute()))
+
+        # return the logfile
+        return logfile
 
 
-class DBWorkflowInfo(WorkflowInfo):
+class WorkflowInfo(WorkflowInfoIF):
     """
     Implements WorkflowInfo, backed by a relational database, using SQL Alchemy.
     """
@@ -188,7 +268,7 @@ def lookup_file(request):
 
 
 @view_defaults(route_name='workflows', renderer='json')
-class WorkflowService:
+class WorkflowRestService:
     """
     Top-level service for workflow requests.
 
@@ -215,8 +295,10 @@ class WorkflowService:
         Audience: front-end and CLI
         :return:
         """
-        WORKFLOWS.append(self.request.json_body)
-        return True
+        # all we should have to do here is take the WorkflowRequest from the context and
+        # hand it to WorkflowInfo to save it, but we're still conflating
+        # workflows and workflow requests right now
+        raise NotImplementedError
 
     @view_config(request_method='GET', route_name='workflow')
     def get_workflow(self):
@@ -240,7 +322,7 @@ class WorkflowService:
 
 
 @view_defaults(route_name='workflow_files', renderer='json')
-class WorkflowFilesService:
+class WorkflowFilesRestService:
     """
     Services for the user-submitted files attached to workflows.
     """
@@ -351,13 +433,15 @@ def main(global_config, **settings):
         session_factory = get_session_factory(get_engine())
         config.registry['dbsession_factory'] = session_factory
 
-        # make request.dbsession available for use in Pyramid
+        # make workflow_info available for use in Pyramid
         config.add_request_method(
             # r.tm is the transaction manager used by pyramid_tm
-            lambda request: DBWorkflowInfo(get_tm_session(session_factory, request.tm)),
-            'info',
+            lambda request: WorkflowInfo(get_tm_session(session_factory, request.tm)),
+            'workflow_info',
             reify=True
         )
+        # make workflow_service available for use in Pyramid
+        config.add_request_method(lambda r: WorkflowService(r.workflow_info), 'workflow_service', reify=True)
 
         config.add_route('workflows', '/workflows')
         config.add_route('workflow', '/workflows/{id}', factory=lookup_workflow)