diff --git a/endpoints/kube/kube.py b/endpoints/kube/kube.py index a2bbb4d1..990a817b 100755 --- a/endpoints/kube/kube.py +++ b/endpoints/kube/kube.py @@ -271,6 +271,139 @@ def normalize_endpoint_settings(endpoint, rickshaw): if engine_id not in endpoint["engines"]["verifications"][engine_role]: endpoint["engines"]["verifications"][engine_role][engine_id] = endpoint["engines"]["defaults"]["verifications"] + # Normalize pod groups + # Build the list of all engines for validation + all_engines = [] + for engine_role in [ "client", "server" ]: + if engine_role in endpoint["engines"]: + for engine_id in endpoint["engines"][engine_role]: + all_engines.append({"role": engine_role, "id": engine_id}) + + if "pods" in endpoint: + # Expand IDs and validate pod groups + assigned_engines = set() + pod_names = set() + auto_pod_id = 1 + + for pod_group_idx, pod_group in enumerate(endpoint["pods"]): + # Validate and track pod name + if "name" in pod_group: + if pod_group["name"] in pod_names: + msg = "Duplicate pod name '%s' found in pod group at index %d" % (pod_group["name"], pod_group_idx) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + pod_names.add(pod_group["name"]) + + # Expand engine IDs in pod group + expanded_engines = [] + for engine_entry in pod_group["engines"]: + try: + engine_entry["ids"] = endpoints.expand_ids(engine_entry["ids"]) + except ValueError as e: + msg = "While expanding IDs for role '%s' in pod group at index %d encountered exception '%s'" % (engine_entry["role"], pod_group_idx, str(e)) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + + for engine_id in engine_entry["ids"]: + engine_key = "%s-%d" % (engine_entry["role"], engine_id) + + # Validate engine exists + if engine_entry["role"] not in endpoint["engines"] or engine_id not in endpoint["engines"][engine_entry["role"]]: + msg = "Engine '%s' in pod group at index %d does not exist in endpoint engines" % (engine_key, pod_group_idx) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + + # Validate no duplicate assignment + if engine_key in assigned_engines: + msg = "Engine '%s' in pod group at index %d is already assigned to another pod group" % (engine_key, pod_group_idx) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + + assigned_engines.add(engine_key) + expanded_engines.append({"role": engine_entry["role"], "id": engine_id}) + + pod_group["expanded_engines"] = expanded_engines + + # Auto-generate pod name if not provided + if "name" not in pod_group: + while ("pod-%d" % auto_pod_id) in pod_names: + auto_pod_id += 1 + pod_group["name"] = "pod-%d" % auto_pod_id + pod_names.add(pod_group["name"]) + auto_pod_id += 1 + + # Create solo pod groups for any unassigned engines + for engine in all_engines: + engine_key = "%s-%d" % (engine["role"], engine["id"]) + if engine_key not in assigned_engines: + solo_name = engine_key + endpoint["pods"].append({ + "name": solo_name, + "engines": [{"role": engine["role"], "ids": [engine["id"]]}], + "expanded_engines": [{"role": engine["role"], "id": engine["id"]}], + "solo": True + }) + else: + # No pods config: create a solo pod group per engine (backward compatible) + endpoint["pods"] = [] + for engine in all_engines: + engine_key = "%s-%d" % (engine["role"], engine["id"]) + endpoint["pods"].append({ + "name": engine_key, + "engines": [{"role": engine["role"], "ids": [engine["id"]]}], + "expanded_engines": [{"role": engine["role"], "id": engine["id"]}], + "solo": True + }) + + # Validate pod-level setting consistency within each pod group + pod_level_keys = [ "cpu-partitioning", "nodeSelector", "hostNetwork", "runtimeClassName", "annotations" ] + for pod_group in endpoint["pods"]: + if len(pod_group["expanded_engines"]) <= 1: + continue + + first_engine = pod_group["expanded_engines"][0] + first_settings = endpoint["engines"]["settings"][first_engine["role"]][first_engine["id"]] + + for engine in pod_group["expanded_engines"][1:]: + engine_settings = endpoint["engines"]["settings"][engine["role"]][engine["id"]] + engine_key = "%s-%d" % (engine["role"], engine["id"]) + + for key in pod_level_keys: + first_val = first_settings.get(key) + engine_val = engine_settings.get(key) + if first_val != engine_val: + first_key = "%s-%d" % (first_engine["role"], first_engine["id"]) + msg = "Engines '%s' and '%s' in pod group '%s' have different values for pod-level setting '%s'" % (first_key, engine_key, pod_group["name"], key) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + + # Check securityContext.pod specifically + first_sc_pod = first_settings.get("securityContext", {}).get("pod") + engine_sc_pod = engine_settings.get("securityContext", {}).get("pod") + if first_sc_pod != engine_sc_pod: + first_key = "%s-%d" % (first_engine["role"], first_engine["id"]) + msg = "Engines '%s' and '%s' in pod group '%s' have different values for pod-level setting 'securityContext.pod'" % (first_key, engine_key, pod_group["name"]) + if args.validate: + endpoints.validate_error(msg) + else: + logger.error(msg) + return None + return endpoint def find_k8s_bin(validate, connection, remote_env): @@ -613,26 +746,30 @@ def get_k8s_config(): logger.error("Did not find any nodes") return 1 - settings["misc"]["k8s"]["nodes"]["endpoint"]["masters"] = [] - settings["misc"]["k8s"]["nodes"]["endpoint"]["workers"] = [] + masters = set() + workers = set() for node in settings["misc"]["k8s"]["nodes"]["cluster"]["items"]: name = node["metadata"]["name"] + labels = node["metadata"]["labels"] # OCP - if "node-role.kubernetes.io/worker" in node["metadata"]["labels"]: - settings["misc"]["k8s"]["nodes"]["endpoint"]["masters"].append(name) - if "node-role.kubernetes.io/master" in node["metadata"]["labels"]: - settings["misc"]["k8s"]["nodes"]["endpoint"]["workers"].append(name) + if "node-role.kubernetes.io/master" in labels: + masters.add(name) + if "node-role.kubernetes.io/worker" in labels: + workers.add(name) - # microk8s - if "node.kubernetes.io/microk8s-controlplane" in node["metadata"]["labels"]: - settings["misc"]["k8s"]["nodes"]["endpoint"]["masters"].append(name) - settings["misc"]["k8s"]["nodes"]["endpoint"]["workers"].append(name) + # microk8s - single-node, acts as both master and worker + if "node.kubernetes.io/microk8s-controlplane" in labels: + masters.add(name) + workers.add(name) # k3s - single-node like microk8s - if "node-role.kubernetes.io/control-plane" in node["metadata"]["labels"]: - settings["misc"]["k8s"]["nodes"]["endpoint"]["masters"].append(name) - settings["misc"]["k8s"]["nodes"]["endpoint"]["workers"].append(name) + if "node-role.kubernetes.io/control-plane" in labels: + masters.add(name) + workers.add(name) + + settings["misc"]["k8s"]["nodes"]["endpoint"]["masters"] = sorted(masters) + settings["misc"]["k8s"]["nodes"]["endpoint"]["workers"] = sorted(workers) node_count_fault = False logger.info("Found %d masters: %s" % (len(settings["misc"]["k8s"]["nodes"]["endpoint"]["masters"]), settings["misc"]["k8s"]["nodes"]["endpoint"]["masters"])) @@ -691,31 +828,38 @@ def compile_object_configs(): settings["engines"]["endpoint"]["classes"]["cpu-partitioning"] = dict() settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"] = [] settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"] = [] - for role in roles: - if role in endpoint["engines"]["settings"]: - csids = list(endpoint["engines"]["settings"][role].keys()) - csids.sort() - for csid in csids: - engine = { - "role": role, - "id": int(csid) - } + for pod_group in endpoint["pods"]: + # Use the first engine's settings to determine cpu-partitioning + # (already validated that all engines in the group agree) + first_engine = pod_group["expanded_engines"][0] + first_settings = endpoint["engines"]["settings"][first_engine["role"]][first_engine["id"]] + + pod_group_entry = { + "name": pod_group["name"], + "engines": pod_group["expanded_engines"], + "solo": pod_group.get("solo", False) + } - if endpoint["engines"]["settings"][role][csid]["cpu-partitioning"]: - settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"].append(engine) - else: - settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"].append(engine) + if first_settings["cpu-partitioning"]: + settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"].append(pod_group_entry) + else: + settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"].append(pod_group_entry) endpoints.log_settings(settings, mode = "engines") return 0 -def create_pod_crd(role = None, id = None, node = None, node_tools = None): +def create_pod_crd(role = None, id = None, node = None, node_tools = None, cs_engines = None, pod_name = None): """ Create a pod CRD Args: - None + role (str): The role for profiler pods ('master' or 'worker'), or None for CS pods using cs_engines + id (int): The pod ID for profiler pods, or None for CS pods using cs_engines + node (str): The node name for profiler pods (required when role is 'master' or 'worker') + node_tools (list): List of tools for profiler pods + cs_engines (list): List of engine dicts [{"role": ..., "id": ...}, ...] for CS pods + pod_name (str): The pod name (without prefix) for CS pods Globals: args (namespace): the script's CLI parameters @@ -727,28 +871,45 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): crd, 0: success crd, 1: failure """ - logger.info("Creating CRD for engine %s-%d" % (role, id)) + is_cs_pod = cs_engines is not None + is_profiler_pod = role is not None - if role is None or id is None: - logger.error("You must define role and id when calling this function") + if is_cs_pod: + logger.info("Creating CRD for CS pod '%s' with engines: %s" % (pod_name, ["%s-%d" % (e["role"], e["id"]) for e in cs_engines])) + else: + logger.info("Creating CRD for profiler engine %s-%d" % (role, id)) + + if not is_cs_pod and not is_profiler_pod: + logger.error("You must define either cs_engines or role/id when calling this function") return None, 1 - if role == "master" or role == "worker": - if node is None: - logger.error("You must define node when role is either 'master' or 'worker'") + if is_profiler_pod: + if id is None: + logger.error("You must define id when role is specified") return None, 1 + if role == "master" or role == "worker": + if node is None: + logger.error("You must define node when role is either 'master' or 'worker'") + return None, 1 + + if is_cs_pod: + name = pod_name + else: + name = "%s-%d" % (role, id) - name = "%s-%d" % (role, id) endpoint = settings["run-file"]["endpoints"][args.endpoint_index] + # For CS pods, use the first engine's settings for pod-level config + # (all engines in the group are validated to have matching pod-level settings) pod_settings = None - if role == "client" or role == "server": - pod_settings = endpoint["engines"]["settings"][role][id] + if is_cs_pod: + first_engine = cs_engines[0] + pod_settings = endpoint["engines"]["settings"][first_engine["role"]][first_engine["id"]] elif role == "worker" or role == "master": pod_settings = endpoint["engines"]["defaults"]["settings"] if pod_settings is None: logger.error("Could not find mapping for pod settings") - return None,1 + return None, 1 crd = { "apiVersion": "v1", @@ -762,7 +923,7 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): } } - if role == "client" or role == "server": + if is_cs_pod: crd["metadata"]["labels"] = { "app": crd["metadata"]["name"] } @@ -829,7 +990,7 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): has_hugepages = False - if role == "client" or role == "server": + if is_cs_pod: if "securityContext" in pod_settings and "pod" in pod_settings["securityContext"]: crd["spec"]["securityContext"] = copy.deepcopy(pod_settings["securityContext"]["pod"]) @@ -842,40 +1003,44 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): if "hostNetwork" in pod_settings: crd["spec"]["hostNetwork"] = pod_settings["hostNetwork"] - if "volumes" in pod_settings: - for volume in pod_settings["volumes"]: - new_volume = { - "name": volume["name"] - } - - # this loop should only execute once; the JSON schema - # defines the min and max properties as 1, but by - # writing it this way we don't have to handle the - # individual values (of which there are many) -- the - # loop is generic and should adapt to the different - # values automatically - for key in volume["volume"].keys(): - new_volume[key] = copy.deepcopy(volume["volume"][key]) - - crd["spec"]["volumes"].append(new_volume) - - if "resources" in pod_settings: - for key in pod_settings["resources"].keys(): - if "hugepages" in key: - has_hugepages = True - - crd["spec"]["volumes"].append({ - "name": "hugepage", - "emptyDir": { - "medium": "HugePages" + # Accumulate volumes from all engines in the pod, deduplicating by name + volume_names_added = set() + for engine in cs_engines: + engine_settings = endpoint["engines"]["settings"][engine["role"]][engine["id"]] + if "volumes" in engine_settings: + for volume in engine_settings["volumes"]: + if volume["name"] not in volume_names_added: + new_volume = { + "name": volume["name"] } - }) - break + for key in volume["volume"].keys(): + new_volume[key] = copy.deepcopy(volume["volume"][key]) + + crd["spec"]["volumes"].append(new_volume) + volume_names_added.add(volume["name"]) + + if "resources" in engine_settings and not has_hugepages: + for key in engine_settings["resources"].keys(): + if "hugepages" in key: + has_hugepages = True + + crd["spec"]["volumes"].append({ + "name": "hugepage", + "emptyDir": { + "medium": "HugePages" + } + }) + + break container_names = [] - if role == "client" or role == "server": - container_names.append(name) + container_engine_map = {} + if is_cs_pod: + for engine in cs_engines: + cn = "%s-%d" % (engine["role"], engine["id"]) + container_names.append(cn) + container_engine_map[cn] = engine if role == "worker" or role == "master": for tool in settings["engines"]["profiler-mapping"].keys(): if node_tools is not None and tool not in node_tools: @@ -885,14 +1050,15 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): engine_name = "%s-%s-%d" % ("profiler", mapping["label"], id) mapping["ids"].append(engine_name) container_names.append(engine_name) - pass crd["spec"]["containers"] = [] for container_name in container_names: logger.info("Adding container '%s' to pod" % (container_name)) image = None - if role == "client" or role == "server": - image = endpoints.get_engine_id_image(settings, role, id, pod_settings["userenv"]) + if is_cs_pod: + engine = container_engine_map[container_name] + engine_settings = endpoint["engines"]["settings"][engine["role"]][engine["id"]] + image = endpoints.get_engine_id_image(settings, engine["role"], engine["id"], engine_settings["userenv"]) elif role == "worker" or role == "master": userenv = endpoints.get_profiler_userenv(settings, container_name) if userenv is None: @@ -952,6 +1118,12 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): else: logger.info("Image pull secret '%s' already added to pod spec" % (secret_name)) + # For CS pods, resolve per-container settings from the engine's own config + if is_cs_pod: + container_settings = endpoint["engines"]["settings"][container_engine_map[container_name]["role"]][container_engine_map[container_name]["id"]] + else: + container_settings = pod_settings + container = { "name": container_name, "image": image, @@ -959,7 +1131,7 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): "env": [ { "name": "rickshaw_host", - "value": pod_settings["controller-ip-address"] + "value": container_settings["controller-ip-address"] }, { "name": "base_run_dir", @@ -979,7 +1151,7 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): }, { "name": "osruntime", - "value": pod_settings["osruntime"] + "value": container_settings["osruntime"] }, { "name": "roadblock_passwd", @@ -1004,9 +1176,9 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): ] } - if role == "client" or role == "server": + if is_cs_pod: cpu_partitioning = None - if pod_settings["cpu-partitioning"]: + if container_settings["cpu-partitioning"]: cpu_partitioning = 1 else: cpu_partitioning = 0 @@ -1044,12 +1216,12 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): ] ) - if role == "client" or role == "server": - if "securityContext" in pod_settings and "container" in pod_settings["securityContext"]: - container["securityContext"] = copy.deepcopy(pod_settings["securityContext"]["container"]) + if is_cs_pod: + if "securityContext" in container_settings and "container" in container_settings["securityContext"]: + container["securityContext"] = copy.deepcopy(container_settings["securityContext"]["container"]) - if "volumes" in pod_settings: - for volume in pod_settings["volumes"]: + if "volumes" in container_settings: + for volume in container_settings["volumes"]: new_volume_mount = { "name": volume["name"] } @@ -1065,8 +1237,8 @@ def create_pod_crd(role = None, id = None, node = None, node_tools = None): "name": "hugepage" }) - if "resources" in pod_settings: - container["resources"] = copy.deepcopy(pod_settings["resources"]) + if "resources" in container_settings: + container["resources"] = copy.deepcopy(container_settings["resources"]) crd["spec"]["containers"].append(container) @@ -1150,10 +1322,19 @@ def verify_pods_running(connection, pods, pod_details, abort_event): running_containers.append(container["name"]) pod_verifications = None - if pod_details[pod_name]["role"] in [ "client", "server" ]: - pod_verifications = endpoint["engines"]["verifications"][pod_details[pod_name]["role"]][pod_details[pod_name]["id"]] - elif pod_details[pod_name]["role"] in [ "worker", "master" ]: - pod_verifications = endpoint["engines"]["defaults"]["verifications"] + if "role" in pod_details[pod_name]: + # profiler pod format + if pod_details[pod_name]["role"] in [ "client", "server" ]: + pod_verifications = endpoint["engines"]["verifications"][pod_details[pod_name]["role"]][pod_details[pod_name]["id"]] + elif pod_details[pod_name]["role"] in [ "worker", "master" ]: + pod_verifications = endpoint["engines"]["defaults"]["verifications"] + elif "engines" in pod_details[pod_name]: + # CS pod group format: resolve verification from container name + fields = container["name"].split("-") + container_role = fields[0] + container_id = int(fields[1]) + if container_role in [ "client", "server" ]: + pod_verifications = endpoint["engines"]["verifications"][container_role][container_id] if pod_verifications is None: logger.error("Could not find mapping for pod verifications") @@ -1240,40 +1421,40 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): endpoint = settings["run-file"]["endpoints"][args.endpoint_index] - engines = None + pod_groups = None if cpu_partitioning is None: logger.error("You must define cpu_partitioning when calling this function") return 1 elif cpu_partitioning: - engines = settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"] + pod_groups = settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["with"] else: - engines = settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"] + pod_groups = settings["engines"]["endpoint"]["classes"]["cpu-partitioning"]["without"] - if len(engines) > 0: - logger.info("Going to create %d engines" % (len(engines))) + if len(pod_groups) > 0: + logger.info("Going to create %d pod groups" % (len(pod_groups))) else: - logger.info("No engines to create") + logger.info("No pod groups to create") return 0 - for engine in engines: - logger.info("Creating engine %s-%d" % (engine["role"], engine["id"])) + for pod_group in pod_groups: + logger.info("Creating pod group '%s' with engines: %s" % (pod_group["name"], ["%s-%d" % (e["role"], e["id"]) for e in pod_group["engines"]])) - engine["crd"], rc = create_pod_crd(engine["role"], engine["id"]) + pod_group["crd"], rc = create_pod_crd(cs_engines = pod_group["engines"], pod_name = pod_group["name"]) if rc == 1: - logger.error("Failed to create CRD for %s-%d" % (engine["role"], engine["id"])) - if crd is None: - logger.error("No CRD available for %s-%d" % (engine["role"], engine["id"])) + logger.error("Failed to create CRD for pod group '%s'" % (pod_group["name"])) + if pod_group["crd"] is None: + logger.error("No CRD available for pod group '%s'" % (pod_group["name"])) else: - logger.error("CRD generated for %s-%d is:\n%s" % (engine["role"], engine["id"], endpoints.dump_json(engine["crd"]))) + logger.error("CRD generated for pod group '%s' is:\n%s" % (pod_group["name"], endpoints.dump_json(pod_group["crd"]))) else: - crd_json_str = endpoints.dump_json(engine["crd"]) - - logger.info("Created CRD for %s-%d:\n%s" % (engine["role"], engine["id"], crd_json_str)) + crd_json_str = endpoints.dump_json(pod_group["crd"]) + + logger.info("Created CRD for pod group '%s':\n%s" % (pod_group["name"], crd_json_str)) - crd_filename = settings["dirs"]["local"]["crds"]["pods"] + "/%s-%s.json" % (engine["role"], engine["id"]) + crd_filename = settings["dirs"]["local"]["crds"]["pods"] + "/%s.json" % (pod_group["name"]) with open(crd_filename, "w", encoding = "ascii") as crd_fp: crd_fp.write(crd_json_str) - logger.info("Wrote CRD for %s-%s to %s" % (engine["role"], engine["id"], crd_filename)) + logger.info("Wrote CRD for pod group '%s' to %s" % (pod_group["name"], crd_filename)) if not "validation" in settings["engines"]["endpoint"]: settings["engines"]["endpoint"]["validation"] = { @@ -1318,24 +1499,24 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): logger.info("Validating CRDs") invalid_crds = [] valid_crds = [] - for engine in engines: - engine_name = "%s-%d" % (engine["role"], engine["id"]) - logger.info("Validating CRD for '%s'" % (engine_name)) + for pod_group in pod_groups: + pod_name = pod_group["name"] + logger.info("Validating CRD for pod '%s'" % (pod_name)) cmd = "%s create --filename - --dry-run=server --validate=strict" % (settings["misc"]["k8s-bin"]) - result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], stdin = endpoints.dump_json(engine["crd"]), env = settings["misc"]["remote-env"]) + result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], stdin = endpoints.dump_json(pod_group["crd"]), env = settings["misc"]["remote-env"]) endpoints.log_result(result) if result.exited != 0: - logger.error("Did not validate CRD for '%s'" % (engine_name)) - invalid_crds.append(engine_name) + logger.error("Did not validate CRD for pod '%s'" % (pod_name)) + invalid_crds.append(pod_name) else: - logger.info("Validated CRD for '%s'" % (engine_name)) - valid_crds.append(engine_name) + logger.info("Validated CRD for pod '%s'" % (pod_name)) + valid_crds.append(pod_name) settings["engines"]["endpoint"]["validation"]["valid"].extend(valid_crds) settings["engines"]["endpoint"]["validation"]["invalid"].extend(invalid_crds) if len(valid_crds) > 0: - logger.info("Validated the CRDs for these %d engines: %s" % (len(valid_crds), valid_crds)) + logger.info("Validated the CRDs for these %d pods: %s" % (len(valid_crds), valid_crds)) if len(invalid_crds) > 0: - logger.error("Failed to validate the CRDs for these %d engines: %s" % (len(invalid_crds), invalid_crds)) + logger.error("Failed to validate the CRDs for these %d pods: %s" % (len(invalid_crds), invalid_crds)) return 1 if "pull-secrets" in settings["misc"] and len(settings["misc"]["pull-secrets"]): @@ -1358,41 +1539,41 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): logger.info("Creating CRDs") failed_crds = [] created_crds = [] - engine_details = dict() - for engine in engines: - engine_name = "%s-%d" % (engine["role"], engine["id"]) - logger.info("Creating CRD for '%s'" % (engine_name)) + pod_details = dict() + for pod_group in pod_groups: + pod_name = pod_group["name"] + logger.info("Creating CRD for pod '%s'" % (pod_name)) cmd = "%s create --filename -" % (settings["misc"]["k8s-bin"]) - result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], stdin = endpoints.dump_json(engine["crd"]), env = settings["misc"]["remote-env"]) + result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], stdin = endpoints.dump_json(pod_group["crd"]), env = settings["misc"]["remote-env"]) endpoints.log_result(result) if result.exited != 0: - logger.error("Did not create CRD for '%s'" % (engine_name)) - failed_crds.append(engine_name) + logger.error("Did not create CRD for pod '%s'" % (pod_name)) + failed_crds.append(pod_name) else: - logger.info("Created CRD for '%s'" % (engine_name)) - created_crds.append(engine_name) - engine_details[engine_name] = { - "role": engine["role"], - "id": engine["id"] + logger.info("Created CRD for pod '%s'" % (pod_name)) + created_crds.append(pod_name) + pod_details[pod_name] = { + "engines": pod_group["engines"], + "solo": pod_group.get("solo", False) } settings["engines"]["endpoint"]["created"]["succeeded"].extend(created_crds) settings["engines"]["endpoint"]["created"]["failed"].extend(failed_crds) if len(created_crds) > 0: - logger.info("Created the CRDs for these %d engines: %s" % (len(created_crds), created_crds)) + logger.info("Created the CRDs for these %d pods: %s" % (len(created_crds), created_crds)) if len(failed_crds) > 0: - logger.error("Failed to create the CRDs for these %d engines: %s" % (len(failed_crds), failed_crds)) + logger.error("Failed to create the CRDs for these %d pods: %s" % (len(failed_crds), failed_crds)) return 2 if not "pods" in settings["engines"]["endpoint"]: settings["engines"]["endpoint"]["pods"] = dict() - pod_status = verify_pods_running(con, created_crds, engine_details, abort_event) + pod_status = verify_pods_running(con, created_crds, pod_details, abort_event) if pod_status is None: logger.error("Encountered fatal error while verifying pods") return 2 logger.info("Reviewing pods") for pod in pod_status.keys(): - logger.info("Pod '%s' is running on node '%s'" % (engine, pod_status[pod]["node"])) + logger.info("Pod '%s' is running on node '%s'" % (pod, pod_status[pod]["node"])) if pod_status[pod]["node"] not in settings["engines"]["endpoint"]["hosting-nodes"]: logger.info("Adding node '%s' to the list of hosting nodes" % (pod_status[pod]["node"])) @@ -1401,6 +1582,16 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): settings["engines"]["endpoint"]["verification"]["verified"].append(pod_status[pod]["name"]) settings["engines"]["endpoint"]["pods"][pod] = copy.deepcopy(pod_status[pod]) + + # For multi-engine pods, create engine-name aliases so + # downstream code can look up pod info by engine name + if pod in pod_details and "engines" in pod_details[pod] and not pod_details[pod].get("solo", False): + for engine in pod_details[pod]["engines"]: + engine_name = "%s-%d" % (engine["role"], engine["id"]) + if engine_name != pod: + logger.info("Creating engine-to-pod alias: '%s' -> pod '%s'" % (engine_name, pod)) + settings["engines"]["endpoint"]["pods"][engine_name] = settings["engines"]["endpoint"]["pods"][pod] + unverified = 0 for pod in created_crds: if pod not in settings["engines"]["endpoint"]["verification"]["verified"]: @@ -1412,7 +1603,7 @@ def create_cs_pods(cpu_partitioning = None, abort_event = None): return 2 logger.info("There are %d hosting nodes: %s" % (len(settings["engines"]["endpoint"]["hosting-nodes"]), settings["engines"]["endpoint"]["hosting-nodes"])) - + return 0 def determine_tools_for_node(start_tools, opt_in_tags, opt_out_tags): @@ -1801,17 +1992,21 @@ def kube_cleanup(): errors = True logger.info("Collecting engine logs") + processed_log_pods = set() pods = list(settings["engines"]["endpoint"]["pods"].keys()) pods.sort() for pod in pods: pod_name = settings["engines"]["endpoint"]["pods"][pod]["name"] + if pod_name in processed_log_pods: + continue + processed_log_pods.add(pod_name) node_name = settings["engines"]["endpoint"]["pods"][pod]["node"] logger.info("Processing pod '%s' on node '%s'" % (pod_name, node_name)) for engine in settings["engines"]["endpoint"]["pods"][pod]["containers"]: logger.info("Collecting log for engine '%s'" % (engine)) cmd = "%s logs %s-%s --namespace %s --container %s" % (settings["misc"]["k8s-bin"], endpoint_default_settings["prefix"]["pod"], - pod, + pod_name, endpoint["namespace"]["name"], engine) result = endpoints.run_remote(con, cmd, debug = settings["misc"]["debug-output"], env = settings["misc"]["remote-env"]) @@ -1862,10 +2057,15 @@ def engine_init(): """ logger.info("Building messages to send engine-specific metadata to the engines") env_vars_msgs = [] + processed_pods = set() pods = list(settings["engines"]["endpoint"]["pods"].keys()) pods.sort() for pod in pods: pod_name = settings["engines"]["endpoint"]["pods"][pod]["name"] + if pod_name in processed_pods: + logger.info("Skipping pod '%s' (alias for already processed pod '%s')" % (pod, pod_name)) + continue + processed_pods.add(pod_name) node_name = settings["engines"]["endpoint"]["pods"][pod]["node"] logger.info("Processing pod '%s' on node '%s'" % (pod_name, node_name)) for engine in settings["engines"]["endpoint"]["pods"][pod]["containers"]: diff --git a/schema/kube.json b/schema/kube.json index 10adc283..fda7be13 100644 --- a/schema/kube.json +++ b/schema/kube.json @@ -316,7 +316,55 @@ "targets", "settings" ] - } + } + }, + "pods": { + "description": "Optional grouping of engines into pods. Each entry defines one pod containing the listed engines as separate containers. If omitted, each engine gets its own pod. Engines not listed in any pod group automatically get their own solo pod.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "type": "object", + "properties": { + "name": { + "description": "Optional user-defined name for this pod. Must be unique across all pod groups. If omitted, a name is auto-generated.", + "type": "string", + "minLength": 1, + "pattern": "^[a-z0-9]([a-z0-9-]*[a-z0-9])?$" + }, + "engines": { + "description": "The list of engines to group into this pod. Each entry specifies a role and one or more engine IDs.", + "type": "array", + "minItems": 1, + "uniqueItems": true, + "items": { + "type": "object", + "properties": { + "role": { + "type": "string", + "enum": [ + "client", + "server" + ] + }, + "ids": { + "description": "This parameter defines which engines with the specified role belong to this pod. The 'number-lists' definition below explains what the available formats are.", + "$ref": "#/definitions/number-lists" + } + }, + "additionalProperties": false, + "required": [ + "role", + "ids" + ] + } + } + }, + "additionalProperties": false, + "required": [ + "engines" + ] + } } }, "additionalProperties": false,