Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/tools/integration/packages.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
{"path": "./internal/impl/gcp/enterprise/changestreams/metadata"},
{"path": "./internal/impl/hdfs"},
{"path": "./internal/impl/influxdb"},
{"path": "./internal/impl/kafka"},
{"path": "./internal/impl/kafka", "timeout": "10m"},
{"path": "./internal/impl/kafka/enterprise"},
{"path": "./internal/impl/memcached"},
{"path": "./internal/impl/mssqlserver", "timeout": "10m"},
Expand Down
2 changes: 1 addition & 1 deletion internal/impl/kafka/franz_reader_ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestPartitionCacheOrdering(t *testing.T) {
atomic.StoreInt64(&commitOffset, r.Offset)
})

batches, batchSize := 1000000, 10
batches, batchSize := 100000, 10

go func() {
for bid := range batches {
Expand Down
5 changes: 4 additions & 1 deletion internal/impl/kafka/input_sarama_kafka_parts.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ func (k *kafkaReader) connectExplicitTopics(ctx context.Context, config *sarama.
msgChan := make(chan asyncMessage)
ctx, doneFn := context.WithCancel(context.Background())

// Assign msgChan before spawning partition consumer goroutines to avoid a
// data race: goroutines read k.msgChan immediately upon starting.
k.msgChan = msgChan
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Moving k.msgChan = msgChan here fixes the goroutine-spawn race, but leaves k.msgChan set on the error path. If consumer.ConsumePartition fails inside the loop, this function returns an error while k.msgChan is left pointing at an orphaned channel: the deferred cleanup only closes consumer/coordinator/client, and the goroutine that resets k.msgChan = nil is only spawned after the for loop completes.

That breaks the invariant relied on by Connect: the next retry sees k.msgChan != nil, short-circuits with return nil, and ReadBatch then blocks forever on a dead channel. Suggest resetting k.msgChan = nil in the deferred cleanup when err != nil.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 7bf43b9: reset k.msgChan = nil in the deferred cleanup when err != nil so a subsequent Connect retry observes the disconnected sentinel and re-runs the connection setup rather than short-circuiting on a stale channel reference. Verified the kafka integration suite still passes (TestRedpandaIntegration, TestIntegrationUnordered, TestRedpandaRecordOrder{Integration,SoakTest}, TestIntegrationCache{,Standardized}, TestIntegrationSarama{Redpanda,OutputFixedTimestamp,CheckpointOneLockUp}, TestIntegrationUnorderedSasl, TestRedpandaConnectionTest{,Sasl,PrematureConnect}Integration, TestRedpandaSaslIntegration).


for topic, partitions := range k.topicPartitions {
for _, partition := range partitions {
topic := topic
Expand Down Expand Up @@ -301,6 +305,5 @@ func (k *kafkaReader) connectExplicitTopics(ctx context.Context, config *sarama.
k.consumerCloseFn = doneFn
k.consumerDoneCtx = doneCtx
k.session = offsetTracker
k.msgChan = msgChan
return nil
}
2 changes: 1 addition & 1 deletion internal/impl/kafka/integration_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func TestIntegrationCacheStandardized(t *testing.T) {
cache_resources:
- label: testcache
redpanda:
seed_brokers: ["localhost:$PORT"]
seed_brokers: ["127.0.0.1:$PORT"]
topic: "topic-$ID"
`
t.Run("single partition", func(t *testing.T) {
Expand Down
21 changes: 20 additions & 1 deletion internal/impl/kafka/integration_ordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,26 @@ func TestRedpandaRecordOrderSoakTest(t *testing.T) {
// nohup go test -timeout 0 -v -count 1000 -run ^TestRedpandaRecordOrderSoakTest$ ./internal/impl/kafka/ > soak.log 2>&1 &
integration.CheckSkip(t)

const soakDuration = 3 * time.Minute
// Derive soak duration from the test deadline so the test doesn't get
// killed by a timeout when other tests in the package run first. Reserve
// time for container setup and teardown. If not enough time remains, skip.
const (
setupTeardownBudget = 2 * time.Minute
minSoakDuration = 30 * time.Second
defaultSoakDuration = 1 * time.Minute
)

soakDuration := defaultSoakDuration
if dl, ok := t.Deadline(); ok {
remaining := time.Until(dl) - setupTeardownBudget
if remaining < minSoakDuration {
t.Skipf("not enough time for soak test: %s remaining after reserving %s for setup/teardown (need at least %s)", time.Until(dl).Round(time.Second), setupTeardownBudget, minSoakDuration)
}
if remaining < soakDuration {
soakDuration = remaining
t.Logf("Adjusted soak duration to %s based on test deadline", soakDuration.Round(time.Second))
}
}

// --- infrastructure ---

Expand Down
16 changes: 8 additions & 8 deletions internal/impl/kafka/integration_sarama_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestIntegrationSaramaCheckpointOneLockUp(t *testing.T) {
inBuilder := service.NewStreamBuilder()
require.NoError(t, inBuilder.AddOutputYAML(fmt.Sprintf(`
kafka:
addresses: [ "localhost:%v" ]
addresses: [ "127.0.0.1:%v" ]
topic: topic-wcotesttopic
max_in_flight: 1
`, kafkaPortStr)))
Expand All @@ -75,7 +75,7 @@ kafka:

outBuilderConf := fmt.Sprintf(`
kafka:
addresses: [ "localhost:%v" ]
addresses: [ "127.0.0.1:%v" ]
topics: [ topic-wcotesttopic ]
consumer_group: wcotestgroup
checkpoint_limit: 1
Expand Down Expand Up @@ -170,7 +170,7 @@ func TestIntegrationSaramaRedpanda(t *testing.T) {
template := `
output:
kafka:
addresses: [ localhost:$PORT ]
addresses: [ 127.0.0.1:$PORT ]
topic: topic-$ID
max_in_flight: $MAX_IN_FLIGHT
retry_as_batch: $VAR3
Expand All @@ -181,7 +181,7 @@ output:

input:
kafka:
addresses: [ localhost:$PORT ]
addresses: [ 127.0.0.1:$PORT ]
topics: [ topic-$ID$VAR1 ]
consumer_group: "$VAR4"
checkpoint_limit: $VAR2
Expand Down Expand Up @@ -338,7 +338,7 @@ input:
templateManualPartitioner := `
output:
kafka:
addresses: [ localhost:$PORT ]
addresses: [ 127.0.0.1:$PORT ]
topic: topic-$ID
max_in_flight: $MAX_IN_FLIGHT
retry_as_batch: $VAR3
Expand All @@ -351,7 +351,7 @@ output:

input:
kafka:
addresses: [ localhost:$PORT ]
addresses: [ 127.0.0.1:$PORT ]
topics: [ topic-$ID$VAR1 ]
consumer_group: "$VAR4"
checkpoint_limit: $VAR2
Expand Down Expand Up @@ -388,13 +388,13 @@ func TestIntegrationSaramaOutputFixedTimestamp(t *testing.T) {
template := `
output:
kafka:
addresses: [ localhost:$PORT ]
addresses: [ 127.0.0.1:$PORT ]
topic: topic-$ID
timestamp_ms: 1000000000000

input:
kafka:
addresses: [ localhost:$PORT ]
addresses: [ 127.0.0.1:$PORT ]
topics: [ topic-$ID ]
consumer_group: "blobfish"
processors:
Expand Down
42 changes: 27 additions & 15 deletions internal/impl/kafka/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,18 @@ input:
integration.StreamTestSendBatchCount(10),
)

// Lighter suite for partition-specific groups that only need to validate
// routing correctness, not throughput. Keeps total runtime well within
// the parent test deadline so later groups don't get starved.
partitionSuite := integration.StreamTests(
integration.StreamTestOpenClose(),
integration.StreamTestMetadata(),
integration.StreamTestSendBatch(10),
integration.StreamTestStreamSequential(100),
integration.StreamTestStreamParallel(100),
integration.StreamTestSendBatchCount(10),
)

suite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
Expand All @@ -184,7 +196,7 @@ input:
)

t.Run("only one partition", func(t *testing.T) {
suite.Run(
partitionSuite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
vars.General["VAR4"] = "group" + vars.ID
Expand All @@ -196,7 +208,7 @@ input:
})

t.Run("explicit partitions", func(t *testing.T) {
suite.Run(
partitionSuite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
topicName := "topic-" + vars.ID
Expand All @@ -207,19 +219,19 @@ input:
integration.StreamTestOptSleepAfterInput(time.Second*3),
integration.StreamTestOptVarSet("VAR4", ""),
)
})

t.Run("range of partitions", func(t *testing.T) {
suite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
require.NoError(t, createKafkaTopic(ctx, brokerAddr, vars.ID, 4))
}),
integration.StreamTestOptPort(kafkaPortStr),
integration.StreamTestOptSleepAfterInput(time.Second*3),
integration.StreamTestOptVarSet("VAR1", ":0-3"),
integration.StreamTestOptVarSet("VAR4", ""),
)
})
t.Run("range of partitions", func(t *testing.T) {
partitionSuite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
require.NoError(t, createKafkaTopic(ctx, brokerAddr, vars.ID, 4))
}),
integration.StreamTestOptPort(kafkaPortStr),
integration.StreamTestOptSleepAfterInput(time.Second*3),
integration.StreamTestOptVarSet("VAR1", ":0-3"),
integration.StreamTestOptVarSet("VAR4", ""),
)
})

manualPartitionTemplate := `
Expand All @@ -244,7 +256,7 @@ input:
commit_period: "1s"
`
t.Run("manual_partitioner", func(t *testing.T) {
suite.Run(
partitionSuite.Run(
t, manualPartitionTemplate,
integration.StreamTestOptPreTest(func(t testing.TB, _ context.Context, vars *integration.StreamTestConfigVars) {
vars.General["VAR4"] = "group" + vars.ID
Expand Down
48 changes: 30 additions & 18 deletions internal/impl/kafka/integration_unordered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func TestIntegrationUnordered(t *testing.T) {
template := `
output:
redpanda:
seed_brokers: [ localhost:$PORT ]
seed_brokers: [ 127.0.0.1:$PORT ]
topic: topic-$ID
max_in_flight: $MAX_IN_FLIGHT
timeout: "5s"
Expand All @@ -46,7 +46,7 @@ output:

input:
redpanda:
seed_brokers: [ localhost:$PORT ]
seed_brokers: [ 127.0.0.1:$PORT ]
topics: [ topic-$ID$VAR1 ]
consumer_group: "$VAR4"
commit_period: "1s"
Expand All @@ -67,10 +67,21 @@ input:
integration.StreamTestStreamSaturatedUnacked(200),
)

// Lighter suite for partition-specific groups that only need to validate
// routing correctness, not throughput. Keeps total runtime well within
// the parent test deadline so later groups don't get starved.
partitionSuite := integration.StreamTests(
integration.StreamTestOpenClose(),
integration.StreamTestMetadata(),
integration.StreamTestSendBatch(10),
integration.StreamTestStreamSequential(100),
integration.StreamTestStreamParallel(100),
)

// In some modes include testing input level batching
var suiteExt integration.StreamTestList
suiteExt = append(suiteExt, suite...)
suiteExt = append(suiteExt, integration.StreamTestReceiveBatchCount(10))
var partitionSuiteExt integration.StreamTestList
partitionSuiteExt = append(partitionSuiteExt, partitionSuite...)
partitionSuiteExt = append(partitionSuiteExt, integration.StreamTestReceiveBatchCount(10))

suite.Run(
t, template,
Expand All @@ -84,7 +95,7 @@ input:

t.Run("only one partition", func(t *testing.T) {
t.Parallel()
suiteExt.Run(
partitionSuiteExt.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
vars.General["VAR4"] = "group" + vars.ID
Expand All @@ -97,7 +108,7 @@ input:

t.Run("explicit partitions", func(t *testing.T) {
t.Parallel()
suite.Run(
partitionSuite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
topicName := "topic-" + vars.ID
Expand All @@ -111,7 +122,7 @@ input:

t.Run("range of partitions", func(t *testing.T) {
t.Parallel()
suite.Run(
partitionSuite.Run(
t, template,
integration.StreamTestOptPreTest(func(t testing.TB, ctx context.Context, vars *integration.StreamTestConfigVars) {
require.NoError(t, createKafkaTopic(ctx, brokerAddr, vars.ID, 4))
Expand All @@ -127,7 +138,7 @@ input:
manualPartitionTemplate := `
output:
redpanda:
seed_brokers: [ localhost:$PORT ]
seed_brokers: [ 127.0.0.1:$PORT ]
topic: topic-$ID
max_in_flight: $MAX_IN_FLIGHT
timeout: "5s"
Expand All @@ -138,7 +149,7 @@ output:

input:
redpanda:
seed_brokers: [ localhost:$PORT ]
seed_brokers: [ 127.0.0.1:$PORT ]
topics: [ topic-$ID$VAR1 ]
consumer_group: "$VAR4"
unordered_processing:
Expand All @@ -147,7 +158,8 @@ input:
commit_period: "1s"
`
t.Run("manual_partitioner", func(t *testing.T) {
suite.Run(
t.Parallel()
partitionSuite.Run(
t, manualPartitionTemplate,
integration.StreamTestOptPreTest(func(t testing.TB, _ context.Context, vars *integration.StreamTestConfigVars) {
vars.General["VAR4"] = "group" + vars.ID
Expand All @@ -171,7 +183,7 @@ func TestIntegrationUnorderedSasl(t *testing.T) {
template := `
output:
redpanda:
seed_brokers: [ localhost:$PORT ]
seed_brokers: [ 127.0.0.1:$PORT ]
topic: topic-$ID
max_in_flight: $MAX_IN_FLIGHT
metadata:
Expand All @@ -183,7 +195,7 @@ output:

input:
redpanda:
seed_brokers: [ localhost:$PORT ]
seed_brokers: [ 127.0.0.1:$PORT ]
topics: [ topic-$ID$VAR1 ]
consumer_group: "$VAR4"
sasl:
Expand All @@ -198,9 +210,9 @@ input:
integration.StreamTestOpenClose(),
integration.StreamTestMetadata(),
integration.StreamTestSendBatch(10),
integration.StreamTestStreamSequential(1000),
integration.StreamTestStreamParallel(1000),
integration.StreamTestStreamParallelLossy(1000),
integration.StreamTestStreamSequential(100),
integration.StreamTestStreamParallel(100),
integration.StreamTestStreamParallelLossy(100),
)

suite.Run(
Expand Down Expand Up @@ -228,7 +240,7 @@ func BenchmarkIntegrationUnordered(b *testing.B) {
template := `
output:
redpanda:
seed_brokers: [ localhost:$PORT ]
seed_brokers: [ 127.0.0.1:$PORT ]
topic: topic-$ID
max_in_flight: 128
timeout: "5s"
Expand All @@ -237,7 +249,7 @@ output:

input:
redpanda:
seed_brokers: [ localhost:$PORT ]
seed_brokers: [ 127.0.0.1:$PORT ]
topics: [ topic-$ID ]
consumer_group: "$VAR3"
checkpoint_limit: 100
Expand Down
18 changes: 11 additions & 7 deletions internal/impl/kafka/testcontainers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,7 @@ func startRedpanda(t testing.TB) (brokerAddr, port string) {
brokerAddr, err = ctr.KafkaSeedBroker(t.Context())
require.NoError(t, err)

// Extract port from "host:port"
parts := strings.Split(brokerAddr, ":")
port = parts[len(parts)-1]

brokerAddr, port = brokerAddrPort(brokerAddr)
return brokerAddr, port
}

Expand Down Expand Up @@ -75,8 +72,15 @@ func startRedpandaWithSASL(t testing.TB) (brokerAddr, port string) {
brokerAddr, err = ctr.KafkaSeedBroker(t.Context())
require.NoError(t, err)

parts := strings.Split(brokerAddr, ":")
port = parts[len(parts)-1]

brokerAddr, port = brokerAddrPort(brokerAddr)
return brokerAddr, port
}

// brokerAddrPort normalises a broker address returned by testcontainers,
// replacing "localhost" with "127.0.0.1" to avoid DNS resolution failures
// in environments where the system resolver cannot resolve "localhost".
func brokerAddrPort(addr string) (brokerAddr, port string) {
addr = strings.Replace(addr, "localhost", "127.0.0.1", 1)
parts := strings.Split(addr, ":")
return addr, parts[len(parts)-1]
}