Skip to content

Commit

Permalink
Support Acto to run with user provided Kubernetes cluster
Browse files Browse the repository at this point in the history
Signed-off-by: Tyler Gu <[email protected]>
  • Loading branch information
tylergu committed Jan 17, 2025
1 parent 572fc01 commit d948f1f
Show file tree
Hide file tree
Showing 11 changed files with 289 additions and 62 deletions.
20 changes: 19 additions & 1 deletion acto/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

from acto.engine import Acto, apply_testcase
from acto.input.input import DeterministicInputModel
from acto.kubernetes_engine import base, kind, provided

Check warning on line 13 in acto/__main__.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 13
from acto.lib.operator_config import OperatorConfig
from acto.post_process.post_diff_test import PostDiffTest
from acto.utils.error_handler import handle_excepthook, thread_excepthook
Expand Down Expand Up @@ -133,11 +134,28 @@

apply_testcase_f = apply_testcase

kubernetes_engine: base.KubernetesEngine
if config.kubernetes_engine.self_provided:
kubernetes_engine = provided.ProvidedKubernetesEngine(
acto_namespace=0,
feature_gates=config.kubernetes_engine.feature_gates,
num_nodes=config.num_nodes,
version=config.kubernetes_version,
provided=config.kubernetes_engine.provided,
)
else:
kubernetes_engine = kind.Kind(

Check warning on line 147 in acto/__main__.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 137-147
acto_namespace=0,
feature_gates=config.kubernetes_engine.feature_gates,
num_nodes=config.num_nodes,
version=config.kubernetes_version,
)

start_time = datetime.now()
acto = Acto(
workdir_path=args.workdir_path,
operator_config=config,
cluster_runtime="KIND",
kubernetes_engine=kubernetes_engine,
context_file=context_cache,
helper_crd=args.helper_crd,
num_workers=args.num_workers,
Expand Down
4 changes: 2 additions & 2 deletions acto/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import random
import re
import string
from typing import Any, Sequence, Tuple, TypeAlias, Union
from typing import Any, Optional, Sequence, Tuple, TypeAlias, Union

import deepdiff.model as deepdiff_model
import kubernetes
Expand Down Expand Up @@ -391,7 +391,7 @@ def translate_op(input_op: str):


def kubernetes_client(
kubeconfig: str, context_name: str
kubeconfig: str, context_name: Optional[str]
) -> kubernetes.client.ApiClient:
"""Create a kubernetes client from kubeconfig and context name"""
return kubernetes.config.kube_config.new_client_from_config(
Expand Down
43 changes: 16 additions & 27 deletions acto/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from acto.input.testplan import TestGroup
from acto.input.value_with_schema import ValueWithSchema, attach_schema_to_value
from acto.kubectl_client import KubectlClient
from acto.kubernetes_engine import base, kind
from acto.kubernetes_engine import base
from acto.lib.operator_config import OperatorConfig
from acto.oracle_handle import OracleHandle
from acto.result import (
Expand Down Expand Up @@ -149,12 +149,20 @@ def check_state_equality(
prev_pods = prev_system_state["pod"]

for k, v in curr_pods.items():
if "owner_reference" in v["metadata"] and v["metadata"]["owner_reference"] is not None and ["owner_references"][0]["kind"] == "Job":
if (
"owner_reference" in v["metadata"]
and v["metadata"]["owner_reference"] is not None
and v["metadata"]["owner_references"][0]["kind"] == "Job"
):
continue
curr_system_state[k] = v

for k, v in prev_pods.items():
if "owner_reference" in v["metadata"] and v["metadata"]["owner_reference"] is not None and ["owner_references"][0]["kind"] == "Job":
if (

Check warning on line 161 in acto/engine.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 152-161
"owner_reference" in v["metadata"]
and v["metadata"]["owner_reference"] is not None
and v["metadata"]["owner_references"][0]["kind"] == "Job"
):
continue
prev_system_state[k] = v

Expand Down Expand Up @@ -255,7 +263,7 @@ def __init__(
runner_t: type,
checker_t: type,
wait_time: int,
custom_on_init: Optional[Callable],
custom_on_init: Optional[list[Callable]],
custom_checker: Optional[type[CheckerInterface]],
workdir: str,
cluster: base.KubernetesEngine,
Expand Down Expand Up @@ -805,7 +813,7 @@ def __init__(
self,
workdir_path: str,
operator_config: OperatorConfig,
cluster_runtime: str,
kubernetes_engine: base.KubernetesEngine,
context_file: str,
helper_crd: Optional[str],
num_workers: int,
Expand Down Expand Up @@ -833,26 +841,7 @@ def __init__(

deploy = Deploy(operator_config.deploy)

if cluster_runtime == "KIND":
cluster = kind.Kind(
acto_namespace=acto_namespace,
feature_gates=operator_config.kubernetes_engine.feature_gates,
num_nodes=operator_config.num_nodes,
version=operator_config.kubernetes_version,
)
else:
logger.warning(
"Cluster Runtime %s is not supported, defaulted to use kind",
cluster_runtime,
)
cluster = kind.Kind(
acto_namespace=acto_namespace,
feature_gates=operator_config.kubernetes_engine.feature_gates,
num_nodes=operator_config.num_nodes,
version=operator_config.kubernetes_version,
)

self.cluster = cluster
self.cluster = kubernetes_engine
self.deploy = deploy
self.operator_config = operator_config
self.crd_name = operator_config.crd_name
Expand Down Expand Up @@ -889,7 +878,7 @@ def __init__(
self.sequence_base = 0

self.custom_oracle: Optional[type[CheckerInterface]] = None
self.custom_on_init: Optional[Callable] = None
self.custom_on_init: Optional[list[Callable]] = None
if operator_config.custom_oracle is not None:
module = importlib.import_module(operator_config.custom_oracle)
if hasattr(module, "CUSTOM_CHECKER") and issubclass(
Expand Down
11 changes: 8 additions & 3 deletions acto/kubernetes_engine/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import kubernetes

from acto.constant import CONST
from acto.lib.operator_config import SelfProvidedKubernetesConfig
from acto.utils import get_thread_logger

KubernetesEnginePostHookType = Callable[[kubernetes.client.ApiClient], None]
Expand All @@ -22,13 +23,17 @@ def __init__(
feature_gates: Optional[dict[str, bool]] = None,
num_nodes=1,
version="",
provided: Optional[SelfProvidedKubernetesConfig] = None,
) -> None:
"""Constructor for KubernetesEngine
Args:
acto_namespace: the namespace of the acto
posthooks: a list of posthooks to be executed after the cluster is created
feature_gates: a list of feature gates to be enabled
feature_gates: a list of Kubernetes feature gates to be enabled
num_nodes: the number of nodes in the cluster
version: the version of Kubernetes
provided: the configuration for a self-provided Kubernetes engine
"""

@abstractmethod
Expand Down Expand Up @@ -91,10 +96,10 @@ def restart_cluster(self, name: str, kubeconfig: str):
continue
break

def get_node_list(self, name: str):
def get_node_list(self, name: str) -> list[str]:
"""Fetch the name of worker nodes inside a cluster
Args:
1. name: name of the cluster name
name: name of the cluster name
"""
_ = get_thread_logger(with_prefix=False)

Expand Down
8 changes: 4 additions & 4 deletions acto/kubernetes_engine/kind.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import os
import subprocess
import time
from typing import Any, Dict, List, Optional
from typing import Any, Optional

import kubernetes
import yaml
Expand All @@ -19,8 +19,8 @@ class Kind(base.KubernetesEngine):
def __init__(
self,
acto_namespace: int,
posthooks: List[base.KubernetesEnginePostHookType] = None,
feature_gates: Dict[str, bool] = None,
posthooks: Optional[list[base.KubernetesEnginePostHookType]] = None,
feature_gates: Optional[dict[str, bool]] = None,
num_nodes=1,
version: Optional[str] = None,
):
Expand Down Expand Up @@ -140,7 +140,7 @@ def create_cluster(self, name: str, kubeconfig: str):

if self._posthooks:
for posthook in self._posthooks:
posthook(apiclient=apiclient)
posthook(apiclient)

Check warning on line 143 in acto/kubernetes_engine/kind.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 143

def load_images(self, images_archive_path: str, name: str):
logging.info("Loading preload images")
Expand Down
113 changes: 113 additions & 0 deletions acto/kubernetes_engine/provided.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import logging
import subprocess
from typing import Optional

import kubernetes

from acto.common import kubernetes_client, print_event
from acto.constant import CONST

from . import base


class ProvidedKubernetesEngine(base.KubernetesEngine):
"""KubernetesEngine for user-provided k8s cluster
Configuration for user-provided k8s cluster is very limited,
as it is assumed that the user has already set up the cluster.
Everything needs to be deployed in the ACTO_NAMESPACE to provide the
necessary isolation.
"""

def __init__(
self,
acto_namespace: int,
posthooks: Optional[list[base.KubernetesEnginePostHookType]] = None,
feature_gates: Optional[dict[str, bool]] = None,
num_nodes: int = 1,
version: Optional[str] = None,
provided: Optional[base.SelfProvidedKubernetesConfig] = None,
):
self._posthooks = posthooks

if feature_gates:
logging.error("Feature gates are not supported in provided k8s")

Check warning on line 34 in acto/kubernetes_engine/provided.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 34

if num_nodes != 1:
logging.error("num_nodes is not supported in provided k8s")

Check warning on line 37 in acto/kubernetes_engine/provided.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 37

if version:
logging.error("version is not supported in provided k8s")

Check warning on line 40 in acto/kubernetes_engine/provided.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 40

if provided is None:
raise ValueError("Missing configuration for provided k8s")

Check warning on line 43 in acto/kubernetes_engine/provided.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 43
self._kube_config = provided.kube_config
self._kube_context = provided.kube_context

def get_context_name(self, cluster_name: str) -> str:
"""Returns the kubecontext based on the cluster name
KIND always adds `kind` before the cluster name
"""
return self._kube_context

Check warning on line 51 in acto/kubernetes_engine/provided.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on line 51

def create_cluster(self, name: str, kubeconfig: str):
"""Does nothing as the cluster is already created
Args:
name: name of the cluster
config: path of the config file for cluster
version: k8s version
"""
print_event("Connecting to a user-provided Kubernetes cluster...")

try:
kubernetes.config.load_kube_config(
config_file=self._kube_config, context=self._kube_context
)
apiclient = kubernetes_client(self._kube_config, self._kube_context)
except Exception as e:
logging.debug("Incorrect kube config file:")
with open(self._kube_config, encoding="utf-8") as f:
logging.debug(f.read())
raise e

Check warning on line 71 in acto/kubernetes_engine/provided.py

View workflow job for this annotation

GitHub Actions / coverage-report

Missing coverage

Missing coverage on lines 67-71

if self._posthooks:
for posthook in self._posthooks:
posthook(apiclient)

def load_images(self, images_archive_path: str, name: str):
logging.info("Loading preload images")
cmd = ["kind", "load", "image-archive"]
if images_archive_path is None:
logging.warning(
"No image to preload, we at least should have operator image"
)

if name is not None:
cmd.extend(["--name", name])
else:
logging.error("Missing cluster name for kind load")

p = subprocess.run(cmd + [images_archive_path], check=False)
if p.returncode != 0:
logging.error("Failed to preload images archive")

def delete_cluster(self, name: str, kubeconfig: str):
"""Cluster deletion via deleting the acto-namespace
Args:
name: name of the cluster
kubeconfig: path of the config file for cluster
kubecontext: context of the cluster
"""
logging.info("Deleting cluster %s", name)
apiclient = kubernetes_client(self._kube_config, self._kube_context)
core_v1 = kubernetes.client.CoreV1Api(apiclient)
core_v1.delete_namespace(
CONST.ACTO_NAMESPACE, propagation_policy="Foreground"
)

def get_node_list(self, name: str) -> list[str]:
"""We don't have a way to get the node list for a user-provided cluster
Args:
Name of the cluster
"""
return []
39 changes: 38 additions & 1 deletion acto/lib/operator_config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Optional
from typing import Any, Optional

import pydantic
from typing_extensions import Self
Expand Down Expand Up @@ -134,12 +134,49 @@ class AnalysisConfig(pydantic.BaseModel, extra="forbid"):
)


class SelfProvidedKubernetesConfig(pydantic.BaseModel, extra="forbid"):
"""Configuration for self-provided Kubernetes engine"""

kube_config: str = pydantic.Field(
description="Path to the kubeconfig file for the Kubernetes cluster"
)
kube_context: str = pydantic.Field(
description="Context name for the Kubernetes cluster"
)


class KubernetesEngineConfig(pydantic.BaseModel, extra="forbid"):
"""Configuration for Kubernetes"""

num_nodes: int = pydantic.Field(
description="Number of workers in the Kubernetes cluster", default=4
)
kubernetes_version: str = pydantic.Field(
default="v1.28.0", description="Kubernetes version"
)
feature_gates: dict[str, bool] = pydantic.Field(
description="Path to the feature gates file", default=None
)
self_provided: Optional[SelfProvidedKubernetesConfig] = pydantic.Field(
default=None,
description="Configuration for self-provided Kubernetes engine",
)

@pydantic.model_validator(mode="before")
@classmethod
def check_self_provided(cls, data: Any) -> Any:
"""Check if the self-provided Kubernetes engine is valid"""
if isinstance(data, dict) and "self_provided" in data:
if (
"num_nodes" in data
or "kubernetes_version" in data
or "feature_gates" in data
):
raise ValueError(
"num_nodes, kubernetes_version, and feature_gates "
+ "are not supported in self-provided Kubernetes engine"
)
return data


class OperatorConfig(pydantic.BaseModel, extra="forbid"):
Expand Down
Loading

0 comments on commit d948f1f

Please sign in to comment.