NOTE(review): line structure and indentation of this patch were reconstructed
from a whitespace-collapsed copy; verify context-line whitespace against the
tree (or apply with --ignore-whitespace). The post-image blob id on the index
line predates the review edits to send_iceberg_rpk_messages.

diff --git a/tools/pgo_bolt/train_pgo.py b/tools/pgo_bolt/train_pgo.py
index 5cbeab7e1db5a..fdf1bee104a2e 100755
--- a/tools/pgo_bolt/train_pgo.py
+++ b/tools/pgo_bolt/train_pgo.py
@@ -24,7 +24,6 @@
 # brokers) into one file.
 
 ICEBERG_SCHEMA = """
-
 syntax = "proto3";
 
 import "google/protobuf/timestamp.proto";
@@ -35,22 +34,62 @@
   google.protobuf.Timestamp ts = 3;
 }
 """
-
 ICEBERG_SAMPLE_PAYLOAD = b"\n\x1fhello my name is protobuf shady\x10\xb9`\x1a\x0b\x08\xf4\xf4\xb7\xcb\x06\x10\xc0\xb1\xc3v"
-ICEBERG_TOPIC_NAME = "iceberg-topic"
+ICEBERG_TOPIC_NAME = "iceberg-protobuf-topic"
+
+AVRO_SCHEMA = """{
+    "type": "record",
+    "name": "Simple",
+    "fields": [
+        {"name": "name", "type": "string"},
+        {"name": "id", "type": "int"},
+        {"name": "ts", "type": {"type": "long", "logicalType": "timestamp-micros"}}
+    ]
+}"""
+AVRO_TOPIC_NAME = "iceberg-avro-topic"
+AVRO_SAMPLE_PAYLOAD = json.dumps(
+    {"name": "hello my name is avro shady", "id": 24680, "ts": 1625079045123456}
+).encode("utf8")
+
+JSON_SCHEMA = """{
+    "$schema": "http://json-schema.org/draft-07/schema#",
+    "type": "object",
+    "title": "Simple",
+    "properties": {
+        "name": {"type": "string"},
+        "id": {"type": "integer"},
+        "ts": {"type": "integer"},
+        "labels": {"type": "array", "items": {"type": "string"}}
+    },
+    "required": ["name", "id", "ts", "labels"]
+}"""
+JSON_TOPIC_NAME = "iceberg-json-topic"
+JSON_SAMPLE_PAYLOAD = json.dumps(
+    {
+        "name": "hello my name is json shady",
+        "id": 13579,
+        "ts": 1625079045123456,
+        "labels": ["one", "two", "three"],
+    }
+).encode("utf8")
 
 
-async def setup_iceberg_schema_registry_and_topic(
-    args: argparse.Namespace, tmpdir: Path
+async def setup_schema_and_topic(
+    args: argparse.Namespace,
+    tmpdir: Path,
+    topic_name: str,
+    schema: str,
+    schema_filename: str,
+    iceberg_mode: str,
 ):
-    schema_path = tmpdir / "iceberg_schema.proto"
-    schema_path.write_text(ICEBERG_SCHEMA)
+    schema_path = tmpdir / schema_filename
+    schema_path.write_text(schema)
     schema_create_args: list[str] = [
         str(args.rpk_binary),
         "registry",
         "schema",
         "create",
-        f"{ICEBERG_TOPIC_NAME}-value",
+        f"{topic_name}-value",
         "--schema",
         str(schema_path),
     ]
@@ -60,17 +99,19 @@ async def setup_iceberg_schema_registry_and_topic(
     )
     await proc.wait()
     if proc.returncode != 0:
-        raise RuntimeError("Failed to create iceberg schema in schema registry")
+        raise RuntimeError(
+            f"Failed to create schema for {topic_name} in schema registry"
+        )
     topic_create_args: list[str] = [
         str(args.rpk_binary),
         "topic",
         "create",
-        ICEBERG_TOPIC_NAME,
+        topic_name,
         "-p",
         "18",
         "-r",
         "3",
-        "--topic-config=redpanda.iceberg.mode=value_schema_latest",
+        f"--topic-config=redpanda.iceberg.mode={iceberg_mode}",
         "--topic-config=redpanda.iceberg.target.lag.ms=20000",
     ]
     proc = await asyncio.create_subprocess_exec(
@@ -79,7 +120,7 @@ async def setup_iceberg_schema_registry_and_topic(
     )
     await proc.wait()
     if proc.returncode != 0:
-        raise RuntimeError("Failed to create iceberg topic")
+        raise RuntimeError(f"Failed to create topic {topic_name}")
 
 
 async def read_until(proc: asyncio.subprocess.Process, marker: str, tag: str):
@@ -248,11 +289,11 @@ def get_dir_size(path: Path) -> int:
 
     iceberg_dir = (
         tmpdir
-        / "rp_data/minio/data/panda-bucket/redpanda-iceberg-catalog/redpanda/iceberg-topic/"
+        / f"rp_data/minio/data/panda-bucket/redpanda-iceberg-catalog/redpanda/{ICEBERG_TOPIC_NAME}/"
     )
     dlq_dir = (
         tmpdir
-        / "rp_data/minio/data/panda-bucket/redpanda-iceberg-catalog/redpanda/iceberg-topic~dlq/"
+        / f"rp_data/minio/data/panda-bucket/redpanda-iceberg-catalog/redpanda/{ICEBERG_TOPIC_NAME}~dlq/"
     )
 
     def check_sizes() -> str:
@@ -281,6 +322,41 @@ def check_sizes() -> str:
     raise RuntimeError(status)
 
 
+async def send_iceberg_rpk_messages(
+    args: argparse.Namespace, topic_name: str, payload: bytes
+):
+    """Produce `payload` to `topic_name` 100 times, one rpk process per message.
+
+    Raises RuntimeError, including rpk's stderr, on the first failed produce.
+    """
+    print(f"Sending 100 messages to {topic_name} via rpk...")
+
+    # The command line is identical for every message, so build it once.
+    produce_args: list[str] = [
+        str(args.rpk_binary),
+        "topic",
+        "produce",
+        topic_name,
+        "--schema-id=topic",
+    ]
+    for _ in range(100):
+        proc = await asyncio.create_subprocess_exec(
+            *produce_args,
+            stdin=asyncio.subprocess.PIPE,
+            stdout=asyncio.subprocess.PIPE,
+            stderr=asyncio.subprocess.PIPE,
+            start_new_session=True,
+        )
+        # communicate() feeds stdin and drains both pipes; keep stderr so a
+        # failure is diagnosable instead of silently discarded.
+        _, stderr = await proc.communicate(input=payload)
+        if proc.returncode != 0:
+            raise RuntimeError(
+                f"Failed to send message to {topic_name}: "
+                f"{stderr.decode(errors='replace').strip()}"
+            )
+
+
 async def terminate(proc: asyncio.subprocess.Process, name: str) -> int:
     try:
         print(f"Terminating {name} (pid {proc.pid})")
@@ -307,8 +383,40 @@ async def profile(args: argparse.Namespace, tmpdir: Path, redpanda_bin: Path):
         await read_until(cluster_proc, CLUSTER_STARTUP_MARKER, "cluster")
         cluster_task = asyncio.create_task(continue_stream(cluster_proc, "cluster"))
 
-        await setup_iceberg_schema_registry_and_topic(args, tmpdir)
+        # rpk doesn't understand value_schema_latest so for rpk the topic needs
+        # to use value_schema_id_prefix for the rpk based loads
+        # do very simple avro iceberg training
+        await setup_schema_and_topic(
+            args,
+            tmpdir,
+            AVRO_TOPIC_NAME,
+            AVRO_SCHEMA,
+            "avro_schema.avsc",
+            "value_schema_id_prefix",
+        )
+        await send_iceberg_rpk_messages(args, AVRO_TOPIC_NAME, AVRO_SAMPLE_PAYLOAD)
+
+        # do very simple json schema iceberg training
+        await setup_schema_and_topic(
+            args,
+            tmpdir,
+            JSON_TOPIC_NAME,
+            JSON_SCHEMA,
+            "json_schema.json",
+            "value_schema_id_prefix",
+        )
+        await send_iceberg_rpk_messages(args, JSON_TOPIC_NAME, JSON_SAMPLE_PAYLOAD)
+
+        # full produce plus protobuf iceberg
+        await setup_schema_and_topic(
+            args,
+            tmpdir,
+            ICEBERG_TOPIC_NAME,
+            ICEBERG_SCHEMA,
+            "iceberg_schema.proto",
+            "value_schema_latest",
+        )
 
         omb_proc, omb_target = await start_omb(tmpdir, args.omb_benchmark)
         await read_until(omb_proc, BENCH_START_MARKER, "omb")
         await asyncio.create_task(continue_stream(omb_proc, "omb"))