Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 113 additions & 15 deletions tools/pgo_bolt/train_pgo.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
# brokers) into one file.

ICEBERG_SCHEMA = """

syntax = "proto3";

import "google/protobuf/timestamp.proto";
Expand All @@ -35,22 +34,62 @@
google.protobuf.Timestamp ts = 3;
}
"""

ICEBERG_SAMPLE_PAYLOAD = b"\n\x1fhello my name is protobuf shady\x10\xb9`\x1a\x0b\x08\xf4\xf4\xb7\xcb\x06\x10\xc0\xb1\xc3v"
ICEBERG_TOPIC_NAME = "iceberg-topic"
ICEBERG_TOPIC_NAME = "iceberg-protobuf-topic"

# Avro record schema mirroring the protobuf one: a name, an int id, and a
# timestamp-micros field.
AVRO_SCHEMA = """{
"type": "record",
"name": "Simple",
"fields": [
{"name": "name", "type": "string"},
{"name": "id", "type": "int"},
{"name": "ts", "type": {"type": "long", "logicalType": "timestamp-micros"}}
]
}"""
AVRO_TOPIC_NAME = "iceberg-avro-topic"
# JSON-encoded record; presumably rpk re-encodes it to Avro via the registered
# schema when producing with --schema-id=topic — confirm against rpk docs.
AVRO_SAMPLE_PAYLOAD = json.dumps(
{"name": "hello my name is avro shady", "id": 24680, "ts": 1625079045123456}
).encode("utf8")

# JSON Schema variant of the same simple record, extended with a string-array
# field ("labels"); all four properties are required.
JSON_SCHEMA = """{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"title": "Simple",
"properties": {
"name": {"type": "string"},
"id": {"type": "integer"},
"ts": {"type": "integer"},
"labels": {"type": "array", "items": {"type": "string"}}
},
"required": ["name", "id", "ts", "labels"]
}"""
JSON_TOPIC_NAME = "iceberg-json-topic"
# Sample payload satisfying JSON_SCHEMA above (all required fields present).
JSON_SAMPLE_PAYLOAD = json.dumps(
{
"name": "hello my name is json shady",
"id": 13579,
"ts": 1625079045123456,
"labels": ["one", "two", "three"],
}
).encode("utf8")


async def setup_iceberg_schema_registry_and_topic(
args: argparse.Namespace, tmpdir: Path
async def setup_schema_and_topic(
args: argparse.Namespace,
tmpdir: Path,
topic_name: str,
schema: str,
schema_filename: str,
iceberg_mode: str,
):
schema_path = tmpdir / "iceberg_schema.proto"
schema_path.write_text(ICEBERG_SCHEMA)
schema_path = tmpdir / schema_filename
schema_path.write_text(schema)
schema_create_args: list[str] = [
str(args.rpk_binary),
"registry",
"schema",
"create",
f"{ICEBERG_TOPIC_NAME}-value",
f"{topic_name}-value",
"--schema",
str(schema_path),
]
Expand All @@ -60,17 +99,19 @@ async def setup_iceberg_schema_registry_and_topic(
)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError("Failed to create iceberg schema in schema registry")
raise RuntimeError(
f"Failed to create schema for {topic_name} in schema registry"
)
topic_create_args: list[str] = [
str(args.rpk_binary),
"topic",
"create",
ICEBERG_TOPIC_NAME,
topic_name,
"-p",
"18",
"-r",
"3",
"--topic-config=redpanda.iceberg.mode=value_schema_latest",
f"--topic-config=redpanda.iceberg.mode={iceberg_mode}",
"--topic-config=redpanda.iceberg.target.lag.ms=20000",
]
proc = await asyncio.create_subprocess_exec(
Expand All @@ -79,7 +120,7 @@ async def setup_iceberg_schema_registry_and_topic(
)
await proc.wait()
if proc.returncode != 0:
raise RuntimeError("Failed to create iceberg topic")
raise RuntimeError(f"Failed to create topic {topic_name}")


async def read_until(proc: asyncio.subprocess.Process, marker: str, tag: str):
Expand Down Expand Up @@ -248,11 +289,11 @@ def get_dir_size(path: Path) -> int:

iceberg_dir = (
tmpdir
/ "rp_data/minio/data/panda-bucket/redpanda-iceberg-catalog/redpanda/iceberg-topic/"
/ f"rp_data/minio/data/panda-bucket/redpanda-iceberg-catalog/redpanda/{ICEBERG_TOPIC_NAME}/"
)
dlq_dir = (
tmpdir
/ "rp_data/minio/data/panda-bucket/redpanda-iceberg-catalog/redpanda/iceberg-topic~dlq/"
/ f"rp_data/minio/data/panda-bucket/redpanda-iceberg-catalog/redpanda/{ICEBERG_TOPIC_NAME}~dlq/"
)

def check_sizes() -> str:
Expand Down Expand Up @@ -281,6 +322,31 @@ def check_sizes() -> str:
raise RuntimeError(status)


async def send_iceberg_rpk_messages(
    args: argparse.Namespace, topic_name: str, payload: bytes
):
    """Produce 100 copies of ``payload`` to ``topic_name`` via ``rpk``.

    One ``rpk topic produce`` subprocess is spawned per message, with the
    payload fed over stdin. ``--schema-id=topic`` makes rpk resolve the
    topic's registered schema and encode the payload with it.

    NOTE(review): one process per message looks deliberate — some sample
    payloads contain newline bytes, which a single line-delimited stdin
    stream would split into multiple records; confirm before batching.

    Raises:
        RuntimeError: if any rpk invocation exits with a non-zero status.
    """
    print(f"Sending 100 messages to {topic_name} via rpk...")

    # The command line is loop-invariant; build it once outside the loop.
    produce_args: list[str] = [
        str(args.rpk_binary),
        "topic",
        "produce",
        topic_name,
        "--schema-id=topic",
    ]
    for _ in range(100):
        proc = await asyncio.create_subprocess_exec(
            *produce_args,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            start_new_session=True,
        )
        # communicate() writes the payload, closes stdin, and waits for exit.
        await proc.communicate(input=payload)
        if proc.returncode != 0:
            raise RuntimeError(f"Failed to send message to {topic_name}")


async def terminate(proc: asyncio.subprocess.Process, name: str) -> int:
try:
print(f"Terminating {name} (pid {proc.pid})")
Expand All @@ -307,8 +373,40 @@ async def profile(args: argparse.Namespace, tmpdir: Path, redpanda_bin: Path):
await read_until(cluster_proc, CLUSTER_STARTUP_MARKER, "cluster")
cluster_task = asyncio.create_task(continue_stream(cluster_proc, "cluster"))

await setup_iceberg_schema_registry_and_topic(args, tmpdir)
# rpk doesn't understand value_schema_latest, so topics exercised by the
# rpk-based loads need to use value_schema_id_prefix instead.

# do very simple avro iceberg training
await setup_schema_and_topic(
args,
tmpdir,
AVRO_TOPIC_NAME,
AVRO_SCHEMA,
"avro_schema.avsc",
"value_schema_id_prefix",
)
await send_iceberg_rpk_messages(args, AVRO_TOPIC_NAME, AVRO_SAMPLE_PAYLOAD)

# do very simple json schema iceberg training
await setup_schema_and_topic(
args,
tmpdir,
JSON_TOPIC_NAME,
JSON_SCHEMA,
"json_schema.json",
"value_schema_id_prefix",
)
await send_iceberg_rpk_messages(args, JSON_TOPIC_NAME, JSON_SAMPLE_PAYLOAD)

# full produce plus protobuf iceberg
await setup_schema_and_topic(
args,
tmpdir,
ICEBERG_TOPIC_NAME,
ICEBERG_SCHEMA,
"iceberg_schema.proto",
"value_schema_latest",
)
omb_proc, omb_target = await start_omb(tmpdir, args.omb_benchmark)
await read_until(omb_proc, BENCH_START_MARKER, "omb")
await asyncio.create_task(continue_stream(omb_proc, "omb"))
Expand Down