Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion kafka-client-examples/e2e-test/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,14 @@ dependencies {
implementation("org.apache.commons:commons-text:1.10.0")

implementation(libs.guava)
implementation(libs.slf4j.log4j2)
implementation(libs.bundles.scylla)
implementation(libs.jackson)
implementation(libs.mongodb.driver.core)

testImplementation(project(":responsive-test-utils"))
testImplementation(testlibs.bundles.base)
testImplementation(testlibs.bundles.testcontainers)

}

java {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
import dev.responsive.examples.regression.RegressionSchema;
import dev.responsive.examples.regression.ResultsComparatorService;
import dev.responsive.examples.regression.tests.AbstractKSExampleService;
import dev.responsive.examples.regression.tests.FullTopologyExample;
import dev.responsive.examples.regression.tests.KeyBatchExample;
import dev.responsive.examples.regression.tests.STJoinExample;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
Expand Down Expand Up @@ -55,10 +55,10 @@ public static void main(String[] args) throws IOException {
startRegressionDriver(rawCfg, RegressionSchema.EnrichedOrderDeserializer.class);
break;
case REGRESSION_ST_JOIN:
startRegressionTest(new STJoinExample(rawCfg, true));
startRegressionTest(new FullTopologyExample(rawCfg, true));
break;
case REGRESSION_ST_BASELINE:
startRegressionTest(new STJoinExample(rawCfg, false));
startRegressionTest(new FullTopologyExample(rawCfg, false));
break;

case REGRESSION_BATCH_DRIVER:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

package dev.responsive.examples.regression;

import static dev.responsive.examples.regression.RegConstants.CUSTOMERS;
import static dev.responsive.examples.regression.RegConstants.CUSTOMER_ID_TO_NAME;
import static dev.responsive.examples.regression.RegConstants.CUSTOMER_NAME_TO_LOCATION;
import static dev.responsive.examples.regression.RegConstants.ORDERS;
import static dev.responsive.examples.regression.RegConstants.resultsTopic;

Expand Down Expand Up @@ -45,15 +46,15 @@ public class OrderAndCustomerDriver extends AbstractExecutionThreadService {
private final Map<String, Object> props;

private final KafkaProducer<String, Order> orderProducer;
private final KafkaProducer<String, Customer> customerProducer;
private final KafkaProducer<String, String> customerProducer;
private final RateLimiter rateLimiter = RateLimiter.create(RECORDS_PER_SECOND);
private final CustomerGen customerGen = new CustomerGen(random);
private final OrderGen orderGen = new OrderGen(random, customerGen);

public OrderAndCustomerDriver(final Map<String, Object> props) {
this.props = new HashMap<>(props);
this.orderProducer = getProducer(props, RegressionSchema.OrderSerializer.class);
this.customerProducer = getProducer(props, RegressionSchema.CustomerSerializer.class);
this.customerProducer = getProducer(props, StringSerializer.class);
}

private static <V> KafkaProducer<String, V> getProducer(
Expand All @@ -76,7 +77,12 @@ protected void startUp() {
E2ETestUtils.maybeCreateTopics(
props,
RegConstants.NUM_PARTITIONS,
List.of(ORDERS, CUSTOMERS, resultsTopic(true), resultsTopic(false))
List.of(ORDERS,
CUSTOMER_NAME_TO_LOCATION,
CUSTOMER_ID_TO_NAME,
resultsTopic(true),
resultsTopic(false)
)
);
LOG.info("Created topics.");
}
Expand All @@ -89,7 +95,7 @@ protected void shutDown() {
protected void run() throws ExecutionException, InterruptedException {
LOG.info("Running OrderAndCustomerDriver...");
// create the first customer so that orders will have a valid customer id
customerProducer.send(newCustomer()).get();
sendNewCustomer();

int orders = 0;
int customers = 1;
Expand All @@ -100,10 +106,10 @@ protected void run() throws ExecutionException, InterruptedException {
final boolean isOrder = random.nextByte() % 8 != 0; // 8:1 ratio of orders to customers
if (isOrder) {
orders++;
orderProducer.send(newOrder()).get();
sendNewOrder();
} else {
customers++;
customerProducer.send(newCustomer()).get();
sendNewCustomer();
}

if ((orders + customers) % 1000 == 0) {
Expand All @@ -112,25 +118,46 @@ protected void run() throws ExecutionException, InterruptedException {
}
}

private ProducerRecord<String, Customer> newCustomer() {
// 1 customer is actually two records/topics -- one for name-id info and one for id-location
// we send both at the same time and with the same timestamp to ensure these get joined
private void sendNewCustomer() throws ExecutionException, InterruptedException {

final Customer customer = customerGen.next();
final long timestamp = System.currentTimeMillis();

final boolean isTombstone = random.nextByte() % 5 == 0; // 5% chance of a tombstone

return new ProducerRecord<>(
CUSTOMERS,
final var idToNameRecord = new ProducerRecord<>(
CUSTOMER_ID_TO_NAME,
null,
timestamp,
customer.customerId(),
isTombstone ? null : customer
isTombstone ? null : customer.customerName()
);
final var nameToLocationRecord = new ProducerRecord<>(
CUSTOMER_NAME_TO_LOCATION,
null,
timestamp,
customer.customerName(),
isTombstone ? null : customer.location()
);

customerProducer.send(idToNameRecord).get();
customerProducer.send(nameToLocationRecord).get();
}

private ProducerRecord<String, Order> newOrder() {
private void sendNewOrder() throws ExecutionException, InterruptedException {
// key on customer id so that the order can be joined with the customer
// without a repartition (which would introduce indeterminate results)
final Order order = orderGen.next();
return new ProducerRecord<>(
ORDERS,
order.customerId(),
order
);
orderProducer.send(
new ProducerRecord<>(
ORDERS,
null,
System.currentTimeMillis(),
order.customerId(),
order
)
).get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@

public class RegConstants {
public static final String ORDERS = "orders";
public static final String CUSTOMERS = "customers";
public static final String CUSTOMER_ID_TO_NAME = "customer-names-to-id";
public static final String CUSTOMER_NAME_TO_LOCATION = "customer-ids-to-location";
public static final int NUM_PARTITIONS = 8;
private static final String RESULTS = "enriched-orders";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import dev.responsive.examples.common.JsonSerde;
import dev.responsive.examples.common.JsonSerializer;
import dev.responsive.examples.regression.model.Customer;
import dev.responsive.examples.regression.model.CustomerInfo;
import dev.responsive.examples.regression.model.EnrichedOrder;
import dev.responsive.examples.regression.model.GroupedOrder;
import dev.responsive.examples.regression.model.Order;
Expand All @@ -32,6 +33,10 @@ public static Serde<Customer> customerSerde() {
return new JsonSerde<>(Customer.class);
}

public static Serde<CustomerInfo> customerInfoSerde() {
return new JsonSerde<>(CustomerInfo.class);
}

public static Serde<EnrichedOrder> enrichedOrderSerde() {
return new JsonSerde<>(EnrichedOrder.class);
}
Expand All @@ -50,6 +55,18 @@ public CustomerSerializer() {
}
}

public static class CustomerInfoSerializer extends JsonSerializer<CustomerInfo> {
public CustomerInfoSerializer() {
super(CustomerInfo.class);
}
}

public static class CustomerInfoDeserializer extends JsonDeserializer<CustomerInfo> {
public CustomerInfoDeserializer() {
super(CustomerInfo.class);
}
}

public static class OrderSerializer extends JsonSerializer<Order> {
public OrderSerializer() {
super(Order.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2024 Responsive Computing, Inc.
*
* This source code is licensed under the Responsive Business Source License Agreement v1.0
* available at:
*
* https://www.responsive.dev/legal/responsive-bsl-10
*
* This software requires a valid Commercial License Key for production use. Trial and commercial
* licenses can be obtained at https://www.responsive.dev
*/

package dev.responsive.examples.regression.model;

import com.fasterxml.jackson.annotation.JsonProperty;

public record CustomerInfo(
@JsonProperty("customerName") String customerName,
@JsonProperty("location") String location
) {}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@

package dev.responsive.examples.regression.tests;

import static dev.responsive.examples.regression.RegConstants.CUSTOMERS;
import static dev.responsive.examples.regression.RegConstants.CUSTOMER_ID_TO_NAME;
import static dev.responsive.examples.regression.RegConstants.CUSTOMER_NAME_TO_LOCATION;
import static dev.responsive.examples.regression.RegConstants.NUM_PARTITIONS;
import static dev.responsive.examples.regression.RegConstants.ORDERS;

Expand Down Expand Up @@ -72,7 +73,8 @@ protected final void startUp() throws Exception {
NUM_PARTITIONS,
List.of(
ORDERS,
CUSTOMERS,
CUSTOMER_NAME_TO_LOCATION,
CUSTOMER_ID_TO_NAME,
resultsTopic()
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,16 @@

package dev.responsive.examples.regression.tests;

import static dev.responsive.examples.regression.RegConstants.CUSTOMERS;
import static dev.responsive.examples.regression.RegConstants.CUSTOMER_ID_TO_NAME;
import static dev.responsive.examples.regression.RegConstants.CUSTOMER_NAME_TO_LOCATION;
import static dev.responsive.examples.regression.RegConstants.ORDERS;

import dev.responsive.examples.common.InjectedE2ETestException;
import dev.responsive.examples.e2etest.Params;
import dev.responsive.examples.e2etest.UrandomGenerator;
import dev.responsive.examples.regression.RegressionSchema;
import dev.responsive.examples.regression.model.Customer;
import dev.responsive.examples.regression.model.CustomerInfo;
import dev.responsive.examples.regression.model.EnrichedOrder;
import dev.responsive.examples.regression.model.Order;
import java.time.Duration;
Expand All @@ -35,11 +37,16 @@
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class STJoinExample extends AbstractKSExampleService {
/**
* Slightly-more-complex example application featuring multiple types of joins (ST and FKJ) and
* a windowed aggregation. As we add more features it would be good to expand this
* topology further with more kinds of operators
*/
public class FullTopologyExample extends AbstractKSExampleService {

private final UrandomGenerator randomGenerator = new UrandomGenerator();

public STJoinExample(final Map<String, Object> props, final boolean responsive) {
public FullTopologyExample(final Map<String, Object> props, final boolean responsive) {
super(
"stream-table-join",
props,
Expand All @@ -51,20 +58,32 @@ public STJoinExample(final Map<String, Object> props, final boolean responsive)
// we should work on making it more robust and cover more DSL operations (perhaps as
// individual tests)
@Override
protected Topology buildTopology() {
public Topology buildTopology() {
final StreamsBuilder builder = new StreamsBuilder();

// Read orders from the orders topic
// Read orders keyed by customer id
final KStream<String, Order> orders =
builder.stream(ORDERS, Consumed.with(Serdes.String(), RegressionSchema.orderSerde()));

// Read customer names keyed by customer id
final KTable<String, String> customerIdToName =
builder.table(CUSTOMER_ID_TO_NAME, Consumed.with(Serdes.String(), Serdes.String()));

// Read customer location keyed by customer name
final KTable<String, String> customerNameToLocation =
builder.table(CUSTOMER_NAME_TO_LOCATION, Consumed.with(Serdes.String(), Serdes.String()));

// Join customer tables to get full Customer metadata keyed by customer id
final KTable<String, Customer> customers = customerIdToName
.join(customerNameToLocation,
id -> id, // join key is customer name --> extract value from customerIdToName
(location, name) -> new CustomerInfo(name, location),
Materialized.with(Serdes.String(), RegressionSchema.customerInfoSerde()))
.mapValues((k, v) -> new Customer(k, v.customerName(), v.location()));

// Read customers from the customers topic
final KTable<String, Customer> customers =
builder.table(CUSTOMERS, Consumed.with(Serdes.String(), RegressionSchema.customerSerde()));

// Enrich orders with customer information by joining the orders
// stream with the customers table
// stream with the customers table, keyed by customer id
KStream<String, EnrichedOrder> enrichedOrders = orders
.join(
customers,
Expand All @@ -87,7 +106,7 @@ protected Topology buildTopology() {
}
})
.groupByKey()
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofHours(12)))
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofHours(1), Duration.ofHours(12)))
.reduce(EnrichedOrder::combineWith,
Materialized.with(Serdes.String(), RegressionSchema.enrichedOrderSerde()))
.toStream()
Expand Down
Loading
Loading