diff --git a/apps/web/src/app/workspaces/services/capability-launcher.service.ts b/apps/web/src/app/workspaces/services/capability-launcher.service.ts index 4b47559c65b1e66e4b93ff3763931a9b1de0354e..896ca534af4ef0e95f890a22689b9c8f146fcfdf 100644 --- a/apps/web/src/app/workspaces/services/capability-launcher.service.ts +++ b/apps/web/src/app/workspaces/services/capability-launcher.service.ts @@ -27,8 +27,7 @@ export class CapabilityLauncherService { this.endpoint + capabilityName + '/request/create'; - const requestParams = JSON.stringify({'parameters': parameters}); - return this.httpClient.post<CapabilityRequest>(url, requestParams); + return this.httpClient.post<CapabilityRequest>(url, JSON.stringify({'parameters': parameters})); } /** diff --git a/docs/swagger-schema.yaml b/docs/swagger-schema.yaml index e1a6d30ea0c93900d80f382b76b7d30edb90bd18..ecbc5e5aef8db263180c7fc6b8555bcfeb79f3f7 100644 --- a/docs/swagger-schema.yaml +++ b/docs/swagger-schema.yaml @@ -10,7 +10,9 @@ tags: - name: "capabilities" description: "Access capabilities and create requests" - name: "requests" - description: "Access to and submit capability requests" + description: "Access and submit capability requests" + - name: "workflows" + description: "Access and submit workflow requests" schemes: - "http" paths: @@ -235,6 +237,173 @@ paths: description: "Invalid name supplied" "404": description: "Capability not found" + /workflows: + get: + tags: + - "workflows" + summary: "View the list of workflows" + description: "" + operationId: "Workflows.list_workflows" + produces: + - "application/json" + responses: + "200": + description: "successful operation" + schema: + $ref: "#/definitions/WorkflowList" + /workflows/{name}: + get: + tags: + - "workflows" + summary: "View the named workflow" + description: "" + operationId: "Workflows.get_workflow" + parameters: + - name: "name" + in: "path" + description: "name of workflow to view" + required: true + type: "string" + consumes: + - "application/json" + produces: + - "application/json" + responses: + "404": + description: + "Workflow not found" + "200": + description: "successful operation" + schema: + $ref: "#/definitions/Workflow" + /workflows/{name}/create: + post: + tags: + - "workflows" + summary: "Create a workflow request" + description: "" + operationId: "WorkflowRequestRestService.create_workflow_request" + parameters: + - name: "name" + in: "path" + description: "name of workflow to create a request for" + required: true + type: "string" + - name: "body" + in: "body" + description: "Argument to the workflow" + schema: + type: "object" + consumes: + - "application/json" + produces: + - "application/json" + responses: + "404": + description: + "Workflow not found" + "200": + description: "successful operation" + schema: + $ref: "#/definitions/WorkflowRequest" + /workflows/{name}/requests/{request_id}: + get: + tags: + - "workflows" + summary: "View the workflow request with this ID" + description: "" + operationId: "WorkflowRequestRestService.get_workflow_request" + parameters: + - name: "name" + in: "path" + description: "name of workflow this request belongs to" + required: true + type: "string" + - name: "request_id" + in: "path" + description: "ID of the request" + type: "integer" + required: true + consumes: + - "application/json" + produces: + - "application/json" + responses: + "404": + description: + "Workflow request not found" + "200": + description: "successful operation" + schema: + $ref: "#/definitions/WorkflowRequest" + /workflows/{name}/requests/{request_id}/submit: + post: + tags: + - "workflows" + summary: "Submit the workflow request for execution" + description: "" + operationId: "WorkflowRequestRestService.submit_workflow" + parameters: + - name: "name" + in: "path" + description: "name of workflow this request belongs to" + required: true + type: "string" + - name: "request_id" + in: "path" + description: "ID of the request" + type: "integer" + required: true + consumes: + - "application/json" + produces: + - "application/json" + responses: + "404": + description: + "Workflow request not found" + "200": + description: "successful operation" + schema: + $ref: "#/definitions/WorkflowRequest" + + /workflows/{name}/requests/{request_id}/files/{filename}: + put: + tags: + - "workflows" + summary: "Add a file to this workflow request" + description: "" + operationId: "WorkflowFilesRestService.add_file" + parameters: + - name: "name" + in: "path" + description: "name of workflow the request belongs to" + required: true + type: "string" + - name: "request_id" + in: "path" + description: "ID of the request" + type: "integer" + required: true + - name: "filename" + in: "path" + description: "filename to add" + type: "string" + required: true + consumes: + - "application/octet-stream" + produces: + - "application/json" + responses: + "400": + description: "File already exists" + "404": + description: + "Workflow request not found" + "200": + description: "successful operation" + schema: + $ref: "#/definitions/WorkflowRequestFile" definitions: Capability: @@ -252,6 +421,39 @@ definitions: properties: body: type: "object" + WorkflowList: + type: "array" + items: + $ref: "#/definitions/Workflow" + Workflow: + type: "object" + properties: + name: + type: "string" + WorkflowRequest: + type: "object" + properties: + workflow_request_id: + type: "integer" + format: "int64" + workflow_name: + type: "string" + argument: + type: "object" + state: + type: "string" + results_dir: + type: "string" + WorkflowRequestFile: + type: "object" + properties: + workflow_request_id: + type: "integer" + format: "int64" + filename: + type: "string" + content: + type: "string" externalDocs: description: "More about the capability service" url: "http://builder.aoc.nrao.edu/ssa/architecture/workspaces" \ No newline at end of file diff --git a/schema/versions/25add553c317_convert_capability_request_argument_to_.py b/schema/versions/25add553c317_convert_capability_request_argument_to_.py new file mode 100644 index 0000000000000000000000000000000000000000..a071c3be6173c95a155d28e7213e95392b100d44 --- /dev/null +++ b/schema/versions/25add553c317_convert_capability_request_argument_to_.py @@ -0,0 +1,36 @@ +"""convert capability request argument to JSON + +Revision ID: 25add553c317 +Revises: e50583fc2a8e +Create Date: 2021-03-04 16:55:41.512834 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "25add553c317" +down_revision = "e50583fc2a8e" +branch_labels = None +depends_on = None + + +def upgrade(): + op.alter_column( + "capability_requests", + "parameters", + type_=sa.JSON, + postgresql_using="parameters::json", + ) + op.alter_column( + "capability_versions", + "parameters", + type_=sa.JSON, + postgresql_using="parameters::json", + ) + + +def downgrade(): + op.alter_column("capability_requests", "parameters", type_=sa.String) + op.alter_column("capability_versions", "parameters", type_=sa.String) diff --git a/schema/versions/50ff97fe0c2a_update_workflow_definition_for_test_.py b/schema/versions/50ff97fe0c2a_update_workflow_definition_for_test_.py new file mode 100644 index 0000000000000000000000000000000000000000..1cbd845c09bc1ca6154c17afcdc42d643397f0ce --- /dev/null +++ b/schema/versions/50ff97fe0c2a_update_workflow_definition_for_test_.py @@ -0,0 +1,39 @@ +"""update workflow definition for test_download + +Revision ID: 50ff97fe0c2a +Revises: 25add553c317 +Create Date: 2021-03-04 16:55:57.373412 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "50ff97fe0c2a" +down_revision = "25add553c317" +branch_labels = None +depends_on = None + + +def upgrade(): + op.execute( + "UPDATE capabilities " + "SET capability_steps = 'prepare-and-run-workflow test_download\nawait-workflow' " + "WHERE capability_name = 'test_download'" + ) + op.execute("DELETE FROM workflow_templates WHERE workflow_name = 'test_download'") + op.execute( + "INSERT INTO workflow_templates (workflow_name, filename, content) " + "VALUES ('test_download', 'test_download.condor', " + "E'executable = test_download.sh\narguments = {{product_locator}}" + "\nerror = test_download.err\nlog = test_download.log\n\n\nqueue')" + ) + op.execute( + "INSERT INTO workflow_templates (workflow_name, filename, content) VALUES " + "('test_download', 'test_download.sh', E'#!/bin/sh\n\ndatafetcher --product-locator $1\ndeliver -r .\n')" + ) + + +def downgrade(): + pass diff --git a/services/workflow/workflow/server.py b/services/workflow/workflow/server.py index 16d08fa97d0dc7da50b813452fb4e73c992b6d57..6898b1a4133bfa1437bd3b526c3aaf7e185c994b 100644 --- a/services/workflow/workflow/server.py +++ b/services/workflow/workflow/server.py @@ -27,11 +27,6 @@ def lookup_workflow(request): return request.info.lookup_workflow_definition(request.matchdict["name"]) -def lookup_workflow_json(request): - req = request.json_body - return request.info.lookup_workflow_definition(req["workflow"]) - - def lookup_request(request): return request.info.lookup_workflow_request(request.matchdict["request_id"]) @@ -100,9 +95,9 @@ class WorkflowRequestRestService: :return: """ - req = self.request.json_body - request = self.request.info.create_workflow_request(req["workflow"], req["args"]) - return request + return self.request.info.create_workflow_request( + self.request.context.workflow_name, self.request.json_body + ) @view_config(request_method="POST", route_name="submit_workflow_request") def submit_workflow(self): @@ -114,7 +109,7 @@ class WorkflowRequestRestService: """ print(f"Submitting workflow {self.request.context}") - return self.request.workflows.execute(self.request.context, self.request.json_body["files"]) + return self.request.workflows.execute(self.request.context) @view_defaults(route_name="workflow_request_files", renderer="json") @@ -133,7 +128,9 @@ class WorkflowFilesRestService: Audience: front-end and CLI """ - print(f"Adding file {self.request.matchdict['filename']} to {self.request.context}") + print( + f"Adding file {self.request.matchdict['filename']} to {self.request.context}" + ) file = self.request.info.save_file( request=self.request.context, filename=self.request.matchdict["filename"], @@ -233,7 +230,9 @@ def main(global_config, **settings): reify=True, ) # make workflow_service available for use in Pyramid - config.add_request_method(lambda r: WorkflowService(r.info), "workflows", reify=True) + config.add_request_method( + lambda r: WorkflowService(r.info), "workflows", reify=True + ) # GET /workflows <- list of workflows # GET /workflows/null <- info about the null workflow @@ -247,22 +246,22 @@ def main(global_config, **settings): # parameters are passed with json: {workflow:<name>, args: <args>} config.add_route( "create_workflow_request", - "/workflows/requests/create", - factory=lookup_workflow_json, + "/workflows/{name}/requests/create", + factory=lookup_workflow, ) config.add_route( "workflow_request", - "/workflows/requests/{request_id}", + "/workflows/{name}/requests/{request_id}", factory=lookup_request, ) config.add_route( "workflow_request_files", - "/workflows/requests/{request_id}/files", + "/workflows/{name}/requests/{request_id}/files", factory=lookup_request, ) config.add_route( "workflow_request_file", - "/workflows/requests/{request_id}/files/{filename}", + "/workflows/{name}/requests/{request_id}/files/{filename}", factory=lookup_request, ) @@ -270,7 +269,7 @@ def main(global_config, **settings): # Use empty braces for no additional files config.add_route( "submit_workflow_request", - "/workflows/requests/{request_id}/submit", + "/workflows/{name}/requests/{request_id}/submit", factory=lookup_request, ) config.include("pyramid_beaker") diff --git a/shared/workspaces/workspaces/capability/helpers.py b/shared/workspaces/workspaces/capability/helpers.py index a36412fd46f805b842edbd821c2b24bc486df1b1..f75bb435b6cff8c12d4864760c0a85dc3207005d 100644 --- a/shared/workspaces/workspaces/capability/helpers.py +++ b/shared/workspaces/workspaces/capability/helpers.py @@ -1,6 +1,7 @@ from __future__ import annotations import json +import logging import re from typing import Iterator, List, Optional @@ -14,6 +15,9 @@ from workspaces.capability.schema_interfaces import CapabilityExecutionIF from workspaces.capability.services.interfaces import CapabilityEngineIF +logger = logging.getLogger(__name__) + + class MalformedCapabilityStep(ValueError): pass @@ -176,6 +180,11 @@ class PrepareAndRunWorkflow(CapabilityStep): # DO NOT TAKE THIS OUT! Python will yell at you. parameters = json.dumps(parameters) workflow_args = json.loads(parameters) + logger.info( + "submitting workflow request with args %s (and type %s)", + workflow_args, + type(workflow_args), + ) engine.submit_workflow_request( execution.id, workflow_name, workflow_args, files ) diff --git a/shared/workspaces/workspaces/capability/schema.py b/shared/workspaces/workspaces/capability/schema.py index 896954985ea1ade4fb33badeaae535036de1c950..2013f8f18370d3ea7da62ebda69a52fbe61e8ff1 100644 --- a/shared/workspaces/workspaces/capability/schema.py +++ b/shared/workspaces/workspaces/capability/schema.py @@ -28,7 +28,9 @@ class CapabilityEvent: An event sent from the underlying workflow execution system. """ - def __init__(self, event_type: CapabilityEventType, request_id: int, execution_id: int): + def __init__( + self, event_type: CapabilityEventType, request_id: int, execution_id: int + ): self.event_type = event_type self.execution_id = execution_id @@ -106,7 +108,11 @@ class Capability(Base, CapabilityIF): for line in f.readlines(): step_type, step_value = line.strip().split(" ") - steps.append(CapabilityStep(CapabilityStepType.from_string(step_type), step_value)) + steps.append( + CapabilityStep( + CapabilityStepType.from_string(step_type), step_value + ) + ) return name, int(max_jobs), CapabilitySequence(steps) @@ -147,7 +153,7 @@ class CapabilityRequest(Base, CapabilityRequestIF): capability_name = sa.Column( "capability_name", sa.String, sa.ForeignKey("capabilities.capability_name") ) - parameters = sa.Column("parameters", sa.String) + parameters = sa.Column("parameters", sa.JSON) # FIXME: This needs to be changed to properly keep track of product locators. # future_products = sa.Column('future_products', sa.String) versions = relationship("CapabilityVersion", back_populates="request") @@ -181,7 +187,7 @@ class CapabilityVersion(Base, CapabilityVersionIF): primary_key=True, ) version_number = sa.Column("version_number", sa.Integer, primary_key=True) - parameters = sa.Column("parameters", sa.String) + parameters = sa.Column("parameters", sa.JSON) request = relationship(CapabilityRequest, back_populates="versions") executions = relationship("CapabilityExecution", back_populates="version") diff --git a/shared/workspaces/workspaces/capability/services/capability_engine.py b/shared/workspaces/workspaces/capability/services/capability_engine.py index 4aa361035f4c3f804dc5975b3390b955bf594625..8fd15f5f31d6069e219b985c34ff883a8f6db686 100644 --- a/shared/workspaces/workspaces/capability/services/capability_engine.py +++ b/shared/workspaces/workspaces/capability/services/capability_engine.py @@ -1,3 +1,4 @@ +import logging from typing import List from workspaces.capability.helpers import CapabilitySequence @@ -9,6 +10,8 @@ from workspaces.capability.services.interfaces import ( from workspaces.system.schema import AbstractFile from workspaces.workflow.services.interfaces import WorkflowServiceIF +logger = logging.getLogger(__name__) + class CapabilityEngine(CapabilityEngineIF): """ @@ -42,7 +45,11 @@ class CapabilityEngine(CapabilityEngineIF): cur_step.execute(self, self.execution) def submit_workflow_request( - self, execution_id: int, workflow_name: str, workflow_args: dict, files: List[AbstractFile] + self, + execution_id: int, + workflow_name: str, + workflow_args: dict, + files: List[AbstractFile], ): """ Submit a workflow request to the workflow service @@ -52,11 +59,25 @@ class CapabilityEngine(CapabilityEngineIF): :param workflow_args: :param files: """ - workflow_request = self.workflow_service.create_workflow_request(workflow=workflow_name, argument=workflow_args) + logger.info( + "creating workflow request with name %s, args %s (and arg type %s)", + workflow_name, + workflow_args, + type(workflow_args), + ) + workflow_request = self.workflow_service.create_workflow_request( + workflow=workflow_name, argument=workflow_args + ) + + logger.info( + "My workflow_request is %s, with type %s and workflow request ID %s", + workflow_request, + type(workflow_request), + workflow_request.workflow_request_id, + ) execution = self.capability_info.lookup_execution(execution_id) - execution.update_execution(workflow_request['workflow_request_id']) + execution.update_execution(workflow_request.workflow_request_id) self.capability_info.save_execution(execution) - self.workflow_service.execute(workflow_request, files) - + self.workflow_service.execute(workflow_request) diff --git a/shared/workspaces/workspaces/capability/services/capability_info.py b/shared/workspaces/workspaces/capability/services/capability_info.py index ef68432a2f32d29f621f3fb40d195793a412a36a..a3cbdb4ca6febd6ecc529dde2bc48bd5e2369ac5 100644 --- a/shared/workspaces/workspaces/capability/services/capability_info.py +++ b/shared/workspaces/workspaces/capability/services/capability_info.py @@ -73,7 +73,9 @@ class CapabilityInfo(CapabilityInfoIF): changes[Capability.enabled] = enabled if changes: - rows_changed = self.session.query(Capability).filter_by(name=name).update(changes) + rows_changed = ( + self.session.query(Capability).filter_by(name=name).update(changes) + ) else: # No changes were made because none were given # TODO: This should log a message saying as much, but logging is not implemented @@ -102,9 +104,9 @@ class CapabilityInfo(CapabilityInfoIF): request = CapabilityRequest( state=CapabilityRequestState.Ready.name, capability=capability, - parameters=str(parameters), + parameters=parameters, # a trick here is to ensure that we always have a first version, with the original parameters - versions=[CapabilityVersion(version_number=1, parameters=str(parameters))], + versions=[CapabilityVersion(version_number=1, parameters=parameters)], ) self.save_entity(request) return request @@ -136,7 +138,9 @@ class CapabilityInfo(CapabilityInfoIF): return self.session.query(CapabilityRequest).filter_by(id=request_id).first() def lookup_execution(self, execution_id: int) -> CapabilityExecution: - return self.session.query(CapabilityExecution).filter_by(id=execution_id).first() + return ( + self.session.query(CapabilityExecution).filter_by(id=execution_id).first() + ) def save_entity(self, entity: Base): """ @@ -171,7 +175,9 @@ class CapabilityInfo(CapabilityInfoIF): """ capability = self.lookup_capability(capability_name) return ( - self.session.query(CapabilityRequest).filter_by(capability_name=capability.name).all() + self.session.query(CapabilityRequest) + .filter_by(capability_name=capability.name) + .all() ) def save_execution(self, execution: CapabilityExecutionIF): @@ -202,7 +208,9 @@ class CapabilityInfo(CapabilityInfoIF): """ if self.can_delete_capability_request(request_id): self.delete_request_versions(request_id) - rows_changed = self.session.query(CapabilityRequest).filter_by(id=request_id).delete() + rows_changed = ( + self.session.query(CapabilityRequest).filter_by(id=request_id).delete() + ) self.session.flush() return rows_changed else: diff --git a/shared/workspaces/workspaces/workflow/json.py b/shared/workspaces/workspaces/workflow/json.py index 96761885521975c5e6bf1e8faa838855e2486de5..60b37b4dfc392af44fa9ec3086d0f600e6f6ffd2 100644 --- a/shared/workspaces/workspaces/workflow/json.py +++ b/shared/workspaces/workspaces/workflow/json.py @@ -1,7 +1,7 @@ from marshmallow import Schema, fields, post_load from workspaces.workflow.enum import WorkflowEventType -from workspaces.workflow.schema import WorkflowEvent +from workspaces.workflow.schema import WorkflowEvent, WorkflowRequest class WorkflowEventSchema(Schema): @@ -29,4 +29,18 @@ class WorkflowEventSchema(Schema): return WorkflowEvent(**data) +class WorkflowRequestSchema(Schema): + workflow_request_id = fields.Integer() + workflow_name = fields.String() + argument = fields.Raw() + state = fields.String() + results_dir = fields.String() + files = fields.Raw() + + @post_load + def make_request(self, data, **kwargs): + return WorkflowRequest(**data) + + workflow_event = WorkflowEventSchema() +workflow_request = WorkflowRequestSchema() diff --git a/shared/workspaces/workspaces/workflow/schema.py b/shared/workspaces/workspaces/workflow/schema.py index 2b9f25be6f8ec9e92cd2373703e76d25f375ae81..1eeb5eb59f18c9f9edd26fbc765b13c5dfb78a36 100644 --- a/shared/workspaces/workspaces/workflow/schema.py +++ b/shared/workspaces/workspaces/workflow/schema.py @@ -89,9 +89,7 @@ class Workflow(Base, WorkflowIF): def __repr__(self): return f"<Workflow workflow_name={self.workflow_name}>" - def render_templates( - self, argument: Dict, files: List[Path] = None - ) -> List[AbstractFile]: + def render_templates(self, argument: Dict) -> List[AbstractFile]: """ Render the templates associated with this workflow @@ -101,9 +99,18 @@ class Workflow(Base, WorkflowIF): """ rendered_files = [] - for f in files: - rendered_files.append(AbstractFile.from_path(f)) - + """ + + Original: + - All the user's input files are collected at submission time + - This method combines user input files with rendered template results + - They wind up mixed together in the list on this request + + New: + - User's input files are collected prior to submission + - They already exist in the list of files on this request + - At submission time, we only need to render the templates and append them to the request + """ for template in self.templates: rendered_files.append(template.render(argument)) @@ -215,6 +222,9 @@ class WorkflowRequestFile(Base, WorkflowRequestFileIF): A Workflow Request File is a file supplied by the user and attached to the request they have submitted. """ + def abstract_file(self): + return AbstractFile(self.filename, self.content) + __tablename__ = "workflow_request_files" workflow_request_id = sa.Column( "workflow_request_id", diff --git a/shared/workspaces/workspaces/workflow/schema_interfaces.py b/shared/workspaces/workspaces/workflow/schema_interfaces.py index 65aad69e94b641f1337c60c162905c739dfd041e..a0610814b97f2488ed9aceea0ebea840284316c6 100644 --- a/shared/workspaces/workspaces/workflow/schema_interfaces.py +++ b/shared/workspaces/workspaces/workflow/schema_interfaces.py @@ -1,6 +1,5 @@ from __future__ import annotations -from pathlib import Path from typing import Dict, List @@ -19,10 +18,8 @@ class WorkflowIF: templates: List[WorkflowTemplateIF] requests: List[WorkflowRequestIF] - def render_templates( - self, argument: Dict, files: List[Path] - ) -> List[WorkflowRequestFileIF]: - raise NotImplementedError + def render_templates(self, argument: Dict) -> List[WorkflowRequestFileIF]: + pass class WorkflowRequestIF: diff --git a/shared/workspaces/workspaces/workflow/services/interfaces.py b/shared/workspaces/workspaces/workflow/services/interfaces.py index 437009e946a6f86677b2ccafa6ed18a3360c7342..a12e992087034f7fd9c3f8476b82572511569508 100644 --- a/shared/workspaces/workspaces/workflow/services/interfaces.py +++ b/shared/workspaces/workspaces/workflow/services/interfaces.py @@ -16,18 +16,22 @@ class WorkflowServiceIF(ABC): """ @abstractmethod - def execute(self, request: WorkflowRequestIF, files: List[AbstractFile]): + def execute(self, request: WorkflowRequestIF): """ Execute this workflow against these files. :param request: - :param files: some extra files the workflow should consider :return: a stream of events from this workflow """ - raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}") + pass + @abstractmethod + def attach_file_to_request(self, request: WorkflowRequestIF, file: AbstractFile): + pass + + @abstractmethod def create_workflow_request(self, workflow, argument) -> WorkflowRequestIF: - raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}") + pass class WorkflowInfoIF(ABC): @@ -43,7 +47,7 @@ class WorkflowInfoIF(ABC): :param name: Workflow name :return: Workflow instance """ - raise NotImplementedError(f"{self.__class__.__name__}.{inspect.stack()[0][3]}") + pass @abstractmethod def lookup_workflow_request(self, request_id: int) -> WorkflowIF: @@ -53,7 +57,7 @@ class WorkflowInfoIF(ABC): :param request_id: Request ID :return Workflow instance """ - raise NotImplementedError + pass @abstractmethod def all_workflows(self) -> List[WorkflowIF]: @@ -62,7 +66,7 @@ class WorkflowInfoIF(ABC): :return all the workflows """ - raise NotImplementedError + pass @abstractmethod def create_workflow_request( @@ -74,7 +78,7 @@ class WorkflowInfoIF(ABC): :param argument: workflow arguments :return: new WorkflowRequest """ - raise NotImplementedError + pass def save_request(self, request: WorkflowRequestIF): """ @@ -82,7 +86,7 @@ class WorkflowInfoIF(ABC): :param request: request to save :return: the request id """ - raise NotImplementedError + pass def save_file( self, request: WorkflowRequestIF, filename: str, content: bytes @@ -94,4 +98,4 @@ class WorkflowInfoIF(ABC): :param content: contents of new file in bytes :return: """ - raise NotImplementedError + pass diff --git a/shared/workspaces/workspaces/workflow/services/workflow_service.py b/shared/workspaces/workspaces/workflow/services/workflow_service.py index 86b4eb01a51809340221a6f7915925580aa0c8b3..2edc5c8c03202a4e29b56cfeda775bea7715adbe 100644 --- a/shared/workspaces/workspaces/workflow/services/workflow_service.py +++ b/shared/workspaces/workspaces/workflow/services/workflow_service.py @@ -19,33 +19,58 @@ from workspaces.workflow.schema import ( ) from workspaces.workflow.schema_interfaces import WorkflowIF, WorkflowRequestIF from workspaces.workflow.services.interfaces import WorkflowInfoIF, WorkflowServiceIF +import workspaces.workflow.json as json + +import logging + +logger = logging.getLogger(__name__) class WorkflowServiceRESTClient(WorkflowServiceIF): def __init__(self): - self.url = CapoConfig().settings("edu.nrao.archive.workspaces.WorkflowSettings").serviceUrl + self.url = ( + CapoConfig() + .settings("edu.nrao.archive.workspaces.WorkflowSettings") + .serviceUrl + ) - def execute(self, request: WorkflowRequestIF, files: List[AbstractFile]): - # step 1: if necessary, pass the files up for this request - for file in files: - requests.post( - f"{self.url}/workflows/requests/{request['workflow_request_id']}/files/{file.filename}", - {"files": file.content}, - ) + def execute(self, request: WorkflowRequestIF): + result = requests.post( + f"{self.url}/workflows/{request.workflow_name}" + f"/requests/{request.workflow_request_id}/submit" + ) + logger.info( + "Got result %s with type %s and body %s", + result, + type(result), + result.content, + ) + return json.workflow_request.load(result.json()) - # step 2: execute the request + def attach_file_to_request(self, request: WorkflowRequestIF, file: AbstractFile): requests.post( - f"{self.url}/workflows/requests/{request['workflow_request_id']}/submit", - json={"files": files}, - ).json() + f"{self.url}/workflows/{request.workflow_name}" + f"/requests/{request.workflow_request_id}" + f"/files/{file.filename}", + data=file.content, + ) def create_workflow_request( self, workflow: Union[str, WorkflowIF], argument: Dict ) -> WorkflowRequestIF: - return requests.post( - f"{self.url}/workflows/requests/create", json={"workflow": workflow, "args": argument} + # 1. Handle the type ambiguity with the workflow argument + workflow_name = ( + workflow if type(workflow) == type(str()) else workflow.workflow_name + ) + + # 2. Make the request + result = requests.post( + f"{self.url}/workflows/{workflow_name}/requests/create", + json=argument, ).json() + return json.workflow_request.load(result) + class WorkflowService(WorkflowServiceIF): """ @@ -67,7 +92,17 @@ class WorkflowService(WorkflowServiceIF): ) self.listener.start() - def execute(self, request: WorkflowRequest, files: List[AbstractFile] = None): + def attach_file_to_request(self, request: WorkflowRequestIF, file: AbstractFile): + self.info.save_file( + request=request, filename=file.filename, content=file.content + ) + + def create_workflow_request(self, workflow, argument) -> WorkflowRequestIF: + return self.info.save_request( + WorkflowRequest(workflow_name=workflow, argument=argument) + ) + + def execute(self, request: WorkflowRequest): """ Execute a workflow per the supplied parameters. @@ -79,14 +114,21 @@ class WorkflowService(WorkflowServiceIF): definition = self.info.lookup_workflow_definition(request.workflow_name) # 2. render templates to files, returns list of rendered files - workflow_files = definition.render_templates(request.argument, files) - for file in workflow_files: - self.info.save_file(request=request, filename=file.filename, content=file.content) + templated_files = definition.render_templates(request.argument) + for file in templated_files: + self.info.save_file( + request=request, filename=file.filename, content=file.content + ) + + # 3. Combine the + workflow_files = templated_files + request.files # 4. prepare files for condor execution if not request.results_dir: # Create temp results directory if the request doesn't already have one (from a previous execution attempt) - request.results_dir = str(self._prepare_files_for_condor(workflow_files).absolute()) + request.results_dir = str( + self._prepare_files_for_condor(workflow_files).absolute() + ) self.info.save_request(request) # 5. execute condor and retrieve log file @@ -98,6 +140,8 @@ class WorkflowService(WorkflowServiceIF): cwd=request.results_dir, ) + return request + @staticmethod def _prepare_files_for_condor(files: List[WorkflowRequestFile]) -> Path: """ @@ -135,8 +179,8 @@ class WorkflowService(WorkflowServiceIF): # some file in here should end in .dag; that file is our dagman input # TODO: not currently using DAG files for vulture - dagman = list(folder.glob("*.dag"))[0] - print(f"dagman file {dagman} exists.") + # dagman = list(folder.glob("*.dag"))[0] + # print(f"dagman file {dagman} exists.") # need .condor file for vulture submission condor = list(folder.glob("*.condor"))[0] @@ -153,7 +197,9 @@ class WorkflowService(WorkflowServiceIF): # vulture is a workaround for testing locally without submitting to condor print("submitting to vulture...") - subprocess.run(["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute())) + subprocess.run( + ["vulture", "job", "execute", str(condor)], cwd=str(folder.absolute()) + ) # return the logfile return logfile @@ -173,5 +219,7 @@ class WorkflowService(WorkflowServiceIF): else: status = WorkflowRequestState.Running.name - print(f"Updating state on workflow request {request.workflow_request_id} to {status}...") + print( + f"Updating state on workflow request {request.workflow_request_id} to {status}..." + ) request.update_status(status)