From efde0ad84c4422d849b71b42f1cb38ad69117f10 Mon Sep 17 00:00:00 2001 From: Karl Rister Date: Wed, 25 Mar 2026 17:03:54 -0500 Subject: [PATCH 1/2] feat: support multiple client/server engines per pod in kube endpoint Add an optional "pods" key to the kube endpoint run-file config that allows grouping multiple client/server engines into a single pod as separate containers. Each pod group can have a user-defined name or an auto-generated one. Engines not listed in any pod group automatically get their own solo pod, preserving backward compatibility. Key changes: - Add "pods" schema to kube.json with name, engines (role + ids) - Normalize and validate pod groups in normalize_endpoint_settings() - Validate pod-level setting consistency (cpu-partitioning, nodeSelector, hostNetwork, runtimeClassName, annotations, securityContext.pod) across engines sharing a pod - Refactor create_pod_crd() to accept a list of engines and build one container per engine with per-engine settings - Refactor create_cs_pods() to iterate pod groups instead of individual engines - Update verify_pods_running() to handle multi-engine pod details - Add engine-to-pod aliases so downstream lookups by engine name (tool deployment, services, log collection) continue to work - Add deduplication in engine_init() and kube_cleanup() to avoid processing aliased pod entries multiple times Co-Authored-By: Claude Opus 4.6 (1M context) --- endpoints/kube/kube.py | 438 +++++++++++++++++++++++++++++------------ schema/kube.json | 50 ++++- 2 files changed, 366 insertions(+), 122 deletions(-) diff --git a/endpoints/kube/kube.py b/endpoints/kube/kube.py index 507a0c90..6e1c9367 100755 --- a/endpoints/kube/kube.py +++ b/endpoints/kube/kube.py @@ -271,6 +271,139 @@ def normalize_endpoint_settings(endpoint, rickshaw): if engine_id not in endpoint["engines"]["verifications"][engine_role]: endpoint["engines"]["verifications"][engine_role][engine_id] = endpoint["engines"]["defaults"]["verifications"] + # Normalize pod groups + # Build the list of all engines for validation + all_engines = [] + for engine_role in [ "client", "server" ]: + if engine_role in endpoint["engines"]: + for engine_id in endpoint["engines"][engine_role]: + all_engines.append({"role": engine_role, "id": engine_id}) + + if "pods" in endpoint: + # Expand IDs and validate pod groups + assigned_engines = set() + pod_names = set() + auto_pod_id = 1 + + for pod_group_idx, pod_group in enumerate(endpoint["pods"]): + # Validate and track pod name + if "name" in pod_group: + if pod_group["name"] in pod_names: + msg = "Duplicate pod name '%s' found in pod group at index %d" % (pod_group["name"], pod_group_idx) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + pod_names.add(pod_group["name"]) + + # Expand engine IDs in pod group + expanded_engines = [] + for engine_entry in pod_group["engines"]: + try: + engine_entry["ids"] = endpoints.expand_ids(engine_entry["ids"]) + except ValueError as e: + msg = "While expanding IDs for role '%s' in pod group at index %d encountered exception '%s'" % (engine_entry["role"], pod_group_idx, str(e)) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + + for engine_id in engine_entry["ids"]: + engine_key = "%s-%d" % (engine_entry["role"], engine_id) + + # Validate engine exists + if engine_entry["role"] not in endpoint["engines"] or engine_id not in endpoint["engines"][engine_entry["role"]]: + msg = "Engine '%s' in pod group at index %d does not exist in endpoint engines" % (engine_key, pod_group_idx) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + + # Validate no duplicate assignment + if engine_key in assigned_engines: + msg = "Engine '%s' in pod group at index %d is already assigned to another pod group" % (engine_key, pod_group_idx) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + + assigned_engines.add(engine_key) + expanded_engines.append({"role": engine_entry["role"], "id": engine_id}) + + pod_group["expanded_engines"] = expanded_engines + + # Auto-generate pod name if not provided + if "name" not in pod_group: + while ("pod-%d" % auto_pod_id) in pod_names: + auto_pod_id += 1 + pod_group["name"] = "pod-%d" % auto_pod_id + pod_names.add(pod_group["name"]) + auto_pod_id += 1 + + # Create solo pod groups for any unassigned engines + for engine in all_engines: + engine_key = "%s-%d" % (engine["role"], engine["id"]) + if engine_key not in assigned_engines: + solo_name = engine_key + endpoint["pods"].append({ + "name": solo_name, + "engines": [{"role": engine["role"], "ids": [engine["id"]]}], + "expanded_engines": [{"role": engine["role"], "id": engine["id"]}], + "solo": True + }) + else: + # No pods config: create a solo pod group per engine (backward compatible) + endpoint["pods"] = [] + for engine in all_engines: + engine_key = "%s-%d" % (engine["role"], engine["id"]) + endpoint["pods"].append({ + "name": engine_key, + "engines": [{"role": engine["role"], "ids": [engine["id"]]}], + "expanded_engines": [{"role": engine["role"], "id": engine["id"]}], + "solo": True + }) + + # Validate pod-level setting consistency within each pod group + pod_level_keys = [ "cpu-partitioning", "nodeSelector", "hostNetwork", "runtimeClassName", "annotations" ] + for pod_group in endpoint["pods"]: + if len(pod_group["expanded_engines"]) <= 1: + continue + + first_engine = pod_group["expanded_engines"][0] + first_settings = endpoint["engines"]["settings"][first_engine["role"]][first_engine["id"]] + + for engine in pod_group["expanded_engines"][1:]: + engine_settings = endpoint["engines"]["settings"][engine["role"]][engine["id"]] + engine_key = "%s-%d" % (engine["role"], engine["id"]) + + for key in pod_level_keys: + first_val = first_settings.get(key) + engine_val = engine_settings.get(key) + if first_val != engine_val: + first_key = "%s-%d" % (first_engine["role"], first_engine["id"]) + msg = "Engines '%s' and '%s' in pod group '%s' have different values for pod-level setting '%s'" % (first_key, engine_key, pod_group["name"], key) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + + # Check securityContext.pod specifically + first_sc_pod = first_settings.get("securityContext", {}).get("pod") + engine_sc_pod = engine_settings.get("securityContext", {}).get("pod") + if first_sc_pod != engine_sc_pod: + first_key = "%s-%d" % (first_engine["role"], first_engine["id"]) + msg = "Engines '%s' and '%s' in pod group '%s' have different values for pod-level setting 'securityContext.pod'" % (first_key, engine_key, pod_group["name"]) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + return endpoint def find_k8s_bin(validate, connection, remote_env): @@ -695,31 +828,38 @@ def compile_object_configs(): settings["engines"]["endpoint"]["classes"]["cpu-partitioning"] = dict() settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"] = [] settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"] = [] - for role in roles: - if role in endpoint["engines"]["settings"]: - csids = list(endpoint["engines"]["settings"][role].keys()) - csids.sort() - for csid in csids: - engine = { - "role": role, - "id": int(csid) - } + for pod_group in endpoint["pods"]: + # Use the first engine's settings to determine cpu-partitioning + # (already validated that all engines in the group agree) + first_engine = pod_group["expanded_engines"][0] + first_settings = endpoint["engines"]["settings"][first_engine["role"]][first_engine["id"]] + + pod_group_entry = { + "name": pod_group["name"], + "engines": pod_group["expanded_engines"], + "solo": pod_group.get("solo", False) + } - if endpoint["engines"]["settings"][role][csid]["cpu-partitioning"]: - settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"].append(engine) - else: - settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"].append(engine) + if first_settings["cpu-partitioning"]: + settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"].append(pod_group_entry) + else: + settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"].append(pod_group_entry) endpoints.log_settings(settings, mode = "engines") return 0 -def create_pod_crd(role = None, id = None, node = None, node_tools = None): +def create_pod_crd(role = None, id = None, node = None, node_tools = None, cs_engines = None, pod_name = None): """ Create a pod CRD Args: - None + role (str): The role for profiler pods ('master' or 'worker'), or None for CS pods using cs_engines + id (int): The pod ID for profiler pods, or None for CS pods using cs_engines + node (str): The node name for profiler pods (required when role is 'master' or 'worker') + node_tools (list): List of tools for profiler pods + cs_engines (list): List of engine dicts [{"role": ..., "id": ...}, ...] for CS pods + pod_name (str): The pod name (without prefix) for CS pods Globals: args (namespace): the script's CLI parameters @@ -731,28 +871,45 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): crd, 0: success crd, 1: failure """ - logger.info("Creating CRD for engine %s-%d" % (role, id)) + is_cs_pod = cs_engines is not None + is_profiler_pod = role is not None - if role is None or id is None: - logger.error("You must define role and id when calling this function") + if is_cs_pod: + logger.info("Creating CRD for CS pod '%s' with engines: %s" % (pod_name, ["%s-%d" % (e["role"], e["id"]) for e in cs_engines])) + else: + logger.info("Creating CRD for profiler engine %s-%d" % (role, id)) + + if not is_cs_pod and not is_profiler_pod: + logger.error("You must define either cs_engines or role/id when calling this function") return None, 1 - if role == "master" or role == "worker": - if node is None: - logger.error("You must define node when role is either 'master' or 'worker'") + if is_profiler_pod: + if id is None: + logger.error("You must define id when role is specified") return None, 1 + if role == "master" or role == "worker": + if node is None: + logger.error("You must define node when role is either 'master' or 'worker'") + return None, 1 + + if is_cs_pod: + name = pod_name + else: + name = "%s-%d" % (role, id) - name = "%s-%d" % (role, id) endpoint = settings["run-file"]["endpoints"][args.endpoint_index] + # For CS pods, use the first engine's settings for pod-level config + # (all engines in the group are validated to have matching pod-level settings) pod_settings = None - if role == "client" or role == "server": - pod_settings = endpoint["engines"]["settings"][role][id] + if is_cs_pod: + first_engine = cs_engines[0] + pod_settings = endpoint["engines"]["settings"][first_engine["role"]][first_engine["id"]] elif role == "worker" or role == "master": pod_settings = endpoint["engines"]["defaults"]["settings"] if pod_settings is None: logger.error("Could not find mapping for pod settings") - return None,1 + return None, 1 crd = { "apiVersion": "v1", @@ -766,7 +923,7 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): } } - if role == "client" or role == "server": + if is_cs_pod: crd["metadata"]["labels"] = { "app": crd["metadata"]["name"] } @@ -833,7 +990,7 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): has_hugepages = False - if role == "client" or role == "server": + if is_cs_pod: if "securityContext" in pod_settings and "pod" in pod_settings["securityContext"]: crd["spec"]["securityContext"] = copy.deepcopy(pod_settings["securityContext"]["pod"]) @@ -846,40 +1003,44 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): if "hostNetwork" in pod_settings: crd["spec"]["hostNetwork"] = pod_settings["hostNetwork"] - if "volumes" in pod_settings: - for volume in pod_settings["volumes"]: - new_volume = { - "name": volume["name"] - } - - # this loop should only execute once; the JSON schema - # defines the min and max properties as 1, but by - # writing it this way we don't have to handle the - # individual values (of which there are many) -- the - # loop is generic and should adapt to the different - # values automatically - for key in volume["volume"].keys(): - new_volume[key] = copy.deepcopy(volume["volume"][key]) - - crd["spec"]["volumes"].append(new_volume) - - if "resources" in pod_settings: - for key in pod_settings["resources"].keys(): - if "hugepages" in key: - has_hugepages = True - - crd["spec"]["volumes"].append({ - "name": "hugepage", - "emptyDir": { - "medium": "HugePages" + # Accumulate volumes from all engines in the pod, deduplicating by name + volume_names_added = set() + for engine in cs_engines: + engine_settings = endpoint["engines"]["settings"][engine["role"]][engine["id"]] + if "volumes" in engine_settings: + for volume in engine_settings["volumes"]: + if volume["name"] not in volume_names_added: + new_volume = { + "name": volume["name"] } - }) - break + for key in volume["volume"].keys(): + new_volume[key] = copy.deepcopy(volume["volume"][key]) + + crd["spec"]["volumes"].append(new_volume) + volume_names_added.add(volume["name"]) + + if "resources" in engine_settings and not has_hugepages: + for key in engine_settings["resources"].keys(): + if "hugepages" in key: + has_hugepages = True + + crd["spec"]["volumes"].append({ + "name": "hugepage", + "emptyDir": { + "medium": "HugePages" + } + }) + + break container_names = [] - if role == "client" or role == "server": - container_names.append(name) + container_engine_map = {} + if is_cs_pod: + for engine in cs_engines: + cn = "%s-%d" % (engine["role"], engine["id"]) + container_names.append(cn) + container_engine_map[cn] = engine if role == "worker" or role == "master": for tool in settings["engines"]["profiler-mapping"].keys(): if node_tools is not None and tool not in node_tools: @@ -889,14 +1050,15 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): engine_name = "%s-%s-%d" % ("profiler", mapping["label"], id) mapping["ids"].append(engine_name) container_names.append(engine_name) - pass crd["spec"]["containers"] = [] for container_name in container_names: logger.info("Adding container '%s' to pod" % (container_name)) image = None - if role == "client" or role == "server": - image = endpoints.get_engine_id_image(settings, role, id, pod_settings["userenv"]) + if is_cs_pod: + engine = container_engine_map[container_name] + engine_settings = endpoint["engines"]["settings"][engine["role"]][engine["id"]] + image = endpoints.get_engine_id_image(settings, engine["role"], engine["id"], engine_settings["userenv"]) elif role == "worker" or role == "master": userenv = endpoints.get_profiler_userenv(settings, container_name) if userenv is None: @@ -956,6 +1118,12 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): else: logger.info("Image pull secret '%s' already added to pod spec" % (secret_name)) + # For CS pods, resolve per-container settings from the engine's own config + if is_cs_pod: + container_settings = endpoint["engines"]["settings"][container_engine_map[container_name]["role"]][container_engine_map[container_name]["id"]] + else: + container_settings = pod_settings + container = { "name": container_name, "image": image, @@ -963,7 +1131,7 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): "env": [ { "name": "rickshaw_host", - "value": pod_settings["controller-ip-address"] + "value": container_settings["controller-ip-address"] }, { "name": "base_run_dir", @@ -983,7 +1151,7 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): }, { "name": "osruntime", - "value": pod_settings["osruntime"] + "value": container_settings["osruntime"] }, { "name": "roadblock_passwd", @@ -1008,9 +1176,9 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): ] } - if role == "client" or role == "server": + if is_cs_pod: cpu_partitioning = None - if pod_settings["cpu-partitioning"]: + if container_settings["cpu-partitioning"]: cpu_partitioning = 1 else: cpu_partitioning = 0 @@ -1048,12 +1216,12 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): ] ) - if role == "client" or role == "server": - if "securityContext" in pod_settings and "container" in pod_settings["securityContext"]: - container["securityContext"] = copy.deepcopy(pod_settings["securityContext"]["container"]) + if is_cs_pod: + if "securityContext" in container_settings and "container" in container_settings["securityContext"]: + container["securityContext"] = copy.deepcopy(container_settings["securityContext"]["container"]) - if "volumes" in pod_settings: - for volume in pod_settings["volumes"]: + if "volumes" in container_settings: + for volume in container_settings["volumes"]: new_volume_mount = { "name": volume["name"] } @@ -1069,8 +1237,8 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): "name": "hugepage" }) - if "resources" in pod_settings: - container["resources"] = copy.deepcopy(pod_settings["resources"]) + if "resources" in container_settings: + container["resources"] = copy.deepcopy(container_settings["resources"]) crd["spec"]["containers"].append(container) @@ -1154,10 +1322,19 @@ def verify_pods_running(connection, pods, pod_details, abort_event): running_containers.append(container["name"]) pod_verifications = None - if pod_details[pod_name]["role"] in [ "client", "server" ]: - pod_verifications = endpoint["engines"]["verifications"][pod_details[pod_name]["role"]][pod_details[pod_name]["id"]] - elif pod_details[pod_name]["role"] in [ "worker", "master" ]: - pod_verifications = endpoint["engines"]["defaults"]["verifications"] + if "role" in pod_details[pod_name]: + # profiler pod format + if pod_details[pod_name]["role"] in [ "client", "server" ]: + pod_verifications = endpoint["engines"]["verifications"][pod_details[pod_name]["role"]][pod_details[pod_name]["id"]] + elif pod_details[pod_name]["role"] in [ "worker", "master" ]: + pod_verifications = endpoint["engines"]["defaults"]["verifications"] + elif "engines" in pod_details[pod_name]: + # CS pod group format: resolve verification from container name + fields = container["name"].split("-") + container_role = fields[0] + container_id = int(fields[1]) + if container_role in [ "client", "server" ]: + pod_verifications = endpoint["engines"]["verifications"][container_role][container_id] if pod_verifications is None: logger.error("Could not find mapping for pod verifications") @@ -1244,40 +1421,40 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): endpoint = settings["run-file"]["endpoints"][args.endpoint_index] - engines = None + pod_groups = None if cpu_partitioning is None: logger.error("You must define cpu_partitioning when calling this function") return 1 elif cpu_partitioning: - engines = settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"] + pod_groups = settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"] else: - engines = settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"] + pod_groups = settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"] - if len(engines) > 0: - logger.info("Going to create %d engines" % (len(engines))) + if len(pod_groups) > 0: + logger.info("Going to create %d pod groups" % (len(pod_groups))) else: - logger.info("No engines to create") + logger.info("No pod groups to create") return 0 - for engine in engines: - logger.info("Creating engine %s-%d" % (engine["role"], engine["id"])) + for pod_group in pod_groups: + logger.info("Creating pod group '%s' with engines: %s" % (pod_group["name"], ["%s-%d" % (e["role"], e["id"]) for e in pod_group["engines"]])) - engine["crd"], rc = create_pod_crd(engine["role"], engine["id"]) + pod_group["crd"], rc = create_pod_crd(cs_engines = pod_group["engines"], pod_name = pod_group["name"]) if rc == 1: - logger.error("Failed to create CRD for %s-%d" % (engine["role"], engine["id"])) - if crd is None: - logger.error("No CRD available for %s-%d" % (engine["role"], engine["id"])) + logger.error("Failed to create CRD for pod group '%s'" % (pod_group["name"])) + if pod_group["crd"] is None: + logger.error("No CRD available for pod group '%s'" % (pod_group["name"])) else: - logger.error("CRD generated for %s-%d is:\n%s" % (engine["role"], engine["id"], endpoints.dump_json(engine["crd"]))) + logger.error("CRD generated for pod group '%s' is:\n%s" % (pod_group["name"], endpoints.dump_json(pod_group["crd"]))) else: - crd_json_str = endpoints.dump_json(engine["crd"]) - - logger.info("Created CRD for %s-%d:\n%s" % (engine["role"], engine["id"], crd_json_str)) + crd_json_str = endpoints.dump_json(pod_group["crd"]) + + logger.info("Created CRD for pod group '%s':\n%s" % (pod_group["name"], crd_json_str)) - crd_filename = settings["dirs"]["local"]["crds"]["pods"] + "/%s-%s.json" % (engine["role"], engine["id"]) + crd_filename = settings["dirs"]["local"]["crds"]["pods"] + "/%s.json" % (pod_group["name"]) with open(crd_filename, "w", encoding = "ascii") as crd_fp: crd_fp.write(crd_json_str) - logger.info("Wrote CRD for %s-%s to %s" % (engine["role"], engine["id"], crd_filename)) + logger.info("Wrote CRD for pod group '%s' to %s" % (pod_group["name"], crd_filename)) if not "validation" in settings["engines"]["endpoint"]: settings["engines"]["endpoint"]["validation"] = { @@ -1322,24 +1499,24 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): logger.info("Validating CRDs") invalid_crds = [] valid_crds = [] - for engine in engines: - engine_name = "%s-%d" % (engine["role"], engine["id"]) - logger.info("Validating CRD for '%s'" % (engine_name)) + for pod_group in pod_groups: + pod_name = pod_group["name"] + logger.info("Validating CRD for pod '%s'" % (pod_name)) cmd = "%s create --filename - --dry-run=server --validate=strict" % (settings["misc"]["k8s-bin"]) - result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], stdin = endpoints.dump_json(engine["crd"]), env = settings["misc"]["remote-env"]) + result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], stdin = endpoints.dump_json(pod_group["crd"]), env = settings["misc"]["remote-env"]) endpoints.log_result(result) if result.exited != 0: - logger.error("Did not validate CRD for '%s'" % (engine_name)) - invalid_crds.append(engine_name) + logger.error("Did not validate CRD for pod '%s'" % (pod_name)) + invalid_crds.append(pod_name) else: - logger.info("Validated CRD for '%s'" % (engine_name)) - valid_crds.append(engine_name) + logger.info("Validated CRD for pod '%s'" % (pod_name)) + valid_crds.append(pod_name) settings["engines"]["endpoint"]["validation"]["valid"].extend(valid_crds) settings["engines"]["endpoint"]["validation"]["invalid"].extend(invalid_crds) if len(valid_crds) > 0: - logger.info("Validated the CRDs for these %d engines: %s" % (len(valid_crds), valid_crds)) + logger.info("Validated the CRDs for these %d pods: %s" % (len(valid_crds), valid_crds)) if len(invalid_crds) > 0: - logger.error("Failed to validate the CRDs for these %d engines: %s" % (len(invalid_crds), invalid_crds)) + logger.error("Failed to validate the CRDs for these %d pods: %s" % (len(invalid_crds), invalid_crds)) return 1 if "pull-secrets" in settings["misc"] and len(settings["misc"]["pull-secrets"]): @@ -1362,41 +1539,41 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): logger.info("Creating CRDs") failed_crds = [] created_crds = [] - engine_details = dict() - for engine in engines: - engine_name = "%s-%d" % (engine["role"], engine["id"]) - logger.info("Creating CRD for '%s'" % (engine_name)) + pod_details = dict() + for pod_group in pod_groups: + pod_name = pod_group["name"] + logger.info("Creating CRD for pod '%s'" % (pod_name)) cmd = "%s create --filename -" % (settings["misc"]["k8s-bin"]) - result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], stdin = endpoints.dump_json(engine["crd"]), env = settings["misc"]["remote-env"]) + result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], stdin = endpoints.dump_json(pod_group["crd"]), env = settings["misc"]["remote-env"]) endpoints.log_result(result) if result.exited != 0: - logger.error("Did not create CRD for '%s'" % (engine_name)) - failed_crds.append(engine_name) + logger.error("Did not create CRD for pod '%s'" % (pod_name)) + failed_crds.append(pod_name) else: - logger.info("Created CRD for '%s'" % (engine_name)) - created_crds.append(engine_name) - engine_details[engine_name] = { - "role": engine["role"], - "id": engine["id"] + logger.info("Created CRD for pod '%s'" % (pod_name)) + created_crds.append(pod_name) + pod_details[pod_name] = { + "engines": pod_group["engines"], + "solo": pod_group.get("solo", False) } settings["engines"]["endpoint"]["created"]["succeeded"].extend(created_crds) settings["engines"]["endpoint"]["created"]["failed"].extend(failed_crds) if len(created_crds) > 0: - logger.info("Created the CRDs for these %d engines: %s" % (len(created_crds), created_crds)) + logger.info("Created the CRDs for these %d pods: %s" % (len(created_crds), created_crds)) if len(failed_crds) > 0: - logger.error("Failed to create the CRDs for these %d engines: %s" % (len(failed_crds), failed_crds)) + logger.error("Failed to create the CRDs for these %d pods: %s" % (len(failed_crds), failed_crds)) return 2 if not "pods" in settings["engines"]["endpoint"]: settings["engines"]["endpoint"]["pods"] = dict() - pod_status = verify_pods_running(con, created_crds, engine_details, abort_event) + pod_status = verify_pods_running(con, created_crds, pod_details, abort_event) if pod_status is None: logger.error("Encountered fatal error while verifying pods") return 2 logger.info("Reviewing pods") for pod in pod_status.keys(): - logger.info("Pod '%s' is running on node '%s'" % (engine, pod_status[pod]["node"])) + logger.info("Pod '%s' is running on node '%s'" % (pod, pod_status[pod]["node"])) if pod_status[pod]["node"] not in settings["engines"]["endpoint"]["hosting-nodes"]: logger.info("Adding node '%s' to the list of hosting nodes" % (pod_status[pod]["node"])) @@ -1405,6 +1582,16 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): settings["engines"]["endpoint"]["verification"]["verified"].append(pod_status[pod]["name"]) settings["engines"]["endpoint"]["pods"][pod] = copy.deepcopy(pod_status[pod]) + + # For multi-engine pods, create engine-name aliases so + # downstream code can look up pod info by engine name + if pod in pod_details and "engines" in pod_details[pod] and not pod_details[pod].get("solo", False): + for engine in pod_details[pod]["engines"]: + engine_name = "%s-%d" % (engine["role"], engine["id"]) + if engine_name != pod: + logger.info("Creating engine-to-pod alias: '%s' -> pod '%s'" % (engine_name, pod)) + settings["engines"]["endpoint"]["pods"][engine_name] = settings["engines"]["endpoint"]["pods"][pod] + unverified = 0 for pod in created_crds: if pod not in settings["engines"]["endpoint"]["verification"]["verified"]: @@ -1416,7 +1603,7 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): return 2 logger.info("There are %d hosting nodes: %s" % (len(settings["engines"]["endpoint"]["hosting-nodes"]), settings["engines"]["endpoint"]["hosting-nodes"])) - + return 0 def determine_tools_for_node(start_tools, opt_in_tags, opt_out_tags): @@ -1805,17 +1992,21 @@ def kube_cleanup(): errors = True logger.info("Collecting engine logs") + processed_log_pods = set() pods = list(settings["engines"]["endpoint"]["pods"].keys()) pods.sort() for pod in pods: pod_name = settings["engines"]["endpoint"]["pods"][pod]["name"] + if pod_name in processed_log_pods: + continue + processed_log_pods.add(pod_name) node_name = settings["engines"]["endpoint"]["pods"][pod]["node"] logger.info("Processing pod '%s' on node '%s'" % (pod_name, node_name)) for engine in settings["engines"]["endpoint"]["pods"][pod]["containers"]: logger.info("Collecting log for engine '%s'" % (engine)) cmd = "%s logs %s-%s --namespace %s --container %s" % (settings["misc"]["k8s-bin"], endpoint_default_settings["prefix"]["pod"], - pod, + pod_name, endpoint["namespace"]["name"], engine) result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], env = settings["misc"]["remote-env"]) @@ -1866,10 +2057,15 @@ def engine_init(): """ logger.info("Building messages to send engine-specific metadata to the engines") env_vars_msgs = [] + processed_pods = set() pods = list(settings["engines"]["endpoint"]["pods"].keys()) pods.sort() for pod in pods: pod_name = settings["engines"]["endpoint"]["pods"][pod]["name"] + if pod_name in processed_pods: + logger.info("Skipping pod '%s' (alias for already processed pod '%s')" % (pod, pod_name)) + continue + processed_pods.add(pod_name) node_name = settings["engines"]["endpoint"]["pods"][pod]["node"] logger.info("Processing pod '%s' on node '%s'" % (pod_name, node_name)) for engine in settings["engines"]["endpoint"]["pods"][pod]["containers"]: diff --git a/schema/kube.json b/schema/kube.json index 10adc283..fda7be13 100644 --- a/schema/kube.json +++ b/schema/kube.json @@ -316,7 +316,55 @@ "targets", "settings" ] - } + } + }, + "pods": { + "description": "Optional grouping of engines into pods. Each entry defines one pod containing the listed engines as separate containers. If omitted, each engine gets its own pod. Engines not listed in any pod group automatically get their own solo pod.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "type": "object", + "properties": { + "name": { + "description": "Optional user-defined name for this pod. Must be unique across all pod groups. If omitted, a name is auto-generated.", + "type": "string", + "minLength": 1, + "pattern": "^[a-z0-9]([a-z0-9-]*[a-z0-9])?$" + }, + "engines": { + "description": "The list of engines to group into this pod. Each entry specifies a role and one or more engine IDs.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "type": "object", + "properties": { + "role": { + "type": "string", + "enum": [ + "client", + "server" + ] + }, + "ids": { + "description": "This parameter defines which engines with the specified role belong to this pod. The 'number-lists' definition below explains what the available formats are.", + "$ref": "#/definitions/number-lists" + } + }, + "additionalProperties": false, + "required": [ + "role", + "ids" + ] + } + } + }, + "additionalProperties": false, + "required": [ + "engines" + ] + } } }, "additionalProperties": false, From e51daf3b64c04264a56f07c540875c7f2b772eab Mon Sep 17 00:00:00 2001 From: Karl Rister Date: Wed, 8 Apr 2026 10:58:01 -0500 Subject: [PATCH 2/2] fix: initialize profiled-nodes before first use in kube endpoint The profiled-nodes list in settings was being appended to before it was initialized, causing a KeyError on multi-node clusters where worker nodes are not also masters. Single-node CI clusters masked this because the worker node was already in the profiled_nodes local list (as a master), so the append path was never reached. Move the initialization to where the other classes entries are created. Co-Authored-By: Claude Opus 4.6 (1M context) --- endpoints/kube/kube.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/endpoints/kube/kube.py b/endpoints/kube/kube.py index 6e1c9367..7ad47084 100755 --- a/endpoints/kube/kube.py +++ b/endpoints/kube/kube.py @@ -828,6 +828,7 @@ def compile_object_configs(): settings["engines"]["endpoint"]["classes"]["cpu-partitioning"] = dict() settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"] = [] settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"] = [] + settings["engines"]["endpoint"]["classes"]["profiled-nodes"] = [] for pod_group in endpoint["pods"]: # Use the first engine's settings to determine cpu-partitioning # (already validated that all engines in the group agree) @@ -1797,7 +1798,6 @@ def create_tools_pods(abort_event): logger.info("Creating node profiling pods") - settings["engines"]["endpoint"]["classes"]["profiled-nodes"] = [] tools_pod_id = 1 for node in profiled_nodes: pod = {