
=== Write to specific output topics

You can configure your transform function to write records to specific output topics based on message content. This capability is useful for:

* Filtering messages by criteria and routing to different topics
* Fan-out patterns that distribute data from one input topic to multiple output topics
* Event routing based on message type or schema
* Data distribution for downstream consumers

Because data transforms run inside the broker, they offer a simpler alternative to external connectors such as Kafka Connect for data routing, with lower latency and no additional infrastructure to manage.

==== Basic JSON validation example

The following example shows a filter that outputs only valid JSON from the input topic into the output topic. Invalid JSON is written to a different output topic.

[tabs]
======
JavaScript::
+
--
The JavaScript SDK does not support writing records to a specific output topic.
--
======

[[multi-topic-fanout]]
==== Multi-topic fan-out with Schema Registry

This example routes batched updates from a single input topic to multiple output topics based on a routing field in each message. Messages are encoded with the xref:manage:schema-reg/schema-reg-overview.adoc#wire-format[Schema Registry wire format] for validation against the output topic schema. Consider using this pattern with Iceberg-enabled topics to fan out data directly into lakehouse tables.

.Input message example
[,json]
----
{
"updates": [
{"table": "orders", "data": {"order_id": "123", "amount": 99.99}},
{"table": "inventory", "data": {"product_id": "P456", "quantity": 50}},
{"table": "customers", "data": {"customer_id": "C789", "name": "Jane"}}
]
}
----
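For reference, the wire format framing used throughout this example can be sketched in Go. The helper names below are illustrative, not part of the transform SDK:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// encodeWireFormat frames a payload in the Schema Registry wire format:
// a zero magic byte, a 4-byte big-endian schema ID, then the payload.
func encodeWireFormat(schemaID uint32, payload []byte) []byte {
	buf := make([]byte, 5, 5+len(payload))
	buf[0] = 0 // magic byte
	binary.BigEndian.PutUint32(buf[1:5], schemaID)
	return append(buf, payload...)
}

// decodeWireFormat splits a framed message back into its schema ID and payload.
func decodeWireFormat(msg []byte) (uint32, []byte, error) {
	if len(msg) < 5 || msg[0] != 0 {
		return 0, nil, errors.New("not in Schema Registry wire format")
	}
	return binary.BigEndian.Uint32(msg[1:5]), msg[5:], nil
}

func main() {
	framed := encodeWireFormat(1, []byte(`{"order_id":"123","amount":99.99}`))
	id, payload, _ := decodeWireFormat(framed)
	fmt.Println(id, string(payload))
}
```

Consumers that expect schema-encoded data (including Redpanda Console and Iceberg translation) rely on this five-byte header to look up the schema for each record.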

xref:develop:data-transforms/configure.adoc[Configure the transform] with multiple output topics:

[,yaml]
----
name: event-router
input_topic: events
output_topics:
- orders
- inventory
- customers
----

The transform extracts each update and routes it to the appropriate topic based on the `table` field. This example assumes that you have already created each output topic and registered the corresponding schemas in Schema Registry.

NOTE: xref:manage:schema-reg/schema-reg-api.adoc[Register schemas in Schema Registry] before deploying the transform. The schema IDs in the code examples must match the IDs returned by Schema Registry during registration. Use the `{topic-name}-value` naming convention for schema subjects (for example, `orders-value`, `inventory-value`).
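For example, you can register a schema for the `orders` topic with `rpk`. The schema file name here is illustrative:

```shell
# Register a JSON schema under the orders-value subject.
# The command output includes the assigned schema ID, which must
# match the ID used in the transform code.
rpk registry schema create orders-value --schema orders.json --type json
```

Repeat for each output topic (`inventory-value`, `customers-value`), and update the schema ID map in the transform code with the IDs that Schema Registry returns.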

[tabs]
======
Go::
+
--
.`go.mod`
[%collapsible]
====
[,go]
----
module fanout-example

go 1.20

require github.com/redpanda-data/redpanda/src/transform-sdk/go/transform v1.1.0 // v1.1.0+ required
----
====

`transform.go`:

[,go]
----
package main

import (
	"encoding/binary"
	"encoding/json"
	"log"

	"github.com/redpanda-data/redpanda/src/transform-sdk/go/transform"
)

// BatchMessage is the input message structure with an array of updates.
type BatchMessage struct {
	Updates []TableUpdate `json:"updates"`
}

// TableUpdate is an individual table update with a routing field.
type TableUpdate struct {
	Table string          `json:"table"` // Routing field that determines the output topic
	Data  json.RawMessage `json:"data"`  // The actual data to write
}

// Schema IDs for each output topic, obtained from Schema Registry.
// Register schemas before deploying the transform, using the {topic-name}-value naming convention.
var schemaIDs = map[string]int{
	"orders":    1,
	"inventory": 2,
	"customers": 3,
}

func main() {
	log.Printf("Starting fanout transform with schema IDs: %v", schemaIDs)
	transform.OnRecordWritten(routeUpdates)
}

func routeUpdates(event transform.WriteEvent, writer transform.RecordWriter) error {
	var batch BatchMessage
	if err := json.Unmarshal(event.Record().Value, &batch); err != nil {
		log.Printf("Failed to parse batch message: %v", err)
		return nil // Skip invalid records
	}

	// Process each update in the batch.
	for i, update := range batch.Updates {
		schemaID, exists := schemaIDs[update.Table]
		if !exists {
			log.Printf("Unknown table in update %d: %s", i, update.Table)
			continue
		}

		if err := writeUpdate(update, schemaID, writer, event); err != nil {
			log.Printf("Failed to write update %d to %s: %v", i, update.Table, err)
		}
	}

	return nil
}

func writeUpdate(update TableUpdate, schemaID int, writer transform.RecordWriter, event transform.WriteEvent) error {
	// Build the Schema Registry wire format: [magic byte, schema ID (4 bytes, big-endian), data...]
	value := make([]byte, 5, 5+len(update.Data))
	value[0] = 0 // magic byte
	binary.BigEndian.PutUint32(value[1:5], uint32(schemaID))
	value = append(value, update.Data...)

	record := transform.Record{
		Key:   event.Record().Key,
		Value: value,
	}

	return writer.Write(record, transform.ToTopic(update.Table))
}
----
--

Rust::
+
--
.`Cargo.toml`
[%collapsible]
====
[,toml]
----
[package]
name = "fanout-rust-example"
version = "0.1.0"
edition = "2021"

[dependencies]
redpanda-transform-sdk = "1.1.0" # v1.1.0+ required for WriteOptions API
serde = { version = "1", features = ["derive"] }
serde_json = "1"

[profile.release]
opt-level = "z"
lto = true
strip = true
----
====

`src/main.rs`:

[,rust]
----
use redpanda_transform_sdk::*;
use serde::Deserialize;
use std::collections::HashMap;
use std::error::Error;
use std::sync::OnceLock;

#[derive(Deserialize)]
struct BatchMessage {
    updates: Vec<TableUpdate>,
}

#[derive(Deserialize)]
struct TableUpdate {
    table: String,
    data: serde_json::Value,
}

// Schema IDs for each output topic, obtained from Schema Registry.
// Register schemas before deploying the transform, using the {topic-name}-value naming convention.
static SCHEMA_IDS: OnceLock<HashMap<String, i32>> = OnceLock::new();

fn schema_ids() -> &'static HashMap<String, i32> {
    SCHEMA_IDS.get_or_init(|| {
        HashMap::from([
            ("orders".to_string(), 1),
            ("inventory".to_string(), 2),
            ("customers".to_string(), 3),
        ])
    })
}

fn main() {
    on_record_written(route_updates);
}

fn route_updates(event: WriteEvent, writer: &mut RecordWriter) -> Result<(), Box<dyn Error>> {
    let batch: BatchMessage = serde_json::from_slice(event.record.value().unwrap_or_default())?;

    for update in batch.updates.iter() {
        if let Some(&schema_id) = schema_ids().get(&update.table) {
            write_update(update, schema_id, writer, &event)?;
        }
    }

    Ok(())
}

fn write_update(
    update: &TableUpdate,
    schema_id: i32,
    writer: &mut RecordWriter,
    event: &WriteEvent,
) -> Result<(), Box<dyn Error>> {
    // Build the Schema Registry wire format: [magic byte, schema ID (4 bytes, big-endian), data...]
    let mut value = vec![0u8; 5];
    value[0] = 0; // magic byte
    value[1..5].copy_from_slice(&schema_id.to_be_bytes());

    let data_bytes = serde_json::to_vec(&update.data)?;
    value.extend_from_slice(&data_bytes);

    let key = event.record.key().map(|k| k.to_vec());
    let record = BorrowedRecord::new(key.as_deref(), Some(value.as_slice()));

    writer.write_with_options(record, WriteOptions::to_topic(&update.table))?;
    Ok(())
}
----
--

JavaScript::
+
--
The JavaScript SDK does not support writing records to specific output topics. For multi-topic fan-out, use the Go or Rust SDK.
--
======

=== Connect to the Schema Registry

You can use the Schema Registry client library to read and write schemas as well as serialize and deserialize records. This client library is useful when working with schema-based topics in your data transforms.