Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/pythonbuild.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jobs:
- name: Install dependencies
run: |
make setup
pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e
pip freeze
- name: Test with coverage
run: |
Expand Down Expand Up @@ -152,6 +153,7 @@ jobs:
pip install -r requirements.txt
if [ -f dev-requirements.txt ]; then pip install -r dev-requirements.txt; fi
pip install -U https://github.com/flyteorg/flytekit/archive/${{ github.sha }}.zip#egg=flytekit
pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e
pip freeze
- name: Test with coverage
run: |
Expand Down Expand Up @@ -182,6 +184,7 @@ jobs:
run: |
python -m pip install --upgrade pip
pip install -r dev-requirements.in
pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e
- name: Lint
run: |
make lint
Expand All @@ -203,6 +206,7 @@ jobs:
run: |
python -m pip install --upgrade pip==21.2.4 setuptools wheel
pip install -r doc-requirements.txt
pip install git+https://github.com/flyteorg/flyteidl@65ce204c166b29ce5457c94b362cd9c8b3a3964e
- name: Build the documentation
run: |
# TODO: Remove after buf migration is done and packages updated
Expand Down
12 changes: 6 additions & 6 deletions .github/workflows/pythonpublish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ jobs:
cache-from: type=gha
cache-to: type=gha,mode=max

build-and-push-external-plugin-service-images:
build-and-push-flyteagent-images:
runs-on: ubuntu-latest
needs: deploy
steps:
Expand All @@ -161,12 +161,12 @@ jobs:
registry: ghcr.io
username: "${{ secrets.FLYTE_BOT_USERNAME }}"
password: "${{ secrets.FLYTE_BOT_PAT }}"
- name: Prepare External Plugin Service Image Names
id: external-plugin-service-names
- name: Prepare Flyte Agent Image Names
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Fylte

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch, thanks

id: flyteagent-names
uses: docker/metadata-action@v3
with:
images: |
ghcr.io/${{ github.repository_owner }}/external-plugin-service
ghcr.io/${{ github.repository_owner }}/flyteagent
tags: |
latest
${{ github.sha }}
Expand All @@ -177,10 +177,10 @@ jobs:
context: "."
platforms: linux/arm64, linux/amd64
push: ${{ github.event_name == 'release' }}
tags: ${{ steps.external-plugin-service-names.outputs.tags }}
tags: ${{ steps.flyteagent-names.outputs.tags }}
build-args: |
VERSION=${{ needs.deploy.outputs.version }}
file: ./Dockerfile.external-plugin-service
file: ./Dockerfile.agent
cache-from: type=gha
cache-to: type=gha,mode=max

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion doc-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ flask==2.2.3
# via mlflow
flatbuffers==23.1.21
# via tensorflow
flyteidl==1.5.6
flyteidl==1.5.9
# via flytekit
fonttools==4.38.0
# via matplotlib
Expand Down
6 changes: 3 additions & 3 deletions flytekit/clis/sdk_in_container/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

import click
import grpc
from flyteidl.service.external_plugin_service_pb2_grpc import add_ExternalPluginServiceServicer_to_server
from flyteidl.service.agent_pb2_grpc import add_AgentServiceServicer_to_server

from flytekit.extend.backend.external_plugin_service import BackendPluginServer
from flytekit.extend.backend.agent_service import AgentService

_serve_help = """Start a grpc server for the external plugin service."""

Expand Down Expand Up @@ -39,7 +39,7 @@ def serve(_: click.Context, port, worker, timeout):
"""
click.secho("Starting the external plugin service...", fg="blue")
server = grpc.server(futures.ThreadPoolExecutor(max_workers=worker))
add_ExternalPluginServiceServicer_to_server(BackendPluginServer(), server)
add_AgentServiceServicer_to_server(AgentService(), server)

server.add_insecure_port(f"[::]:{port}")
server.start()
Expand Down
53 changes: 53 additions & 0 deletions flytekit/extend/backend/agent_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import grpc
from flyteidl.admin.agent_pb2 import (
PERMANENT_FAILURE,
CreateTaskRequest,
CreateTaskResponse,
DeleteTaskRequest,
DeleteTaskResponse,
GetTaskRequest,
GetTaskResponse,
)
from flyteidl.service.agent_pb2_grpc import AgentServiceServicer

from flytekit import logger
from flytekit.extend.backend.base_agent import AgentRegistry
from flytekit.models.literals import LiteralMap
from flytekit.models.task import TaskTemplate


class AgentService(AgentServiceServicer):
    """
    gRPC servicer that dispatches task requests to the agent registered for a task type.

    Every handler resolves its agent through ``AgentRegistry.get_agent``. When the
    registry returns ``None`` a fallback response is sent back instead of calling an
    agent. Any exception raised while handling a request is logged, and the gRPC
    context is marked ``INTERNAL`` with the error text as its details; in that case
    the handler falls through without returning a response object.
    """

    def CreateTask(self, request: CreateTaskRequest, context: grpc.ServicerContext) -> CreateTaskResponse:
        """
        Create a task through the agent registered for the template's task type.

        :param request: carries the task template, the optional inputs and the output prefix.
        :param context: gRPC context, used for agent lookup and for error reporting.
        :return: the agent's ``create`` result, or an empty ``CreateTaskResponse`` when
            the registry returns no agent.
        """
        try:
            template = TaskTemplate.from_flyte_idl(request.template)
            # Inputs are optional: only convert them when the request carries any.
            literal_inputs = None
            if request.inputs:
                literal_inputs = LiteralMap.from_flyte_idl(request.inputs)
            handler = AgentRegistry.get_agent(context, template.type)
            if handler is not None:
                return handler.create(
                    context=context,
                    inputs=literal_inputs,
                    output_prefix=request.output_prefix,
                    task_template=template,
                )
            return CreateTaskResponse()
        except Exception as e:
            message = f"failed to create task with error {e}"
            logger.error(message)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(message)

    def GetTask(self, request: GetTaskRequest, context: grpc.ServicerContext) -> GetTaskResponse:
        """
        Fetch the state of a job from the agent registered for ``request.task_type``.

        :param request: carries the task type and the job id to look up.
        :param context: gRPC context, used for agent lookup and for error reporting.
        :return: the agent's ``get`` result, or a ``GetTaskResponse`` in the
            ``PERMANENT_FAILURE`` state when the registry returns no agent.
        """
        try:
            handler = AgentRegistry.get_agent(context, request.task_type)
            if handler is not None:
                return handler.get(context=context, job_id=request.job_id)
            return GetTaskResponse(state=PERMANENT_FAILURE)
        except Exception as e:
            message = f"failed to get task with error {e}"
            logger.error(message)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(message)

    def DeleteTask(self, request: DeleteTaskRequest, context: grpc.ServicerContext) -> DeleteTaskResponse:
        """
        Delete a job through the agent registered for ``request.task_type``.

        :param request: carries the task type and the job id to delete.
        :param context: gRPC context, used for agent lookup and for error reporting.
        :return: the agent's ``delete`` result, or an empty ``DeleteTaskResponse`` when
            the registry returns no agent.
        """
        try:
            handler = AgentRegistry.get_agent(context, request.task_type)
            if handler is not None:
                return handler.delete(context=context, job_id=request.job_id)
            return DeleteTaskResponse()
        except Exception as e:
            message = f"failed to delete task with error {e}"
            logger.error(message)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details(message)
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,30 @@
from abc import ABC, abstractmethod

import grpc
from flyteidl.core.tasks_pb2 import TaskTemplate
from flyteidl.service.external_plugin_service_pb2 import (
from flyteidl.admin.agent_pb2 import (
RETRYABLE_FAILURE,
RUNNING,
SUCCEEDED,
CreateTaskResponse,
DeleteTaskResponse,
GetTaskResponse,
State,
TaskCreateResponse,
TaskDeleteResponse,
TaskGetResponse,
)
from flyteidl.core.tasks_pb2 import TaskTemplate

from flytekit import logger
from flytekit.models.literals import LiteralMap


class BackendPluginBase(ABC):
class AgentBase(ABC):
"""
This is the base class for all backend plugins. It defines the interface that all plugins must implement.
The external plugins service will be run either locally or in a pod, and will be responsible for
invoking backend plugins. The propeller will communicate with the external plugins service
This is the base class for all agents. It defines the interface that all agents must implement.
The agent service will be run either locally or in a pod, and will be responsible for
invoking agents. The propeller will communicate with the agent service
to create tasks, get the status of tasks, and delete tasks.

All the backend plugins should be registered in the BackendPluginRegistry. External plugins service
will look up the plugin based on the task type. Every task type can only have one plugin.
All the agents should be registered in the AgentRegistry. Agent Service
will look up the agent based on the task type. Every task type can only have one agent.
"""

def __init__(self, task_type: str):
Expand All @@ -34,7 +34,7 @@ def __init__(self, task_type: str):
@property
def task_type(self) -> str:
"""
task_type is the name of the task type that this plugin supports.
task_type is the name of the task type that this agent supports.
"""
return self._task_type

Expand All @@ -45,14 +45,14 @@ def create(
output_prefix: str,
task_template: TaskTemplate,
inputs: typing.Optional[LiteralMap] = None,
) -> TaskCreateResponse:
) -> CreateTaskResponse:
"""
Return a Unique ID for the task that was created. It should return error code if the task creation failed.
"""
pass

@abstractmethod
def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse:
def get(self, context: grpc.ServicerContext, job_id: str) -> GetTaskResponse:
"""
Return the status of the task, and return the outputs in some cases. For example, bigquery job
can't write the structured dataset to the output location, so it returns the output literals to the propeller,
Expand All @@ -61,41 +61,41 @@ def get(self, context: grpc.ServicerContext, job_id: str) -> TaskGetResponse:
pass

@abstractmethod
def delete(self, context: grpc.ServicerContext, job_id: str) -> TaskDeleteResponse:
def delete(self, context: grpc.ServicerContext, job_id: str) -> DeleteTaskResponse:
"""
Delete the task. This call should be idempotent.
"""
pass


class BackendPluginRegistry(object):
class AgentRegistry(object):
"""
This is the registry for all backend plugins. The external plugins service will look up the plugin
This is the registry for all agents. The agent service will look up the agent
based on the task type.
"""

_REGISTRY: typing.Dict[str, BackendPluginBase] = {}
_REGISTRY: typing.Dict[str, AgentBase] = {}

@staticmethod
def register(plugin: BackendPluginBase):
if plugin.task_type in BackendPluginRegistry._REGISTRY:
raise ValueError(f"Duplicate plugin for task type {plugin.task_type}")
BackendPluginRegistry._REGISTRY[plugin.task_type] = plugin
logger.info(f"Registering backend plugin for task type {plugin.task_type}")
def register(agent: AgentBase):
if agent.task_type in AgentRegistry._REGISTRY:
raise ValueError(f"Duplicate agent for task type {agent.task_type}")
AgentRegistry._REGISTRY[agent.task_type] = agent
logger.info(f"Registering an agent for task type {agent.task_type}")

@staticmethod
def get_plugin(context: grpc.ServicerContext, task_type: str) -> typing.Optional[BackendPluginBase]:
if task_type not in BackendPluginRegistry._REGISTRY:
logger.error(f"Cannot find backend plugin for task type [{task_type}]")
def get_agent(context: grpc.ServicerContext, task_type: str) -> typing.Optional[AgentBase]:
if task_type not in AgentRegistry._REGISTRY:
logger.error(f"Cannot find agent for task type [{task_type}]")
context.set_code(grpc.StatusCode.NOT_FOUND)
context.set_details(f"Cannot find backend plugin for task type [{task_type}]")
context.set_details(f"Cannot find the agent for task type [{task_type}]")
return None
return BackendPluginRegistry._REGISTRY[task_type]
return AgentRegistry._REGISTRY[task_type]


def convert_to_flyte_state(state: str) -> State:
"""
Convert the state from the backend plugin to the state in flyte.
Convert the state from the agent to the state in flyte.
"""
state = state.lower()
if state in ["failed"]:
Expand Down
53 changes: 0 additions & 53 deletions flytekit/extend/backend/external_plugin_service.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

BigQueryConfig
BigQueryTask
BigQueryAgent
"""

from .backend_plugin import BigQueryPlugin
from .agent import BigQueryAgent
from .task import BigQueryConfig, BigQueryTask
Loading