diff --git a/experiments/.gitignore b/experiments/.gitignore new file mode 100644 index 00000000..5c71ff5f --- /dev/null +++ b/experiments/.gitignore @@ -0,0 +1 @@ +logs_p2p_debug/ diff --git a/experiments/10_p2p_nccl_debug.sh b/experiments/10_p2p_nccl_debug.sh new file mode 100755 index 00000000..2ab5c584 --- /dev/null +++ b/experiments/10_p2p_nccl_debug.sh @@ -0,0 +1,698 @@ +#!/usr/bin/env bash +# ============================================================================= +# Debug Script: P2P NCCL hang harness for kvcached + vLLM +# +# Purpose: +# Start a minimal 1-prefill / 1-decode vLLM PD-disagg setup with +# P2pNcclConnector, route requests through a tiny local proxy, and capture the +# kvcached P2P debug breadcrumbs from both sides. +# +# Usage: +# chmod +x experiments/10_p2p_nccl_debug.sh +# ./experiments/10_p2p_nccl_debug.sh +# +# Useful overrides: +# MODEL=Qwen/Qwen2.5-1.5B-Instruct ./experiments/10_p2p_nccl_debug.sh +# TIMEOUT_REQUEST=300 KVCACHED_P2P_WAIT_LOG_INTERVAL_S=5 ./experiments/10_p2p_nccl_debug.sh +# RUN_WITH_KVCACHED=0 ./experiments/10_p2p_nccl_debug.sh +# KEEP_ALIVE_ON_FAIL=1 ./experiments/10_p2p_nccl_debug.sh +# +# Logs are saved to experiments/logs_p2p_debug/<RUN_ID>/.
+# ============================================================================= + +set -euo pipefail + +MODEL="${MODEL:-Qwen/Qwen2.5-1.5B-Instruct}" +SERVED_MODEL_NAME="${SERVED_MODEL_NAME:-base_model}" +VLLM_VERSION="${VLLM_VERSION:-0.19.0}" +DTYPE="${DTYPE:-float16}" +MAX_MODEL_LEN="${MAX_MODEL_LEN:-1024}" +MAX_NUM_BATCHED_TOKENS="${MAX_NUM_BATCHED_TOKENS:-$MAX_MODEL_LEN}" +MAX_NUM_SEQS="${MAX_NUM_SEQS:-32}" +MAX_TOKENS="${MAX_TOKENS:-20}" +GPU_MEM_UTIL="${GPU_MEM_UTIL:-0.8}" +PREFILL_GPU_MEM_UTIL="${PREFILL_GPU_MEM_UTIL:-$GPU_MEM_UTIL}" +DECODE_GPU_MEM_UTIL="${DECODE_GPU_MEM_UTIL:-$GPU_MEM_UTIL}" + +PREFILL_GPU="${PREFILL_GPU:-0}" +DECODE_GPU="${DECODE_GPU:-1}" +PREFILL_PORT="${PREFILL_PORT:-20001}" +DECODE_PORT="${DECODE_PORT:-20002}" +PROXY_PORT="${PROXY_PORT:-10001}" +PROXY_REGISTER_PORT="${PROXY_REGISTER_PORT:-30001}" +PROXY_BIND_IP="${PROXY_BIND_IP:-0.0.0.0}" +PROXY_IP="${PROXY_IP:-127.0.0.1}" +PREFILL_KV_PORT="${PREFILL_KV_PORT:-21001}" +DECODE_KV_PORT="${DECODE_KV_PORT:-22001}" +PREFILL_KV_BUFFER_SIZE="${PREFILL_KV_BUFFER_SIZE:-1e1}" +DECODE_KV_BUFFER_SIZE="${DECODE_KV_BUFFER_SIZE:-8e9}" +P2P_SEND_TYPE="${P2P_SEND_TYPE:-PUT_ASYNC}" +P2P_NCCL_NUM_CHANNELS="${P2P_NCCL_NUM_CHANNELS:-8}" +P2P_MEM_POOL_SIZE_GB="${P2P_MEM_POOL_SIZE_GB:-32}" + +TIMEOUT_STARTUP="${TIMEOUT_STARTUP:-240}" +TIMEOUT_REQUEST="${TIMEOUT_REQUEST:-90}" +RUN_WITH_KVCACHED="${RUN_WITH_KVCACHED:-1}" +INSTALL_DEPS="${INSTALL_DEPS:-1}" +INSTALL_KVCACHED="${INSTALL_KVCACHED:-1}" +SKIP_MODEL_DOWNLOAD="${SKIP_MODEL_DOWNLOAD:-0}" +CLEAN_STALE="${CLEAN_STALE:-1}" +KEEP_ALIVE_ON_FAIL="${KEEP_ALIVE_ON_FAIL:-0}" +EXTRA_VLLM_ARGS="${EXTRA_VLLM_ARGS:-}" +LOG_ROOT="${LOG_ROOT:-experiments/logs_p2p_debug}" +RUN_ID="${RUN_ID:-$(date +%Y%m%d_%H%M%S)}" + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +RUN_DIR="$REPO_DIR/$LOG_ROOT/$RUN_ID" +LATEST_LINK="$REPO_DIR/$LOG_ROOT/latest" + +mkdir -p "$RUN_DIR" +ln -sfn "$RUN_DIR" "$LATEST_LINK" + +if [ "$RUN_WITH_KVCACHED" = "1" ]; then + export ENABLE_KVCACHED=true + export KVCACHED_AUTOPATCH=1 +else + export ENABLE_KVCACHED=false + export KVCACHED_AUTOPATCH=0 +fi +export KVCACHED_LOG_LEVEL="${KVCACHED_LOG_LEVEL:-INFO}" +export KVCACHED_P2P_WAIT_LOG_INTERVAL_S="${KVCACHED_P2P_WAIT_LOG_INTERVAL_S:-5}" +export KVCACHED_P2P_TRACE_TENSORS="${KVCACHED_P2P_TRACE_TENSORS:-true}" +export PYTHONUNBUFFERED=1 + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +declare -a CHILD_PIDS=() + +log_pass() { echo -e "${GREEN}[PASS]${NC} $1"; } +log_fail() { echo -e "${RED}[FAIL]${NC} $1"; } +log_info() { echo -e "${YELLOW}[INFO]${NC} $1"; } + +kill_port() { + local port=$1 + if command -v lsof >/dev/null 2>&1; then + lsof -ti "tcp:$port" | xargs -r kill 2>/dev/null || true + sleep 1 + lsof -ti "tcp:$port" | xargs -r kill -9 2>/dev/null || true + fi +} + +cleanup_stale() { + if [ "$CLEAN_STALE" != "1" ]; then + return 0 + fi + log_info "Cleaning stale local processes and ports for this harness..." + kill_port "$PREFILL_PORT" + kill_port "$DECODE_PORT" + kill_port "$PROXY_PORT" + kill_port "$PROXY_REGISTER_PORT" + kill_port "$PREFILL_KV_PORT" + kill_port "$DECODE_KV_PORT" + pkill -f "vllm serve" 2>/dev/null || true + pkill -f "VLLM::EngineCore" 2>/dev/null || true + sleep 2 +} + +summarize_debug_logs() { + log_info "========== P2P debug summary ==========" + for name in proxy prefill decode; do + local logfile="$RUN_DIR/$name.log" + if [ ! 
-f "$logfile" ]; then + continue + fi + echo "--- $name: $logfile ---" + grep -E \ + "kvcached p2p debug|P2pNcclEngine init|handle_request|REGISTER|request_id" \ + "$logfile" 2>/dev/null | tail -80 || true + grep -E \ + "Traceback|ERROR|WARNING|ncclCommInitRank" \ + "$logfile" 2>/dev/null | tail -80 || tail -80 "$logfile" || true + done + + local prefill_last decode_last + prefill_last=$(grep "kvcached p2p debug" "$RUN_DIR/prefill.log" 2>/dev/null | tail -1 || true) + decode_last=$(grep "kvcached p2p debug" "$RUN_DIR/decode.log" 2>/dev/null | tail -1 || true) + echo "--- last p2p event per side ---" + echo "prefill: ${prefill_last:-}" + echo "decode: ${decode_last:-}" + + if grep -q "recv_tensor still waiting" "$RUN_DIR/decode.log" 2>/dev/null; then + log_info "Classifier: decode reached recv_tensor and is waiting for a missing tensor_id." + fi + if grep -q "send_sync path" "$RUN_DIR/prefill.log" 2>/dev/null \ + && ! grep -q "nccl send begin" "$RUN_DIR/prefill.log" 2>/dev/null; then + log_info "Classifier: prefill entered send_sync but did not reach NCCL send." + fi + if grep -q "listener received" "$RUN_DIR/decode.log" 2>/dev/null; then + log_info "Classifier: decode listener received at least one ZMQ command." + fi +} + +cleanup() { + local status=${1:-0} + if [ "$status" != "0" ] && [ "$KEEP_ALIVE_ON_FAIL" = "1" ]; then + log_info "KEEP_ALIVE_ON_FAIL=1, leaving child processes running." + log_info "Logs: $RUN_DIR" + return 0 + fi + + log_info "Cleaning up child processes..." + for pid in "${CHILD_PIDS[@]}"; do + kill "$pid" 2>/dev/null || true + done + sleep 2 + for pid in "${CHILD_PIDS[@]}"; do + kill -9 "$pid" 2>/dev/null || true + done + wait 2>/dev/null || true + pkill -f "VLLM::EngineCore" 2>/dev/null || true +} + +on_exit() { + local status=$? 
+ if [ "$status" != "0" ]; then + log_fail "Harness failed with exit status $status" + summarize_debug_logs || true + fi + cleanup "$status" + exit "$status" +} +trap on_exit EXIT + +wait_for_server() { + local port=$1 + local name=$2 + local logfile=$3 + local elapsed=0 + + log_info "Waiting for $name on port $port (timeout ${TIMEOUT_STARTUP}s)..." + while [ "$elapsed" -lt "$TIMEOUT_STARTUP" ]; do + if curl -s "http://localhost:$port/health" >/dev/null 2>&1; then + log_pass "$name is ready (${elapsed}s)" + return 0 + fi + if [ -f "$logfile" ] \ + && grep -q "Traceback (most recent call last)" "$logfile" 2>/dev/null; then + log_fail "$name crashed during startup. See $logfile" + tail -40 "$logfile" + return 1 + fi + if [ $((elapsed % 30)) -eq 0 ] && [ "$elapsed" -gt 0 ]; then + local last_line + last_line=$(tail -1 "$logfile" 2>/dev/null | cut -c1-100) + log_info " ...${elapsed}s elapsed. Last log: ${last_line:-}" + fi + sleep 2 + elapsed=$((elapsed + 2)) + done + log_fail "$name did not start within ${TIMEOUT_STARTUP}s. See $logfile" + tail -40 "$logfile" + return 1 +} + +wait_for_registration() { + local elapsed=0 + log_info "Waiting for proxy to register one prefill and one decode..." + while [ "$elapsed" -lt "$TIMEOUT_STARTUP" ]; do + if curl -s "http://localhost:$PROXY_PORT/health" | python3 -c ' +import json, sys +try: + data = json.load(sys.stdin) +except Exception: + sys.exit(1) +sys.exit(0 if data.get("prefill_count", 0) >= 1 and data.get("decode_count", 0) >= 1 else 1) +'; then + log_pass "Proxy registered prefill and decode (${elapsed}s)" + return 0 + fi + sleep 2 + elapsed=$((elapsed + 2)) + done + log_fail "Proxy did not register both instances. See $RUN_DIR/proxy.log" + tail -80 "$RUN_DIR/proxy.log" || true + return 1 +} + +phase0_setup() { + log_info "========== PHASE 0: Environment setup ==========" + cleanup_stale + + if ! command -v nvidia-smi >/dev/null 2>&1; then + log_fail "nvidia-smi not found. Run this on a GPU node." 
+ return 1 + fi + local gpu_count + gpu_count=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) + if [ "$gpu_count" -lt 2 ]; then + log_fail "Need at least 2 GPUs, found $gpu_count" + return 1 + fi + log_pass "Found $gpu_count GPUs" + + if ! python3 -c "import vllm" 2>/dev/null; then + log_info "Installing vLLM==$VLLM_VERSION..." + pip install -q "vllm==$VLLM_VERSION" + fi + log_pass "vLLM $(python3 -c 'import vllm; print(vllm.__version__)')" + + if [ "$INSTALL_DEPS" = "1" ]; then + python3 - <<'PY' || pip install -q aiohttp msgpack pyzmq transformers huggingface_hub +import aiohttp # noqa: F401 +import msgpack # noqa: F401 +import zmq # noqa: F401 +PY + fi + + if [ "$SKIP_MODEL_DOWNLOAD" != "1" ]; then + if ! python3 -c \ + "from transformers import AutoConfig; AutoConfig.from_pretrained('$MODEL')" \ + 2>/dev/null; then + log_info "Downloading model $MODEL..." + HF_HUB_ENABLE_HF_TRANSFER=0 huggingface-cli download "$MODEL" --quiet + fi + log_pass "Model available" + fi + + if [ "$RUN_WITH_KVCACHED" = "1" ] && [ "$INSTALL_KVCACHED" = "1" ]; then + log_info "Installing kvcached from this checkout with the normal install path..." 
+ pip uninstall -y kvcached >/dev/null 2>&1 || true + CUDA_HOME="${CUDA_HOME:-/usr/local/cuda}" \ + pip install -q "$REPO_DIR" --no-build-isolation + log_pass "kvcached installed from $REPO_DIR" + fi + + if [ "$RUN_WITH_KVCACHED" = "1" ]; then + python3 - <<'PY' +import inspect +from pathlib import Path +import site + +import kvcached.integration.vllm.autopatch as autopatch + +source = inspect.getsource(autopatch._patch_p2p_nccl_debug) +assert "P2pNcclConnector" in source +assert "P2pNcclEngine" in source +print(f"kvcached autopatch source: {autopatch.__file__}") + +site_dirs = site.getsitepackages() +user_site = site.getusersitepackages() +if user_site: + site_dirs.append(user_site) +pth_files = [ + Path(site_dir) / "kvcached_autopatch.pth" + for site_dir in site_dirs + if (Path(site_dir) / "kvcached_autopatch.pth").exists() +] +assert pth_files, "kvcached_autopatch.pth was not installed into site-packages" +print(f"kvcached autopatch pth: {pth_files[0]}") +PY + log_pass "P2P debug autopatch source and .pth hook are present" + else + log_info "RUN_WITH_KVCACHED=0: running plain vLLM P2P with kvcached disabled." + log_info "Existing kvcached installs are ignored because ENABLE_KVCACHED=false." 
+ fi +} + +write_proxy() { + cat > "$RUN_DIR/p2p_debug_proxy.py" <<'PY' +import os +import threading +import time +import uuid + +import aiohttp +import msgpack +import zmq +from aiohttp import web + +prefill_instances = {} +decode_instances = {} +instances_lock = threading.Lock() +request_count = 0 +PING_TTL_SECONDS = 8 + + +def prune_locked(now=None): + now = now or time.time() + for instances in (prefill_instances, decode_instances): + for key, value in list(instances.items()): + if value["expires_at"] <= now: + print(f"EXPIRE HTTP:{key} ZMQ:{value['zmq_address']}", flush=True) + instances.pop(key, None) + + +def listen_for_register(bind_ip, register_port): + context = zmq.Context() + router_socket = context.socket(zmq.ROUTER) + router_socket.bind(f"tcp://{bind_ip}:{register_port}") + poller = zmq.Poller() + poller.register(router_socket, zmq.POLLIN) + print(f"REGISTER listener tcp://{bind_ip}:{register_port}", flush=True) + + while True: + socks = dict(poller.poll(1000)) + if router_socket not in socks: + continue + remote_address, message = router_socket.recv_multipart() + data = msgpack.loads(message) + role = data.get("type") + http_address = data.get("http_address") + zmq_address = data.get("zmq_address") + if role not in ("P", "D") or not http_address or not zmq_address: + print(f"REGISTER unexpected remote={remote_address!r} data={data}", flush=True) + continue + + with instances_lock: + target = prefill_instances if role == "P" else decode_instances + is_new = http_address not in target + target[http_address] = { + "zmq_address": zmq_address, + "expires_at": time.time() + PING_TTL_SECONDS, + } + prune_locked() + if is_new: + print( + f"REGISTER role={role} HTTP:{http_address} ZMQ:{zmq_address}", + flush=True, + ) + + +def choose_pair(): + global request_count + with instances_lock: + prune_locked() + prefill_list = list(prefill_instances.items()) + decode_list = list(decode_instances.items()) + if not prefill_list or not decode_list: + raise 
web.HTTPServiceUnavailable( + text="proxy has not registered both prefill and decode yet" + ) + prefill_http, prefill_data = prefill_list[request_count % len(prefill_list)] + decode_http, decode_data = decode_list[request_count % len(decode_list)] + request_count += 1 + return ( + prefill_http, + prefill_data["zmq_address"], + decode_http, + decode_data["zmq_address"], + request_count - 1, + ) + + +async def health(_request): + with instances_lock: + prune_locked() + return web.json_response( + { + "prefill_count": len(prefill_instances), + "decode_count": len(decode_instances), + "prefill": prefill_instances, + "decode": decode_instances, + } + ) + + +async def forward(session, url, payload, request_id): + headers = {"X-Request-Id": request_id} + api_key = os.environ.get("OPENAI_API_KEY") + if api_key: + headers["Authorization"] = f"Bearer {api_key}" + async with session.post(url=url, json=payload, headers=headers) as response: + body = await response.read() + content_type = response.headers.get("Content-Type", "application/json") + return response.status, content_type, body + + +async def handle_request(request): + original = await request.json() + prefill = dict(original) + prefill["max_tokens"] = 1 + if "max_completion_tokens" in prefill: + prefill["max_completion_tokens"] = 1 + prefill["stream"] = False + + ( + prefill_http, + prefill_zmq, + decode_http, + decode_zmq, + count, + ) = choose_pair() + request_id = ( + f"___prefill_addr_{prefill_zmq}___decode_addr_{decode_zmq}_" + f"{uuid.uuid4().hex}" + ) + print( + f"handle_request count={count} request_id={request_id} " + f"prefill_http={prefill_http} decode_http={decode_http}", + flush=True, + ) + + timeout = aiohttp.ClientTimeout(total=6 * 60 * 60) + async with aiohttp.ClientSession(timeout=timeout) as session: + prefill_status, _prefill_type, prefill_body = await forward( + session, + f"http://{prefill_http}{request.path}", + prefill, + request_id, + ) + print( + f"prefill_done request_id={request_id} 
status={prefill_status} " + f"bytes={len(prefill_body)}", + flush=True, + ) + if prefill_status >= 400: + return web.Response( + body=prefill_body, + status=prefill_status, + content_type="application/json", + ) + + decode_status, decode_type, decode_body = await forward( + session, + f"http://{decode_http}{request.path}", + original, + request_id, + ) + print( + f"decode_done request_id={request_id} status={decode_status} " + f"bytes={len(decode_body)}", + flush=True, + ) + return web.Response( + body=decode_body, + status=decode_status, + content_type=decode_type.split(";")[0], + ) + + +def main(): + bind_ip = os.environ.get("PROXY_BIND_IP", "0.0.0.0") + register_port = int(os.environ.get("PROXY_REGISTER_PORT", "30001")) + http_port = int(os.environ.get("PROXY_PORT", "10001")) + listener = threading.Thread( + target=listen_for_register, + args=(bind_ip, register_port), + daemon=True, + ) + listener.start() + + app = web.Application() + app.router.add_get("/health", health) + app.router.add_post("/v1/completions", handle_request) + app.router.add_post("/v1/chat/completions", handle_request) + web.run_app(app, host=bind_ip, port=http_port) + + +if __name__ == "__main__": + main() +PY +} + +make_kv_config() { + local role=$1 + local kv_port=$2 + local http_port=$3 + local buffer_size=$4 + python3 - "$role" "$kv_port" "$http_port" "$buffer_size" <<'PY' +import json +import os +import sys + +role, kv_port, http_port, buffer_size = sys.argv[1:5] +config = { + "kv_connector": "P2pNcclConnector", + "kv_role": role, + "kv_buffer_size": buffer_size, + "kv_port": str(kv_port), + "kv_connector_extra_config": { + "proxy_ip": os.environ["PROXY_IP"], + "proxy_port": os.environ["PROXY_REGISTER_PORT"], + "http_port": int(http_port), + "send_type": os.environ["P2P_SEND_TYPE"], + "nccl_num_channels": os.environ["P2P_NCCL_NUM_CHANNELS"], + "mem_pool_size_gb": os.environ["P2P_MEM_POOL_SIZE_GB"], + }, +} +print(json.dumps(config, separators=(",", ":"))) +PY +} + +start_proxy() { + 
write_proxy + log_info "Starting P2P proxy on http://localhost:$PROXY_PORT" + PROXY_BIND_IP="$PROXY_BIND_IP" \ + PROXY_REGISTER_PORT="$PROXY_REGISTER_PORT" \ + PROXY_PORT="$PROXY_PORT" \ + python3 "$RUN_DIR/p2p_debug_proxy.py" > "$RUN_DIR/proxy.log" 2>&1 & + CHILD_PIDS+=("$!") + wait_for_server "$PROXY_PORT" "Proxy" "$RUN_DIR/proxy.log" +} + +start_vllm_instance() { + local name=$1 + local gpu=$2 + local http_port=$3 + local kv_port=$4 + local role=$5 + local kv_buffer_size=$6 + local gpu_mem_util=$7 + local logfile="$RUN_DIR/$name.log" + local kv_config + kv_config=$(make_kv_config "$role" "$kv_port" "$http_port" "$kv_buffer_size") + + log_info "Starting $name on GPU $gpu, HTTP $http_port, KV $kv_port" + echo "$kv_config" > "$RUN_DIR/$name.kv_transfer_config.json" + ( + export CUDA_VISIBLE_DEVICES="$gpu" + export ENABLE_KVCACHED + export KVCACHED_AUTOPATCH + export KVCACHED_LOG_LEVEL + export KVCACHED_P2P_WAIT_LOG_INTERVAL_S + export KVCACHED_P2P_TRACE_TENSORS + exec vllm serve "$MODEL" \ + --host 0.0.0.0 \ + --port "$http_port" \ + --tensor-parallel-size 1 \ + --seed 1024 \ + --served-model-name "$SERVED_MODEL_NAME" \ + --dtype "$DTYPE" \ + --max-model-len "$MAX_MODEL_LEN" \ + --max-num-batched-tokens "$MAX_NUM_BATCHED_TOKENS" \ + --max-num-seqs "$MAX_NUM_SEQS" \ + --trust-remote-code \ + --gpu-memory-utilization "$gpu_mem_util" \ + --kv-transfer-config "$kv_config" \ + $EXTRA_VLLM_ARGS + ) > "$logfile" 2>&1 & + CHILD_PIDS+=("$!") +} + +make_payload() { + local prompt=$1 + python3 - "$SERVED_MODEL_NAME" "$prompt" "$MAX_TOKENS" <<'PY' +import json +import sys + +model, prompt, max_tokens = sys.argv[1], sys.argv[2], int(sys.argv[3]) +print(json.dumps({ + "model": model, + "prompt": prompt, + "max_tokens": max_tokens, + "temperature": 0, + "stream": False, +})) +PY +} + +send_request() { + local prompt=$1 + local label=$2 + local payload_file="$RUN_DIR/${label}_request.json" + local response_file="$RUN_DIR/${label}_response.json" + local 
status_file="$RUN_DIR/${label}_curl_status.txt" + + make_payload "$prompt" > "$payload_file" + log_info "Sending $label through proxy: $prompt" + + set +e + local http_code + http_code=$(curl -sS \ + --max-time "$TIMEOUT_REQUEST" \ + -o "$response_file" \ + -w "%{http_code}" \ + "http://localhost:$PROXY_PORT/v1/completions" \ + -H "Content-Type: application/json" \ + --data-binary "@$payload_file" 2>"$status_file") + local curl_status=$? + set -e + + if [ "$curl_status" -ne 0 ]; then + log_fail "$label failed or timed out after ${TIMEOUT_REQUEST}s" + cat "$status_file" || true + return 1 + fi + if [ "$http_code" != "200" ]; then + log_fail "$label returned HTTP $http_code" + cat "$response_file" || true + return 1 + fi + + python3 - "$response_file" <<'PY' +import json +import sys + +with open(sys.argv[1]) as f: + data = json.load(f) +text = data["choices"][0]["text"] +if not text: + raise SystemExit("empty completion text") +print("Completion:", text[:160].replace("\n", "\\n")) +PY + log_pass "$label completed" +} + +run_harness() { + log_info "Logs: $RUN_DIR" + if [ "$RUN_WITH_KVCACHED" = "1" ]; then + log_info "Mode: vLLM P2P with kvcached enabled and debug instrumentation" + else + log_info "Mode: plain vLLM P2P baseline, kvcached disabled" + fi + log_info "========== PHASE 1: Start proxy ==========" + start_proxy + + log_info "========== PHASE 2: Start P2P vLLM instances ==========" + start_vllm_instance \ + "prefill" "$PREFILL_GPU" "$PREFILL_PORT" "$PREFILL_KV_PORT" \ + "kv_producer" "$PREFILL_KV_BUFFER_SIZE" "$PREFILL_GPU_MEM_UTIL" + start_vllm_instance \ + "decode" "$DECODE_GPU" "$DECODE_PORT" "$DECODE_KV_PORT" \ + "kv_consumer" "$DECODE_KV_BUFFER_SIZE" "$DECODE_GPU_MEM_UTIL" + + wait_for_server "$PREFILL_PORT" "Prefill" "$RUN_DIR/prefill.log" + wait_for_server "$DECODE_PORT" "Decode" "$RUN_DIR/decode.log" + wait_for_registration + + log_info "========== PHASE 3: Send requests ==========" + send_request "The capital of France is" "request_1" + send_request 
"Two plus two equals" "request_2" + + summarize_debug_logs + log_pass "P2P debug harness completed" + log_info "Logs: $RUN_DIR" +} + +export PROXY_IP +export PROXY_REGISTER_PORT +export P2P_SEND_TYPE +export P2P_NCCL_NUM_CHANNELS +export P2P_MEM_POOL_SIZE_GB + +phase0_setup +run_harness diff --git a/experiments/11_vllm_p2p_nccl_direct.sh b/experiments/11_vllm_p2p_nccl_direct.sh new file mode 100755 index 00000000..52fa2a07 --- /dev/null +++ b/experiments/11_vllm_p2p_nccl_direct.sh @@ -0,0 +1,532 @@ +#!/usr/bin/env bash +# ============================================================================= +# Direct upstream vLLM P2P NCCL experiment +# +# Purpose: +# Validate P2pNcclConnector using the upstream vLLM example proxy and direct +# vllm serve processes, with kvcached disabled. This intentionally does not +# use the kvcached debug proxy from 10_p2p_nccl_debug.sh. +# +# Usage: +# ./experiments/11_vllm_p2p_nccl_direct.sh +# +# Useful overrides: +# MODEL=Qwen/Qwen2.5-1.5B-Instruct ./experiments/11_vllm_p2p_nccl_direct.sh +# TIMEOUT_REQUEST=300 ./experiments/11_vllm_p2p_nccl_direct.sh +# VLLM_DISABLE_REQUEST_ID_RANDOMIZATION=1 ./experiments/11_vllm_p2p_nccl_direct.sh +# DISABLE_REQUEST_ID_RANDOMIZATION=1 ./experiments/11_vllm_p2p_nccl_direct.sh +# KEEP_ALIVE_ON_FAIL=1 ./experiments/11_vllm_p2p_nccl_direct.sh +# +# Logs are saved to experiments/logs_vllm_p2p_direct/<RUN_ID>/.
+# ============================================================================= + +set -euo pipefail + +MODEL="${MODEL:-Qwen/Qwen2.5-1.5B-Instruct}" +VLLM_VERSION="${VLLM_VERSION:-0.19.0}" +VLLM_REPO_URL="${VLLM_REPO_URL:-https://github.com/vllm-project/vllm.git}" +VLLM_REPO_DIR="${VLLM_REPO_DIR:-$HOME/vllm-upstream}" +VLLM_REF="${VLLM_REF:-main}" +SYNC_VLLM_REPO="${SYNC_VLLM_REPO:-1}" + +DTYPE="${DTYPE:-float16}" +MAX_MODEL_LEN="${MAX_MODEL_LEN:-1024}" +MAX_NUM_BATCHED_TOKENS="${MAX_NUM_BATCHED_TOKENS:-$MAX_MODEL_LEN}" +MAX_NUM_SEQS="${MAX_NUM_SEQS:-32}" +MAX_TOKENS="${MAX_TOKENS:-20}" +PREFILL_GPU="${PREFILL_GPU:-0}" +DECODE_GPU="${DECODE_GPU:-1}" +PREFILL_HTTP_PORT="${PREFILL_HTTP_PORT:-20003}" +DECODE_HTTP_PORT="${DECODE_HTTP_PORT:-20005}" +PREFILL_KV_PORT="${PREFILL_KV_PORT:-21001}" +DECODE_KV_PORT="${DECODE_KV_PORT:-22001}" +PROXY_HTTP_PORT="${PROXY_HTTP_PORT:-10001}" +PROXY_REGISTER_PORT="${PROXY_REGISTER_PORT:-30001}" +PROXY_CONNECT_IP="${PROXY_CONNECT_IP:-0.0.0.0}" +P2P_SEND_TYPE="${P2P_SEND_TYPE:-PUT_ASYNC}" +P2P_NCCL_NUM_CHANNELS="${P2P_NCCL_NUM_CHANNELS:-16}" +PREFILL_KV_BUFFER_SIZE="${PREFILL_KV_BUFFER_SIZE:-1e1}" +DECODE_KV_BUFFER_SIZE="${DECODE_KV_BUFFER_SIZE:-8e9}" +PREFILL_GPU_MEM_UTIL="${PREFILL_GPU_MEM_UTIL:-0.8}" +DECODE_GPU_MEM_UTIL="${DECODE_GPU_MEM_UTIL:-0.7}" +EXTRA_VLLM_ARGS="${EXTRA_VLLM_ARGS:-}" + +INSTALL_DEPS="${INSTALL_DEPS:-1}" +INSTALL_VLLM="${INSTALL_VLLM:-1}" +SKIP_MODEL_DOWNLOAD="${SKIP_MODEL_DOWNLOAD:-0}" +CLEAN_STALE="${CLEAN_STALE:-1}" +KEEP_ALIVE_ON_FAIL="${KEEP_ALIVE_ON_FAIL:-0}" +DISABLE_REQUEST_ID_RANDOMIZATION="${DISABLE_REQUEST_ID_RANDOMIZATION:-0}" +TIMEOUT_STARTUP="${TIMEOUT_STARTUP:-300}" +TIMEOUT_REQUEST="${TIMEOUT_REQUEST:-300}" +LOG_ROOT="${LOG_ROOT:-experiments/logs_vllm_p2p_direct}" +RUN_ID="${RUN_ID:-$(date +%Y%m%d_%H%M%S)}" + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_DIR="$(cd "$SCRIPT_DIR/.." 
&& pwd)" +RUN_DIR="$REPO_DIR/$LOG_ROOT/$RUN_ID" +LATEST_LINK="$REPO_DIR/$LOG_ROOT/latest" + +mkdir -p "$RUN_DIR" +ln -sfn "$RUN_DIR" "$LATEST_LINK" + +export ENABLE_KVCACHED=false +export KVCACHED_AUTOPATCH=0 +export PYTHONUNBUFFERED=1 +export OPENAI_API_KEY="${OPENAI_API_KEY:-EMPTY}" +if [ "$DISABLE_REQUEST_ID_RANDOMIZATION" = "1" ]; then + export VLLM_DISABLE_REQUEST_ID_RANDOMIZATION=1 +fi + +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' + +declare -a CHILD_PIDS=() + +log_pass() { echo -e "${GREEN}[PASS]${NC} $1"; } +log_fail() { echo -e "${RED}[FAIL]${NC} $1"; } +log_info() { echo -e "${YELLOW}[INFO]${NC} $1"; } + +kill_port() { + local port=$1 + if command -v lsof >/dev/null 2>&1; then + lsof -ti "tcp:$port" | xargs -r kill 2>/dev/null || true + sleep 1 + lsof -ti "tcp:$port" | xargs -r kill -9 2>/dev/null || true + fi +} + +cleanup_stale() { + if [ "$CLEAN_STALE" != "1" ]; then + return 0 + fi + + log_info "Cleaning stale local processes and ports for direct vLLM run..." + kill_port "$PROXY_HTTP_PORT" + kill_port "$PROXY_REGISTER_PORT" + kill_port "$PREFILL_HTTP_PORT" + kill_port "$DECODE_HTTP_PORT" + kill_port "$PREFILL_KV_PORT" + kill_port "$DECODE_KV_PORT" + pkill -f "disagg_proxy_p2p_nccl_xpyd.py" 2>/dev/null || true + pkill -f "vllm serve" 2>/dev/null || true + pkill -f "VLLM::EngineCore" 2>/dev/null || true + sleep 2 +} + +summarize_logs() { + log_info "========== direct vLLM P2P summary ==========" + for name in proxy prefill decode; do + local logfile="$RUN_DIR/$name.log" + if [ ! -f "$logfile" ]; then + continue + fi + echo "--- $name: $logfile ---" + grep -E \ + "Add \\[HTTP|handle_request|request_id|P2pNcclEngine|ncclCommInitRank|P2pNccl|PUT|GET|ERROR|Error|Traceback|WARNING" \ + "$logfile" 2>/dev/null | tail -120 || tail -80 "$logfile" || true + done +} + +cleanup() { + local status=${1:-0} + if [ "$status" != "0" ] && [ "$KEEP_ALIVE_ON_FAIL" = "1" ]; then + log_info "KEEP_ALIVE_ON_FAIL=1, leaving child processes running." 
+ log_info "Logs: $RUN_DIR" + return 0 + fi + + log_info "Cleaning up child processes..." + for pid in "${CHILD_PIDS[@]}"; do + kill "$pid" 2>/dev/null || true + done + sleep 2 + for pid in "${CHILD_PIDS[@]}"; do + kill -9 "$pid" 2>/dev/null || true + done + wait 2>/dev/null || true + pkill -f "disagg_proxy_p2p_nccl_xpyd.py" 2>/dev/null || true + pkill -f "vllm serve" 2>/dev/null || true + pkill -f "VLLM::EngineCore" 2>/dev/null || true +} + +on_exit() { + local status=$? + if [ "$status" != "0" ]; then + log_fail "Direct vLLM experiment failed with exit status $status" + summarize_logs || true + fi + cleanup "$status" + exit "$status" +} +trap on_exit EXIT + +wait_for_http() { + local port=$1 + local name=$2 + local logfile=$3 + local path=${4:-/health} + local elapsed=0 + + log_info "Waiting for $name on port $port (timeout ${TIMEOUT_STARTUP}s)..." + while [ "$elapsed" -lt "$TIMEOUT_STARTUP" ]; do + if curl -s "http://localhost:$port$path" >/dev/null 2>&1; then + log_pass "$name is ready (${elapsed}s)" + return 0 + fi + if [ -f "$logfile" ] \ + && grep -q "Traceback (most recent call last)" "$logfile" 2>/dev/null; then + log_fail "$name crashed during startup. See $logfile" + tail -60 "$logfile" + return 1 + fi + if [ $((elapsed % 30)) -eq 0 ] && [ "$elapsed" -gt 0 ]; then + local last_line + last_line=$(tail -1 "$logfile" 2>/dev/null | cut -c1-120) + log_info " ...${elapsed}s elapsed. Last log: ${last_line:-}" + fi + sleep 2 + elapsed=$((elapsed + 2)) + done + log_fail "$name did not start within ${TIMEOUT_STARTUP}s. See $logfile" + tail -60 "$logfile" || true + return 1 +} + +wait_for_proxy_port() { + local elapsed=0 + local logfile="$RUN_DIR/proxy.log" + + log_info "Waiting for upstream proxy HTTP port $PROXY_HTTP_PORT..." 
+ while [ "$elapsed" -lt "$TIMEOUT_STARTUP" ]; do + if python3 - "$PROXY_HTTP_PORT" <<'PY' >/dev/null 2>&1 +import socket +import sys + +with socket.create_connection(("127.0.0.1", int(sys.argv[1])), timeout=1): + pass +PY + then + log_pass "Upstream proxy HTTP port is open (${elapsed}s)" + return 0 + fi + if [ -f "$logfile" ] \ + && grep -q "Traceback (most recent call last)" "$logfile" 2>/dev/null; then + log_fail "Proxy crashed during startup. See $logfile" + tail -60 "$logfile" + return 1 + fi + sleep 2 + elapsed=$((elapsed + 2)) + done + log_fail "Proxy did not open port $PROXY_HTTP_PORT. See $logfile" + tail -60 "$logfile" || true + return 1 +} + +wait_for_registration() { + local elapsed=0 + local logfile="$RUN_DIR/proxy.log" + + log_info "Waiting for upstream proxy to log prefill/decode registration..." + while [ "$elapsed" -lt "$TIMEOUT_STARTUP" ]; do + if grep -q "HTTP:.*:${PREFILL_HTTP_PORT}.*ZMQ" "$logfile" 2>/dev/null \ + && grep -q "HTTP:.*:${DECODE_HTTP_PORT}.*ZMQ" "$logfile" 2>/dev/null; then + log_pass "Proxy registered prefill and decode (${elapsed}s)" + return 0 + fi + sleep 2 + elapsed=$((elapsed + 2)) + done + + log_fail "Proxy did not log both registrations. See $logfile" + tail -80 "$logfile" || true + return 1 +} + +phase0_setup() { + log_info "========== PHASE 0: Environment setup ==========" + cleanup_stale + + if ! command -v nvidia-smi >/dev/null 2>&1; then + log_fail "nvidia-smi not found. Run this on a GPU node." + return 1 + fi + local gpu_count + gpu_count=$(nvidia-smi --query-gpu=name --format=csv,noheader | wc -l) + if [ "$gpu_count" -lt 2 ]; then + log_fail "Need at least 2 GPUs, found $gpu_count" + return 1 + fi + log_pass "Found $gpu_count GPUs" + + if ! command -v git >/dev/null 2>&1; then + log_fail "git is required to fetch upstream vLLM examples" + return 1 + fi + + if ! 
python3 -c "import vllm" 2>/dev/null; then
        if [ "$INSTALL_VLLM" != "1" ]; then
            log_fail "vLLM is not installed and INSTALL_VLLM=0"
            return 1
        fi
        log_info "Installing vLLM==$VLLM_VERSION..."
        pip install -q "vllm==$VLLM_VERSION"
    fi
    log_pass "vLLM $(python3 -c 'import vllm; print(vllm.__version__)')"

    # Probe for the example's runtime deps by importing them; install only on failure.
    if [ "$INSTALL_DEPS" = "1" ]; then
        if ! python3 - <<'PY'
import aiohttp  # noqa: F401
import huggingface_hub  # noqa: F401
import msgpack  # noqa: F401
import quart  # noqa: F401
import transformers  # noqa: F401
import zmq  # noqa: F401
PY
        then
            log_info "Installing direct vLLM example dependencies..."
            # NOTE(review): blinker is force-reinstalled first, presumably to
            # displace a distro-owned copy that pip cannot upgrade in place for
            # quart — confirm if this is still needed.
            python3 -m pip install -q --ignore-installed blinker
            python3 -m pip install -q \
                quart aiohttp msgpack pyzmq transformers huggingface_hub
        fi
    fi

    # Warm the HF cache up front so server startup does not race a download.
    if [ "$SKIP_MODEL_DOWNLOAD" != "1" ]; then
        if ! python3 -c \
            "from transformers import AutoConfig; AutoConfig.from_pretrained('$MODEL')" \
            2>/dev/null; then
            log_info "Downloading model $MODEL..."
            HF_HUB_ENABLE_HF_TRANSFER=0 huggingface-cli download "$MODEL" --quiet
        fi
        log_pass "Model available"
    fi

    log_info "kvcached disabled: ENABLE_KVCACHED=$ENABLE_KVCACHED KVCACHED_AUTOPATCH=$KVCACHED_AUTOPATCH"
    log_info "VLLM_DISABLE_REQUEST_ID_RANDOMIZATION=${VLLM_DISABLE_REQUEST_ID_RANDOMIZATION:-0}"

    # The upstream example proxy hardcodes its ports; refuse overrides early
    # instead of failing later with confusing connection errors.
    if [ "$PROXY_HTTP_PORT" != "10001" ] || [ "$PROXY_REGISTER_PORT" != "30001" ]; then
        log_fail "The upstream proxy hardcodes HTTP 10001 and register 30001."
        log_fail "Leave PROXY_HTTP_PORT=10001 and PROXY_REGISTER_PORT=30001 for this direct run."
        return 1
    fi
}

# Clone (or optionally update) the upstream vLLM checkout and locate the P2P
# NCCL example proxy script. On success sets EXAMPLE_DIR and PROXY_SCRIPT and
# records the checkout commit under $RUN_DIR for reproducibility.
sync_upstream_vllm() {
    log_info "========== PHASE 1: Fetch upstream vLLM example =========="
    if [ ! -d "$VLLM_REPO_DIR/.git" ]; then
        log_info "Cloning $VLLM_REPO_URL ($VLLM_REF) into $VLLM_REPO_DIR..."
        git clone --depth 1 --branch "$VLLM_REF" "$VLLM_REPO_URL" "$VLLM_REPO_DIR"
    elif [ "$SYNC_VLLM_REPO" = "1" ]; then
        log_info "Updating existing upstream vLLM checkout at $VLLM_REPO_DIR..."
        # Shallow fetch first; fall back to a full fetch for refs that a
        # depth-1 fetch cannot resolve.
        git -C "$VLLM_REPO_DIR" fetch --depth 1 origin "$VLLM_REF" \
            || git -C "$VLLM_REPO_DIR" fetch origin "$VLLM_REF"
        git -C "$VLLM_REPO_DIR" checkout -q FETCH_HEAD \
            || git -C "$VLLM_REPO_DIR" checkout -q "$VLLM_REF"
    else
        log_info "Using existing upstream vLLM checkout at $VLLM_REPO_DIR"
    fi

    # The example moved between vLLM releases; probe both known locations.
    local candidate_dir
    for candidate_dir in \
        "$VLLM_REPO_DIR/examples/disaggregated/p2p_nccl_xpyd" \
        "$VLLM_REPO_DIR/examples/online_serving/disaggregated_serving_p2p_nccl_xpyd"; do
        if [ -f "$candidate_dir/disagg_proxy_p2p_nccl_xpyd.py" ]; then
            EXAMPLE_DIR="$candidate_dir"
            PROXY_SCRIPT="$EXAMPLE_DIR/disagg_proxy_p2p_nccl_xpyd.py"
            break
        fi
    done

    if [ -z "${PROXY_SCRIPT:-}" ]; then
        log_fail "Upstream proxy script not found in known vLLM example paths."
        log_info "Nearby files:"
        # Best-effort listing to help relocate the script; never fail here.
        find "$VLLM_REPO_DIR/examples" -maxdepth 4 -type f \
            | grep -E "p2p|nccl|disagg_proxy" \
            | sort \
            | tail -50 || true
        return 1
    fi

    git -C "$VLLM_REPO_DIR" rev-parse --short HEAD > "$RUN_DIR/vllm_example_commit.txt"
    log_pass "Using upstream vLLM example commit $(cat "$RUN_DIR/vllm_example_commit.txt")"
    log_info "Upstream example directory: $EXAMPLE_DIR"
}

# Print the --kv-transfer-config JSON for one vLLM instance to stdout.
# $1=role (kv_producer|kv_consumer)  $2=kv_port  $3=http_port  $4=buffer_size
# Reads PROXY_CONNECT_IP / PROXY_REGISTER_PORT / P2P_SEND_TYPE /
# P2P_NCCL_NUM_CHANNELS from the environment (exported at the bottom of this
# script).
make_kv_config() {
    local role=$1
    local kv_port=$2
    local http_port=$3
    local buffer_size=$4
    python3 - "$role" "$kv_port" "$http_port" "$buffer_size" <<'PY'
import json
import os
import sys

role, kv_port, http_port, buffer_size = sys.argv[1:5]
config = {
    "kv_connector": "P2pNcclConnector",
    "kv_role": role,
    "kv_buffer_size": buffer_size,
    "kv_port": str(kv_port),
    "kv_connector_extra_config": {
        "proxy_ip": os.environ["PROXY_CONNECT_IP"],
        "proxy_port": os.environ["PROXY_REGISTER_PORT"],
        "http_port": int(http_port),
        "send_type": os.environ["P2P_SEND_TYPE"],
        "nccl_num_channels": os.environ["P2P_NCCL_NUM_CHANNELS"],
    },
}
print(json.dumps(config, separators=(",", ":")))
PY
}

# Launch the upstream proxy in the background from inside EXAMPLE_DIR (it may
# rely on relative paths). The PID is recorded in CHILD_PIDS for cleanup.
start_upstream_proxy() {
    log_info "========== PHASE 2: Start upstream vLLM proxy =========="
    log_info "Starting upstream proxy on http://localhost:$PROXY_HTTP_PORT"
    (
        cd "$EXAMPLE_DIR"
        exec python3 "$PROXY_SCRIPT"
    ) > "$RUN_DIR/proxy.log" 2>&1 &
    CHILD_PIDS+=("$!")
    wait_for_proxy_port
}

# Start one vLLM server in the background.
# $1=name  $2=gpu  $3=http_port  $4=kv_port  $5=kv_role
# $6=kv_buffer_size  $7=gpu_mem_util
start_vllm_instance() {
    local name=$1
    local gpu=$2
    local http_port=$3
    local kv_port=$4
    local role=$5
    local kv_buffer_size=$6
    local gpu_mem_util=$7
    local logfile="$RUN_DIR/$name.log"
    local kv_config
    kv_config=$(make_kv_config "$role" "$kv_port" "$http_port" "$kv_buffer_size")

    log_info "Starting direct vLLM $name on GPU $gpu, HTTP $http_port, KV $kv_port"
    # Keep a copy of the exact config handed to the server for post-mortems.
    echo "$kv_config" > "$RUN_DIR/$name.kv_transfer_config.json"
    (
        export CUDA_VISIBLE_DEVICES="$gpu"
        export ENABLE_KVCACHED
        export KVCACHED_AUTOPATCH
        export VLLM_DISABLE_REQUEST_ID_RANDOMIZATION="${VLLM_DISABLE_REQUEST_ID_RANDOMIZATION:-0}"
        # EXTRA_VLLM_ARGS is intentionally unquoted so it word-splits into
        # multiple CLI arguments.
        exec vllm serve "$MODEL" \
            --enforce-eager \
            --host 0.0.0.0 \
            --port "$http_port" \
            --tensor-parallel-size 1 \
            --seed 1024 \
            --dtype "$DTYPE" \
            --max-model-len "$MAX_MODEL_LEN" \
            --max-num-batched-tokens "$MAX_NUM_BATCHED_TOKENS" \
            --max-num-seqs "$MAX_NUM_SEQS" \
            --trust-remote-code \
            --gpu-memory-utilization "$gpu_mem_util" \
            --kv-transfer-config "$kv_config" \
            $EXTRA_VLLM_ARGS
    ) > "$logfile" 2>&1 &
    CHILD_PIDS+=("$!")
}

# Print a /v1/completions JSON payload for $1=prompt to stdout.
make_payload() {
    local prompt=$1
    python3 - "$MODEL" "$prompt" "$MAX_TOKENS" <<'PY'
import json
import sys

model, prompt, max_tokens = sys.argv[1], sys.argv[2], int(sys.argv[3])
print(json.dumps({
    "model": model,
    "prompt": prompt,
    "max_tokens": max_tokens,
    "temperature": 0,
    "stream": False,
}))
PY
}

# Send one completion request through the proxy and validate the response.
# $1=prompt  $2=label (used to name the request/response/status artifacts)
send_request() {
    local prompt=$1
    local label=$2
    local payload_file="$RUN_DIR/${label}_request.json"
    local response_file="$RUN_DIR/${label}_response.json"
    local status_file="$RUN_DIR/${label}_curl_status.txt"

    make_payload "$prompt" > "$payload_file"
    log_info "Sending $label through upstream proxy: $prompt"

    # Temporarily disable errexit: a curl timeout is an expected, reportable
    # outcome here, not a reason to abort the whole harness.
    set +e
    local http_code
    http_code=$(curl -sS \
        --max-time "$TIMEOUT_REQUEST" \
        -o "$response_file" \
        -w "%{http_code}" \
        "http://localhost:$PROXY_HTTP_PORT/v1/completions" \
        -H "Content-Type: application/json" \
        --data-binary "@$payload_file" 2>"$status_file")
    local curl_status=$?
    set -e

    if [ "$curl_status" -ne 0 ]; then
        log_fail "$label failed or timed out after ${TIMEOUT_REQUEST}s"
        cat "$status_file" || true
        return 1
    fi
    if [ "$http_code" != "200" ]; then
        log_fail "$label returned HTTP $http_code"
        cat "$response_file" || true
        return 1
    fi

    # Reject empty completions so a "200 with no text" hang-variant is caught.
    python3 - "$response_file" <<'PY'
import json
import sys

with open(sys.argv[1]) as f:
    data = json.load(f)
text = data["choices"][0]["text"]
if not text:
    raise SystemExit("empty completion text")
print("Completion:", text[:160].replace("\n", "\\n"))
PY
    log_pass "$label completed"
}

# Orchestrate the full run: proxy, both vLLM instances, one probe request.
run_experiment() {
    log_info "Logs: $RUN_DIR"
    log_info "Mode: direct upstream vLLM P2P NCCL, kvcached disabled"
    log_info "Upstream proxy script: $PROXY_SCRIPT"

    start_upstream_proxy

    log_info "========== PHASE 3: Start direct vLLM P2P instances =========="
    start_vllm_instance \
        "prefill" "$PREFILL_GPU" "$PREFILL_HTTP_PORT" "$PREFILL_KV_PORT" \
        "kv_producer" "$PREFILL_KV_BUFFER_SIZE" "$PREFILL_GPU_MEM_UTIL"
    start_vllm_instance \
        "decode" "$DECODE_GPU" "$DECODE_HTTP_PORT" "$DECODE_KV_PORT" \
        "kv_consumer" "$DECODE_KV_BUFFER_SIZE" "$DECODE_GPU_MEM_UTIL"

    wait_for_http "$PREFILL_HTTP_PORT" "Prefill" "$RUN_DIR/prefill.log"
    wait_for_http "$DECODE_HTTP_PORT" "Decode" "$RUN_DIR/decode.log"
    wait_for_registration

    log_info "========== PHASE 4: Send request =========="
    send_request "The capital of France is" "request_1"

    summarize_logs
    log_pass "Direct upstream vLLM P2P NCCL experiment completed"
    log_info "Logs: $RUN_DIR"
}

# Exported so the python heredoc in make_kv_config can read them via os.environ.
export PROXY_CONNECT_IP
export PROXY_REGISTER_PORT
export P2P_SEND_TYPE
export P2P_NCCL_NUM_CHANNELS

phase0_setup
sync_upstream_vllm
run_experiment
diff
--git a/experiments/p2p_nccl_vllm_validation.md b/experiments/p2p_nccl_vllm_validation.md new file mode 100644 index 00000000..ba4e3183 --- /dev/null +++ b/experiments/p2p_nccl_vllm_validation.md @@ -0,0 +1,243 @@ +# P2P NCCL vLLM Baseline Validation + +## PR Description Draft + +This PR adds debug-only instrumentation and a reproducible harness for investigating +`P2pNcclConnector` hangs in PD disaggregation. + +No behavioral fix is included. The goal is to determine whether the observed hang is +caused by kvcached integration or by upstream vLLM P2P NCCL request/key behavior. + +## What This Adds + +- Diagnostic logging around `P2pNcclConnector` and `P2pNcclEngine`. +- A runnable 2-GPU P2P NCCL harness: + - kvcached-enabled mode for instrumented debugging. + - kvcached-disabled mode for plain vLLM baseline validation. +- Logs that show: + - producer and consumer request IDs + - generated `tensor_id = request_id#layer_name` + - ZMQ registration and `PUT`/`GET` flow + - NCCL send/recv begin/end + - consumer wait heartbeat when a tensor key is missing + +## Current Finding + +The observed hang is not at NCCL send, NCCL recv, or ZMQ ack. In the instrumented +run, producer sends all layer tensors successfully and decode receives entries in +`recv_store`. + +Decode then waits forever for a different `tensor_id`. + +Example producer key: + +```text +cmpl-___prefill_addr_172.23.0.2:21001___decode_addr_172.23.0.2:22001_-0-b5ce4b84#model.layers.0.self_attn.attn +``` + +Example consumer key: + +```text +cmpl-___prefill_addr_172.23.0.2:21001___decode_addr_172.23.0.2:22001_-0-a08a4814#model.layers.0.self_attn.attn +``` + +The embedded external request address prefix matches, but the vLLM internal +randomized request suffix differs between prefill and decode. + +## Validation Needed + +Before proposing a fix, run pure upstream vLLM P2P NCCL without kvcached/autopatch. 
+ +Decision tree: + +- If plain vLLM hangs the same way, this is likely an upstream `P2pNcclConnector` + keying issue. +- If plain vLLM succeeds, kvcached integration is changing request identity, + scheduling, or transfer behavior. +- If `VLLM_DISABLE_REQUEST_ID_RANDOMIZATION=1` makes the run pass, that strongly + confirms randomized internal request ID mismatch as the root cause. + +## Scope + +This PR is intentionally observational only. + +No layout changes, tensor reshaping, `.contiguous()`, timeout, exception, or +production fix is included. + +## Run 1: Same Harness, kvcached Disabled + +This keeps our exact proxy/topology/request shape while disabling kvcached. It is the +fastest controlled baseline. + +```bash +cd /root/kvcached +git pull + +RUN_WITH_KVCACHED=0 \ +TIMEOUT_REQUEST=300 \ +./experiments/10_p2p_nccl_debug.sh +``` + +Expected notes: + +- No `kvcached p2p debug` lines should appear. +- The script should skip kvcached install/autopatch checks. +- If this hangs, the issue is likely reproducible in plain vLLM P2P NCCL. +- If this succeeds, compare against the kvcached-enabled run. + +## Run 2: Same Harness, Disable vLLM Request ID Randomization + +This tests the current leading hypothesis directly. + +```bash +cd /root/kvcached + +RUN_WITH_KVCACHED=0 \ +VLLM_DISABLE_REQUEST_ID_RANDOMIZATION=1 \ +TIMEOUT_REQUEST=300 \ +./experiments/10_p2p_nccl_debug.sh +``` + +Expected interpretation: + +- If Run 1 hangs and Run 2 passes, the internal randomized request suffix is almost + certainly the problem. +- If both hang, the problem is deeper than request ID randomization. +- If both pass, the kvcached-enabled debug run is changing behavior and needs a + narrower comparison. + +## Run 3: Official vLLM P2P NCCL Example + +Use this as a sanity check that we are not accidentally testing only our harness. 
The official example is documented here:

<https://docs.vllm.ai/en/latest/examples/online_serving/disaggregated_serving_p2p_nccl_xpyd.html>

The example source lives here:

<https://github.com/vllm-project/vllm/tree/main/examples/online_serving/disaggregated_serving_p2p_nccl_xpyd>

Automated local wrapper:

```bash
cd /root/kvcached
git pull

TIMEOUT_REQUEST=300 \
./experiments/11_vllm_p2p_nccl_direct.sh
```

Repeat with request ID randomization disabled:

```bash
cd /root/kvcached

DISABLE_REQUEST_ID_RANDOMIZATION=1 \
TIMEOUT_REQUEST=300 \
./experiments/11_vllm_p2p_nccl_direct.sh
```

That wrapper uses the upstream vLLM proxy from the cloned vLLM repo and direct
`vllm serve` producer/decode processes. It does not use the kvcached debug proxy,
and it sends one small request instead of running the upstream benchmark.

Manual equivalent:

Setup:

```bash
cd /root
git clone https://github.com/vllm-project/vllm.git vllm-upstream
cd /root/vllm-upstream

pip install -U "vllm==0.19.0" quart pyzmq msgpack aiohttp pandas datasets
```

The upstream script currently checks `HF_TOKEN`, even for models that may not need
auth. If it refuses to start, export a token:

```bash
export HF_TOKEN=hf_...
+``` + +Run a 1P1D A100 baseline: + +```bash +cd /root/vllm-upstream/examples/disaggregated/p2p_nccl_xpyd + +MODEL=Qwen/Qwen2.5-1.5B-Instruct \ +PREFILL_GPUS=0 \ +DECODE_GPUS=1 \ +PREFILL_PORTS=20003 \ +DECODE_PORTS=20005 \ +PROXY_PORT=30001 \ +TIMEOUT_SECONDS=1200 \ +bash disagg_example_p2p_nccl_xpyd.sh +``` + +In another shell, send a request through the proxy: + +```bash +curl --max-time 300 -sS http://localhost:10001/v1/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "Qwen/Qwen2.5-1.5B-Instruct", + "prompt": "The capital of France is", + "max_tokens": 16, + "temperature": 0 + }' +``` + +Then repeat with request ID randomization disabled: + +```bash +cd /root/vllm-upstream/examples/disaggregated/p2p_nccl_xpyd + +VLLM_DISABLE_REQUEST_ID_RANDOMIZATION=1 \ +MODEL=Qwen/Qwen2.5-1.5B-Instruct \ +PREFILL_GPUS=0 \ +DECODE_GPUS=1 \ +PREFILL_PORTS=20003 \ +DECODE_PORTS=20005 \ +PROXY_PORT=30001 \ +TIMEOUT_SECONDS=1200 \ +bash disagg_example_p2p_nccl_xpyd.sh +``` + +## What To Capture + +For the kvcached harness runs, capture the final summary and the log directory: + +```bash +ls -ltr /root/kvcached/experiments/logs_p2p_debug +``` + +For the upstream vLLM example, capture: + +```bash +tail -n 200 proxy.log +tail -n 200 prefill1.log +tail -n 200 decode1.log +``` + +Also capture request ID randomization state: + +```bash +python3 - <<'PY' +import os +print("VLLM_DISABLE_REQUEST_ID_RANDOMIZATION=", os.getenv("VLLM_DISABLE_REQUEST_ID_RANDOMIZATION")) +PY +``` + +## Expected Root-Cause Signal + +The strongest confirmation is: + +1. Plain vLLM P2P NCCL hangs. +2. The same run passes with `VLLM_DISABLE_REQUEST_ID_RANDOMIZATION=1`. +3. The kvcached debug logs show producer and consumer using the same embedded + external address prefix but different internal randomized suffixes. 
+ +If that happens, the next discussion should be about how to key P2P transfer tensors +by a stable transfer request ID rather than the per-engine randomized internal vLLM +request ID. diff --git a/kvcached/integration/vllm/autopatch.py b/kvcached/integration/vllm/autopatch.py index ea31c3a4..33fbcb04 100644 --- a/kvcached/integration/vllm/autopatch.py +++ b/kvcached/integration/vllm/autopatch.py @@ -2,7 +2,9 @@ # SPDX-License-Identifier: Apache-2.0 import os +import time import types +from typing import Any from wrapt.importer import when_imported @@ -27,8 +29,273 @@ def _env_enabled() -> bool: return os.getenv("KVCACHED_AUTOPATCH", "false").lower() in ("true", "1") +def _truthy_env(name: str, default: str = "false") -> bool: + return os.getenv(name, default).lower() in ("true", "1", "yes", "on") + + +def _p2p_trace_tensors_enabled() -> bool: + return _truthy_env("KVCACHED_P2P_TRACE_TENSORS", "true") + + +def _p2p_wait_log_interval_s() -> float: + raw = os.getenv("KVCACHED_P2P_WAIT_LOG_INTERVAL_S", "5") + try: + interval = float(raw) + except ValueError: + logger.warning( + "kvcached p2p debug: invalid KVCACHED_P2P_WAIT_LOG_INTERVAL_S=%r; using 5s", + raw, + ) + return 5.0 + return interval if interval > 0 else 5.0 + + +def _safe_len(value: Any) -> Any: + try: + return len(value) + except Exception: + return "unknown" + + +def _safe_getattr(value: Any, name: str, default: Any = None) -> Any: + try: + return getattr(value, name, default) + except Exception: + return default + + +def _safe_call(value: Any, name: str) -> Any: + method = _safe_getattr(value, name) + if method is None: + return "unavailable" + try: + return method() + except Exception as exc: + return f"error:{type(exc).__name__}" + + +def _p2p_tensor_summary(tensor: Any) -> str: + if tensor is None: + return "tensor=None" + if not _p2p_trace_tensors_enabled(): + return "tensor=" + + shape = _safe_getattr(tensor, "shape", "unavailable") + dtype = _safe_getattr(tensor, "dtype", "unavailable") + device 
= _safe_getattr(tensor, "device", "unavailable")
    numel = _safe_call(tensor, "numel")
    contiguous = _safe_call(tensor, "is_contiguous")
    stride = _safe_call(tensor, "stride")
    return (
        f"shape={shape} dtype={dtype} device={device} "
        f"numel={numel} contiguous={contiguous} stride={stride}"
    )


def _p2p_block_count(block_ids: Any) -> Any:
    """Return len(block_ids), 0 for None, or 'unknown' if len() fails."""
    if block_ids is None:
        return 0
    return _safe_len(block_ids)


def _p2p_request_summary(request: Any) -> str:
    """One-line summary of a connector-metadata request for log output.

    Tries ``request_id`` then ``req_id`` so it works across vLLM versions.
    A nested block_ids structure (list of per-group lists) is summarized as a
    list of per-group counts; a flat structure as a single count.
    """
    request_id = _safe_getattr(request, "request_id", _safe_getattr(request, "req_id", "?"))
    block_ids = _safe_getattr(request, "block_ids", None)
    num_tokens = _safe_getattr(request, "num_tokens", "?")
    if isinstance(block_ids, (list, tuple)) and block_ids and isinstance(block_ids[0], list):
        block_count = [len(group) for group in block_ids]
    else:
        block_count = _p2p_block_count(block_ids)
    return f"request_id={request_id} tokens={num_tokens} blocks={block_count}"


def _p2p_meta_summary(metadata: Any) -> str:
    """Summarize all requests in connector metadata as one bracketed string."""
    requests = _safe_getattr(metadata, "requests", [])
    return "[" + "; ".join(_p2p_request_summary(req) for req in requests) + "]"


def _p2p_role(self: Any) -> str:
    """Label a connector as 'producer' or 'consumer' from its is_producer flag."""
    if _safe_getattr(self, "is_producer", False):
        return "producer"
    return "consumer"


def _p2p_parse_remote_address(connector: Any, request_id: str, is_prefill: bool) -> str:
    """Best-effort 'ip:port' of the peer embedded in request_id.

    Mirrors the engine's convention of offsetting the port by the TP rank.
    Never raises; parse failures are folded into the returned string so the
    log line still appears.
    """
    try:
        ip, port = connector.parse_request_id(request_id, is_prefill)
        rank = int(_safe_getattr(connector, "_rank", 0))
        return f"{ip}:{port + rank}"
    except Exception as exc:
        return f"parse-error:{type(exc).__name__}:{exc}"


def _p2p_expected_shape(layer: Any, block_ids: Any, attn_metadata: Any) -> Any:
    """Predict the shape of the tensor slice the producer will send.

    Log-only heuristic: for MLA layouts (or kv-dim in axis 1) the block axis
    is axis 0; for the (2, blocks, ...) layout the block axis is axis 1.
    Returns 'unknown'/'unsupported-layout' strings instead of raising.
    """
    shape = _safe_getattr(layer, "shape")
    if shape is None:
        return "unknown"

    block_count = _p2p_block_count(block_ids)
    try:
        # NOTE(review): this import path may not exist in every vLLM version;
        # on ImportError we simply treat the layer as non-MLA — confirm path.
        from vllm.model_executor.layers.attention.mla_attention import MLACommonMetadata

        is_mla = isinstance(attn_metadata, MLACommonMetadata)
    except Exception:
        is_mla = False

    try:
        if is_mla or shape[1] == 2:
            return (block_count, *tuple(shape[1:]))
        if shape[0] == 2:
            return (shape[0], block_count, *tuple(shape[2:]))
    except Exception:
        return "unknown"
    return "unsupported-layout"


class _DebugSocketProxy:
    """Log P2P ZMQ traffic while delegating all behavior to the real socket.

    Only send/recv/send_multipart/recv_multipart are intercepted; every other
    attribute is forwarded untouched via __getattr__ (which is only invoked
    for names not found on the proxy itself).
    """

    def __init__(self, socket: Any, label: str, socket_kind: str) -> None:
        self._socket = socket
        self._label = label
        self._socket_kind = socket_kind

    def __getattr__(self, name: str) -> Any:
        # Delegate everything we do not explicitly wrap.
        return getattr(self._socket, name)

    def send(self, *args: Any, **kwargs: Any) -> Any:
        # begin/end pair brackets the blocking call so a hang inside zmq is
        # visible as "begin" with no matching "end".
        logger.info(
            "kvcached p2p debug: socket send begin kind=%s label=%s bytes=%s",
            self._socket_kind,
            self._label,
            _safe_len(args[0]) if args else "unknown",
        )
        result = self._socket.send(*args, **kwargs)
        logger.info(
            "kvcached p2p debug: socket send end kind=%s label=%s",
            self._socket_kind,
            self._label,
        )
        return result

    def recv(self, *args: Any, **kwargs: Any) -> Any:
        logger.info(
            "kvcached p2p debug: socket recv begin kind=%s label=%s",
            self._socket_kind,
            self._label,
        )
        result = self._socket.recv(*args, **kwargs)
        logger.info(
            "kvcached p2p debug: socket recv end kind=%s label=%s bytes=%s",
            self._socket_kind,
            self._label,
            _safe_len(result),
        )
        return result

    def send_multipart(self, *args: Any, **kwargs: Any) -> Any:
        parts = args[0] if args else []
        logger.info(
            "kvcached p2p debug: socket send_multipart begin kind=%s label=%s parts=%s",
            self._socket_kind,
            self._label,
            _safe_len(parts),
        )
        result = self._socket.send_multipart(*args, **kwargs)
        logger.info(
            "kvcached p2p debug: socket send_multipart end kind=%s label=%s",
            self._socket_kind,
            self._label,
        )
        return result

    def recv_multipart(self, *args: Any, **kwargs: Any) -> Any:
        logger.info(
            "kvcached p2p debug: socket recv_multipart begin kind=%s label=%s",
            self._socket_kind,
            self._label,
        )
        result = self._socket.recv_multipart(*args, **kwargs)
        # Decode and log the command payload instead of a bare "end" line.
        _log_p2p_multipart_payload(self._socket_kind, self._label, result)
        return result


def _wrap_debug_socket(socket: Any, label: str, socket_kind: str) -> Any:
    """Wrap socket in _DebugSocketProxy, idempotently (never double-wrap)."""
    if isinstance(socket, _DebugSocketProxy):
        return socket
    return _DebugSocketProxy(socket, label, socket_kind)


def _log_p2p_multipart_payload(socket_kind: str, label: str, result: Any) -> None:
    """Log the decoded command from a 2-frame multipart message.

    Assumes frames are (remote identity, msgpack-encoded dict with cmd /
    tensor_id / shape / dtype keys) — matches the P2pNcclEngine wire format;
    verify against the vLLM version in use. Decode failures are reported in
    the cmd field rather than raised, so logging can never break the listener.
    """
    remote = "unknown"
    cmd = "unknown"
    tensor_id = None
    shape = None
    dtype = None
    try:
        remote, message = result
        if isinstance(remote, bytes):
            remote = remote.decode()
        import msgpack

        data = msgpack.loads(message)
        cmd = data.get("cmd", "unknown")
        tensor_id = data.get("tensor_id")
        shape = data.get("shape")
        dtype = data.get("dtype")
    except Exception as exc:
        cmd = f"decode-error:{type(exc).__name__}"

    logger.info(
        "kvcached p2p debug: listener received kind=%s label=%s remote=%s "
        "cmd=%s tensor_id=%s shape=%s dtype=%s",
        socket_kind,
        label,
        remote,
        cmd,
        tensor_id,
        shape,
        dtype,
    )


def _wrap_router_socket_recv_multipart(engine: Any) -> None:
    """Wrap engine.router_socket.recv_multipart with payload logging, once.

    Wraps the bound method on the socket instance (not the class). The
    engine-level flag is set even if setattr fails so we do not retry and
    re-log the warning on every listener iteration.
    """
    if getattr(engine, "__kvcached_p2p_router_recv_wrapped__", False):
        return

    socket = _safe_getattr(engine, "router_socket")
    original_recv_multipart = _safe_getattr(socket, "recv_multipart")
    if original_recv_multipart is None:
        return

    label = str(_safe_getattr(engine, "zmq_address", "unknown"))

    def _debug_recv_multipart(*args: Any, **kwargs: Any) -> Any:
        logger.info(
            "kvcached p2p debug: socket recv_multipart begin kind=router label=%s",
            label,
        )
        result = original_recv_multipart(*args, **kwargs)
        _log_p2p_multipart_payload("router", label, result)
        return result

    try:
        setattr(socket, "recv_multipart", _debug_recv_multipart)
    except Exception as exc:
        logger.warning(
            "kvcached p2p debug: could not wrap router recv_multipart "
            "for listener command logs: %s",
            exc,
        )
    setattr(engine, "__kvcached_p2p_router_recv_wrapped__", True)


# (def _patch_vllm continues on the following original lines)
@when_imported("vllm")
def
_patch_vllm(_vllm: types.ModuleType) -> None: + # Diagnostic-only P2P NCCL instrumentation is intentionally unconditional + # in this debug branch so hangs always leave breadcrumbs. + _patch_p2p_nccl_debug() + if not _env_enabled(): logger.debug("Disabled by KVCACHED_AUTOPATCH") return @@ -52,3 +319,322 @@ def _patch_vllm(_vllm: types.ModuleType) -> None: # Log results log_patch_results("vllm", results) + + +def _patch_p2p_nccl_debug() -> None: + """Patch P2pNcclConnector/P2pNcclEngine with debug-only logging wrappers.""" + try: + from vllm.distributed.kv_transfer.kv_connector.v1.p2p.p2p_nccl_connector import ( + P2pNcclConnector, + ) + from vllm.distributed.kv_transfer.kv_connector.v1.p2p.p2p_nccl_engine import ( + P2pNcclEngine, + ) + except ImportError as exc: + logger.info("kvcached p2p debug: P2pNcclConnector unavailable: %s", exc) + return + + _patch_p2p_connector_debug(P2pNcclConnector) + _patch_p2p_engine_debug(P2pNcclEngine) + logger.info("kvcached p2p debug: patched P2pNcclConnector/P2pNcclEngine") + + +def _patch_p2p_connector_debug(P2pNcclConnector: Any) -> None: + if getattr(P2pNcclConnector, "__kvcached_p2p_debug_patched__", False): + return + + original_build_connector_meta = P2pNcclConnector.build_connector_meta + original_save_kv_layer = P2pNcclConnector.save_kv_layer + original_start_load_kv = P2pNcclConnector.start_load_kv + + def _debug_build_connector_meta(self: Any, scheduler_output: Any, *args: Any, + **kwargs: Any) -> Any: + new_reqs = _safe_getattr(scheduler_output, "scheduled_new_reqs", []) + cached_reqs = _safe_getattr(scheduler_output, "scheduled_cached_reqs", None) + cached_req_ids = _safe_getattr(cached_reqs, "req_ids", []) + logger.info( + "kvcached p2p debug: build_connector_meta start role=%s " + "scheduled_new=%s scheduled_cached=%s requests_need_load=%s chunked=%s", + _p2p_role(self), + [_safe_getattr(req, "req_id", "?") for req in new_reqs], + list(cached_req_ids) if cached_req_ids is not None else [], + list(_safe_getattr(self, 
"_requests_need_load", {}).keys()), + list(_safe_getattr(self, "chunked_prefill", {}).keys()), + ) + meta = original_build_connector_meta(self, scheduler_output, *args, **kwargs) + requests = _safe_getattr(meta, "requests", []) + logger.info( + "kvcached p2p debug: build_connector_meta end role=%s meta_count=%s meta=%s", + _p2p_role(self), + _safe_len(requests), + _p2p_meta_summary(meta), + ) + return meta + + def _debug_save_kv_layer(self: Any, layer_name: str, kv_layer: Any, attn_metadata: Any, + *args: Any, **kwargs: Any) -> Any: + metadata = None + try: + metadata = self._get_connector_metadata() + except Exception as exc: + logger.info( + "kvcached p2p debug: save_kv_layer metadata unavailable " + "layer=%s error=%s", + layer_name, + exc, + ) + + for request in _safe_getattr(metadata, "requests", []): + request_id = _safe_getattr(request, "request_id", "?") + block_ids = _safe_getattr(request, "block_ids", None) + logger.info( + "kvcached p2p debug: save_kv_layer before-send role=%s " + "tensor_id=%s#%s layer=%s remote_address=%s block_count=%s " + "expected_send_shape=%s kv_layer=%s", + _p2p_role(self), + request_id, + layer_name, + layer_name, + _p2p_parse_remote_address(self, request_id, True), + _p2p_block_count(block_ids), + _p2p_expected_shape(kv_layer, block_ids, attn_metadata), + _p2p_tensor_summary(kv_layer), + ) + + result = original_save_kv_layer( + self, layer_name, kv_layer, attn_metadata, *args, **kwargs + ) + logger.info( + "kvcached p2p debug: save_kv_layer end role=%s layer=%s", + _p2p_role(self), + layer_name, + ) + return result + + def _debug_start_load_kv(self: Any, forward_context: Any, *args: Any, + **kwargs: Any) -> Any: + try: + metadata = self._get_connector_metadata() + except Exception as exc: + metadata = None + logger.info("kvcached p2p debug: start_load_kv metadata unavailable: %s", exc) + + no_compile_layers = _safe_getattr(forward_context, "no_compile_layers", {}) or {} + for request in _safe_getattr(metadata, "requests", []): 
+ request_id = _safe_getattr(request, "request_id", "?") + block_ids = _safe_getattr(request, "block_ids", None) + remote_address = _p2p_parse_remote_address(self, request_id, False) + for layer_name, layer in no_compile_layers.items(): + kv_cache = _safe_getattr(layer, "kv_cache") + if kv_cache is None: + continue + logger.info( + "kvcached p2p debug: start_load_kv waiting role=%s " + "tensor_id=%s#%s layer=%s remote_address=%s block_count=%s " + "target_kv_cache=%s", + _p2p_role(self), + request_id, + layer_name, + layer_name, + remote_address, + _p2p_block_count(block_ids), + _p2p_tensor_summary(kv_cache), + ) + + result = original_start_load_kv(self, forward_context, *args, **kwargs) + logger.info("kvcached p2p debug: start_load_kv end role=%s", _p2p_role(self)) + return result + + P2pNcclConnector.build_connector_meta = _debug_build_connector_meta + P2pNcclConnector.save_kv_layer = _debug_save_kv_layer + P2pNcclConnector.start_load_kv = _debug_start_load_kv + P2pNcclConnector.__kvcached_p2p_debug_patched__ = True + + +def _patch_p2p_engine_debug(P2pNcclEngine: Any) -> None: + if getattr(P2pNcclEngine, "__kvcached_p2p_debug_patched__", False): + return + + original_init = P2pNcclEngine.__init__ + original_create_connect = P2pNcclEngine.create_connect + original_send_tensor = P2pNcclEngine.send_tensor + original_recv_tensor = P2pNcclEngine.recv_tensor + original_listen_for_requests = P2pNcclEngine.listen_for_requests + original_send_sync = P2pNcclEngine.send_sync + original_send = P2pNcclEngine.send + original_recv = P2pNcclEngine.recv + + def _debug_init(self: Any, *args: Any, **kwargs: Any) -> None: + original_init(self, *args, **kwargs) + logger.info( + "kvcached p2p debug: engine init rank=%s local_rank=%s " + "zmq_address=%s send_type=%s buffer_threshold=%s", + _safe_getattr(self, "rank", "?"), + _safe_getattr(self, "local_rank", "?"), + _safe_getattr(self, "zmq_address", "?"), + _safe_getattr(self, "send_type", "?"), + _safe_getattr(self, 
"buffer_size_threshold", "?"), + ) + + def _debug_create_connect(self: Any, remote_address: str | None = None) -> Any: + logger.info( + "kvcached p2p debug: create_connect begin local=%s remote=%s known=%s", + _safe_getattr(self, "zmq_address", "?"), + remote_address, + remote_address in _safe_getattr(self, "socks", {}), + ) + result = original_create_connect(self, remote_address) + if remote_address is not None and remote_address in self.socks: + self.socks[remote_address] = _wrap_debug_socket( + self.socks[remote_address], str(remote_address), "dealer" + ) + result = (self.socks[remote_address], self.comms[remote_address]) + logger.info( + "kvcached p2p debug: create_connect end local=%s remote=%s comm_known=%s", + _safe_getattr(self, "zmq_address", "?"), + remote_address, + remote_address in _safe_getattr(self, "comms", {}), + ) + return result + + def _debug_send_tensor(self: Any, tensor_id: str, tensor: Any, + remote_address: str | None = None) -> Any: + logger.info( + "kvcached p2p debug: send_tensor enter local=%s send_type=%s " + "tensor_id=%s remote=%s %s", + _safe_getattr(self, "zmq_address", "?"), + _safe_getattr(self, "send_type", "?"), + tensor_id, + remote_address, + _p2p_tensor_summary(tensor), + ) + result = original_send_tensor(self, tensor_id, tensor, remote_address) + logger.info( + "kvcached p2p debug: send_tensor exit local=%s tensor_id=%s result=%s", + _safe_getattr(self, "zmq_address", "?"), + tensor_id, + result, + ) + return result + + def _debug_recv_tensor(self: Any, tensor_id: str, + remote_address: str | None = None) -> Any: + send_type = _safe_getattr(self, "send_type", "?") + logger.info( + "kvcached p2p debug: recv_tensor enter local=%s send_type=%s " + "tensor_id=%s remote=%s", + _safe_getattr(self, "zmq_address", "?"), + send_type, + tensor_id, + remote_address, + ) + if send_type in ("PUT", "PUT_ASYNC"): + start = time.monotonic() + interval = _p2p_wait_log_interval_s() + with self.recv_store_cv: + while tensor_id not in 
self.recv_store: + logger.info( + "kvcached p2p debug: recv_tensor waiting local=%s " + "tensor_id=%s remote=%s recv_store_size=%s interval_s=%s", + _safe_getattr(self, "zmq_address", "?"), + tensor_id, + remote_address, + _safe_len(self.recv_store), + interval, + ) + self.recv_store_cv.wait(timeout=interval) + if tensor_id not in self.recv_store: + logger.warning( + "kvcached p2p debug: recv_tensor still waiting local=%s " + "tensor_id=%s remote=%s elapsed_s=%.3f recv_store_size=%s", + _safe_getattr(self, "zmq_address", "?"), + tensor_id, + remote_address, + time.monotonic() - start, + _safe_len(self.recv_store), + ) + + result = original_recv_tensor(self, tensor_id, remote_address) + logger.info( + "kvcached p2p debug: recv_tensor exit local=%s tensor_id=%s %s", + _safe_getattr(self, "zmq_address", "?"), + tensor_id, + _p2p_tensor_summary(result), + ) + return result + + def _debug_listen_for_requests(self: Any) -> Any: + logger.info( + "kvcached p2p debug: listen_for_requests start local=%s", + _safe_getattr(self, "zmq_address", "?"), + ) + _wrap_router_socket_recv_multipart(self) + return original_listen_for_requests(self) + + def _debug_send_sync(self: Any, item: Any) -> Any: + tensor_id = _safe_getattr(item, "tensor_id", "?") + remote_address = _safe_getattr(item, "remote_address", "?") + logger.info( + "kvcached p2p debug: send_sync begin local=%s tensor_id=%s " + "remote=%s %s", + _safe_getattr(self, "zmq_address", "?"), + tensor_id, + remote_address, + _p2p_tensor_summary(_safe_getattr(item, "tensor")), + ) + logger.info( + "kvcached p2p debug: send_sync path local=%s tensor_id=%s " + "if create_connect ends and nccl send does not begin, likely waiting for PUT ack", + _safe_getattr(self, "zmq_address", "?"), + tensor_id, + ) + result = original_send_sync(self, item) + logger.info( + "kvcached p2p debug: send_sync end local=%s tensor_id=%s result=%s", + _safe_getattr(self, "zmq_address", "?"), + tensor_id, + result, + ) + return result + + def 
_debug_send(self: Any, comm: Any, tensor: Any, dst: int, + stream: Any = None) -> Any: + logger.info( + "kvcached p2p debug: nccl send begin local=%s dst=%s %s", + _safe_getattr(self, "zmq_address", "?"), + dst, + _p2p_tensor_summary(tensor), + ) + result = original_send(self, comm, tensor, dst, stream) + logger.info( + "kvcached p2p debug: nccl send end local=%s dst=%s", + _safe_getattr(self, "zmq_address", "?"), + dst, + ) + return result + + def _debug_recv(self: Any, comm: Any, tensor: Any, src: int, + stream: Any = None) -> Any: + logger.info( + "kvcached p2p debug: nccl recv begin local=%s src=%s %s", + _safe_getattr(self, "zmq_address", "?"), + src, + _p2p_tensor_summary(tensor), + ) + result = original_recv(self, comm, tensor, src, stream) + logger.info( + "kvcached p2p debug: nccl recv end local=%s src=%s", + _safe_getattr(self, "zmq_address", "?"), + src, + ) + return result + + P2pNcclEngine.__init__ = _debug_init + P2pNcclEngine.create_connect = _debug_create_connect + P2pNcclEngine.send_tensor = _debug_send_tensor + P2pNcclEngine.recv_tensor = _debug_recv_tensor + P2pNcclEngine.listen_for_requests = _debug_listen_for_requests + P2pNcclEngine.send_sync = _debug_send_sync + P2pNcclEngine.send = _debug_send + P2pNcclEngine.recv = _debug_recv + P2pNcclEngine.__kvcached_p2p_debug_patched__ = True