feature: add support for partition ordering #523
base: master
Important: Review skipped. Draft detected.
Note: Other AI code review bot(s) detected. CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.
Walkthrough
Adds configurable consumer pipelining strategies (Serial, FanOut) with a default of FanOut. Implements strategy-specific processing in listener, introduces per-item WaitGroup tracking, and centralizes tracing insertion. Updates config schema and tests (new YAML, new test, PHP worker), and refactors test logging/observer utilities.
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Kafka
participant Listener
participant Strategy as Pipelining Strategy
participant Driver
participant Queue
participant Worker
Kafka->>Listener: Fetch records (by partition)
Listener->>Strategy: Select Serial or FanOut
alt Serial
loop each partition
loop each record
Strategy->>Strategy: new per-record WaitGroup
Strategy->>Driver: insertTracedItem(record, doneWg)
Driver->>Queue: Enqueue Item (trace context injected)
Queue->>Worker: Deliver Item
Worker-->>Driver: Ack/Nack
Driver-->>Strategy: Item.doneWg.Done()
end
end
else FanOut
par records in fetch
Strategy->>Driver: insertTracedItem(record, nil)
Driver->>Queue: Enqueue Item
Queue->>Worker: Deliver Item
Worker-->>Driver: Ack/Nack
note right of Driver: No per-record gating
end
end
sequenceDiagram
autonumber
participant Driver
participant OTel as OTel Propagation
participant Queue
Driver->>OTel: Start span from record headers
OTel-->>Driver: Span + context
Driver->>Queue: Insert Item with injected context
note over Driver,Queue: Item carries doneWg (Serial) or nil (FanOut)
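The two branches in the diagrams above can be sketched in a few lines of Go. This is a minimal illustration, not the driver's code: `record`, `item`, `ack`, `consumeSerial`, `consumeFanOut`, and `runSerial` are hypothetical names, and the real listener works on fetched Kafka records and a priority queue rather than a plain channel.

```go
package main

import (
	"fmt"
	"sync"
)

// record is a hypothetical stand-in for a fetched Kafka record.
type record struct {
	partition int32
	offset    int64
}

// item is a hypothetical queue entry; doneWg is nil in fan-out mode.
type item struct {
	rec    record
	doneWg *sync.WaitGroup
}

// ack releases the serial gate, if there is one.
func (i *item) ack() {
	if i.doneWg != nil {
		i.doneWg.Done()
	}
}

// consumeSerial enqueues one record at a time and waits for its ack
// before enqueuing the next, so records are handled in fetch order.
func consumeSerial(recs []record, enqueue func(*item)) {
	for _, r := range recs {
		wg := &sync.WaitGroup{}
		wg.Add(1)
		enqueue(&item{rec: r, doneWg: wg})
		wg.Wait()
	}
}

// consumeFanOut enqueues every record immediately with no gating.
func consumeFanOut(recs []record, enqueue func(*item)) {
	for _, r := range recs {
		enqueue(&item{rec: r})
	}
}

// runSerial drives consumeSerial with one worker goroutine and
// returns the offsets in the order the worker handled them.
func runSerial(recs []record) []int64 {
	queue := make(chan *item)
	workerDone := make(chan struct{})
	var handled []int64
	go func() {
		defer close(workerDone)
		for it := range queue {
			handled = append(handled, it.rec.offset)
			it.ack()
		}
	}()
	consumeSerial(recs, func(it *item) { queue <- it })
	close(queue)
	<-workerDone
	return handled
}

func main() {
	recs := []record{{0, 10}, {0, 11}, {0, 12}}
	fmt.Println(runSerial(recs)) // [10 11 12]

	enqueued := 0
	consumeFanOut(recs, func(*item) { enqueued++ })
	fmt.Println(enqueued) // 3
}
```

The sketch gates on every record, as the Serial branch of the diagram does; with several workers pulling from the queue, that gate is what keeps a later record from overtaking an earlier one.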
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
kafkajobs/item.go (2)
114-131: Signal gating after enqueueing commit, not before.
Calling Done() before the commit is accepted on commitsCh can release Serial gating even if the ack cannot be enqueued (default case). Move Done() into the success branch.
 if atomic.LoadUint64(i.stopped) == 1 {
 	return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
 }
-
-if i.doneWg != nil {
-	i.doneWg.Done()
-}
-
 select {
 case i.commitsCh <- i.record:
-	return nil
+	if i.doneWg != nil {
+		i.doneWg.Done()
+	}
+	return nil
 default:
 	return errors.Str("failed to acknowledge the JOB, the pipeline is probably stopped")
 }
137-154: Nack: signal gating only after a successful requeue.
If requeue fails, Done() was already called; Serial gating would advance with the record neither committed nor requeued.
 func (i *Item) NackWithOptions(requeue bool, _ int) error {
 	if atomic.LoadUint64(i.stopped) == 1 {
 		return errors.Str("failed to NackWithOptions the JOB, the pipeline is probably stopped")
 	}
-	if i.doneWg != nil {
-		i.doneWg.Done()
-	}
-
-	if requeue {
-		err := i.Requeue(nil, 0)
-		if err != nil {
-			return err
-		}
-	}
+	if requeue {
+		if err := i.Requeue(nil, 0); err != nil {
+			return err
+		}
+	}
+	if i.doneWg != nil {
+		i.doneWg.Done()
+	}
 	return nil
 }
kafkajobs/config.go (1)
281-311: ConsumePartitions option is computed but never applied.
The constructed partitions map isn’t appended to opts; consuming specific partitions is effectively ignored.
 if len(v) > 0 {
 	kgoOff := make(map[int32]kgo.Offset, len(v))
 	for kk, vv := range v {
 		switch vv.Type {
@@
 		partitions[k] = kgoOff
 	}
-	kgo.ConsumePartitions(partitions)
+	// apply the option
+	opts = append(opts, kgo.ConsumePartitions(partitions))
 }
 }
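Why a discarded option has no effect can be seen in a small stand-alone sketch of the functional-options pattern. The names here (`config`, `option`, `consumePartitions`, `build`) are hypothetical and deliberately simplified; they are not the kgo API, only an illustration of the same mechanism.

```go
package main

import "fmt"

// config and option are hypothetical stand-ins for a client's
// functional options; they are not the real kgo types.
type config struct {
	partitions map[string][]int32
}

type option func(*config)

// consumePartitions only builds an option value; nothing changes
// until that value is passed to build.
func consumePartitions(p map[string][]int32) option {
	return func(c *config) { c.partitions = p }
}

// build applies every collected option to a fresh config.
func build(opts ...option) config {
	var c config
	for _, o := range opts {
		o(&c)
	}
	return c
}

func main() {
	parts := map[string][]int32{"test-consume-1": {0, 1}}

	var dropped []option
	_ = consumePartitions(parts) // constructed but never appended: ignored
	fmt.Println(len(build(dropped...).partitions)) // 0

	applied := []option{consumePartitions(parts)}
	fmt.Println(len(build(applied...).partitions)) // 1
}
```

Go does not flag an unused call result the way it flags an unused variable, so the compiler gives no hint that the option never reached the client.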
🧹 Nitpick comments (15)
tests/php_test_files/jobs/jobs_ok_pq.php (1)
20-20: Use spaces instead of tabs for indentation (PSR-12).
Minor formatting nit to keep PHP files consistent.
-	sleep(15);
+    sleep(15);
schema.json (1)
135-143: Clarify option semantics in description.
Consider clarifying that Serial preserves per-partition in-order processing while FanOut may interleave within a partition for higher throughput. Current text “within a single partition” can be read ambiguously.
- "description": "The consumption strategy for messages within a single partition.",
+ "description": "Per-partition consumption strategy: 'Serial' preserves in-order processing within each partition; 'FanOut' maximizes throughput and may process records from the same partition concurrently.",
7-12: Remove unused import.
JsonSerializer isn’t referenced.
 use Spiral\RoadRunner\Jobs\Consumer;
-use Spiral\RoadRunner\Jobs\Serializer\JsonSerializer;
19-29: Optional: add a small jitter cap to reduce flakiness.
If tests assert timing/order, consider narrowing sleep to 10–50ms to reduce tail latency variance.
tests/configs/.rr-kafka-serial-consumption.yaml (1)
1-74
: Add trailing newline to satisfy linters.YAMLlint flags missing final newline.
- consume: [ "test-consume-1", "test-consume-2" ] + consume: [ "test-consume-1", "test-consume-2" ] +tests/mock/logger.go (2)
20-51
: Hook-based capture drops structured fields; prefer teeing an observer Core.zap.Hooks only sees Entry, not Fields; tests inspecting structured fields won’t see them. Recommend teeing a capture Core with the console Core and dropping JSON parsing.
func ZapTestLogger(enab zapcore.LevelEnabler) (*ZapLoggerMock, *ObservedLogs) { - logs := &ObservedLogs{} - core := zapcore.NewCore( - zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), // or zapcore.NewConsoleEncoder for plain text - zapcore.AddSync(os.Stdout), - enab, - ) - - logger := zap.New( - core, - zap.Development(), - zap.Hooks(func(entry zapcore.Entry) error { - line := strings.TrimSpace(entry.Message) - var rawFields map[string]interface{} - var c []zapcore.Field - jsonStartIndex := strings.Index(line, "{") - if jsonStartIndex > 0 { - jsonStr := line[jsonStartIndex:] - _ = json.Unmarshal([]byte(jsonStr), &rawFields) - } - - for field, value := range rawFields { - c = append(c, zap.Any(field, value)) - } - - logs.add(LoggedEntry{entry, c}) - return nil - }), - ) - - return &ZapLoggerMock{logger}, logs + // observed core for tests + console core for stdout + obsCore, logs := New(enab) // if removed, reintroduce a minimal capturing core or reuse zaptest/observer + consoleCore := zapcore.NewCore( + zapcore.NewConsoleEncoder(zap.NewDevelopmentEncoderConfig()), + zapcore.AddSync(os.Stdout), + enab, + ) + logger := zap.New(zapcore.NewTee(consoleCore, obsCore), zap.Development()) + return &ZapLoggerMock{logger}, logs }If New(enab) was intentionally removed, I can inline a small capturing core here instead.
36-39: In case you keep the hook, fix JSON start check.
Index can be 0 when the message is pure JSON; current >0 skips parsing valid JSON.
-jsonStartIndex := strings.Index(line, "{")
-if jsonStartIndex > 0 {
+jsonStartIndex := strings.Index(line, "{")
+if jsonStartIndex >= 0 {
kafkajobs/config.go (1)
312-315: LGTM on defaulting to FanOut; add defensive validation.
Setting the default to FanOut is correct and matches the schema. Consider validating user-provided values early to fail fast on typos.
 if c.ConsumerOpts.PipeliningStrategy == "" {
 	c.ConsumerOpts.PipeliningStrategy = FanOutPipelining
 }
+// defensive: reject unknown values early
+switch c.ConsumerOpts.PipeliningStrategy {
+case FanOutPipelining, SerialPipelining:
+	// ok
+default:
+	return nil, errors.Errorf("unknown consumer pipelining_strategy: %s", c.ConsumerOpts.PipeliningStrategy)
+}
tests/jobs_kafka_test.go (4)
13-13: Drop strconv import; simplify formatting.
Use %d in Sprintf and remove the extra import.
-import "strconv"
+// strconv no longer needed after payload formatting change
And in TestKafkaPipeliningStrategy:
-	Payload: []byte(fmt.Sprintf("%v:%v", strconv.Itoa(topic), strconv.Itoa(i))),
+	Payload: []byte(fmt.Sprintf("%d:%d", topic, i)),
327-332: Close RPC connection/client to avoid FD leaks.
Add defers for conn and client.
 conn, err := net.Dial("tcp", "127.0.0.1:6001")
 require.NoError(t, err)
+defer func() { _ = conn.Close() }()
 client := rpc.NewClientWithCodec(goridgeRpc.NewClientCodec(conn))
+defer func() { _ = client.Close() }()
361-363: Avoid fixed sleep; wait until logs show expected counts (less flaky).
Replace the fixed 5s sleep with a bounded wait on observed logs.
-time.Sleep(time.Second * 5)
+deadline := time.Now().Add(30 * time.Second)
+for {
+	if oLogger.FilterMessageSnippet("php consumed:1:").Len() == 20 &&
+		oLogger.FilterMessageSnippet("php consumed:2:").Len() == 20 &&
+		oLogger.FilterMessageSnippet("php consumed:3:").Len() == 20 {
+		break
+	}
+	if time.Now().After(deadline) {
+		t.Fatal("timeout waiting for serial consumption logs")
+	}
+	time.Sleep(200 * time.Millisecond)
+}
367-369: Duplicate assertion; likely meant to assert “processed successfully”.
The same assertion is repeated twice. Change the second to check processing.
-assert.Equal(t, 60, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
-assert.Equal(t, 60, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
+assert.Equal(t, 60, oLogger.FilterMessageSnippet("job was pushed successfully").Len())
+assert.Equal(t, 60, oLogger.FilterMessageSnippet("job was processed successfully").Len())
kafkajobs/listener.go (3)
141-142: Noisy warn: log only when there are fetch errors.
This warns every poll cycle, even with no errors.
-d.log.Warn("handling fetch errors")
+if len(errs) > 0 {
+	d.log.Warn("handling fetch errors")
+}
165-167: Improve error message for unknown strategy.
Include the invalid value to aid diagnosis.
-default:
-	return errors.New("unknown consumer pipeliningStrategy")
+default:
+	return errors.New("unknown consumer pipelining_strategy: " + string(d.cfg.ConsumerOpts.PipeliningStrategy))
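Defaulting, validation, and the more descriptive error can be combined in one small function. The sketch below is stand-alone: the type and constant names follow those listed for kafkajobs/opts.go, but the string values "Serial" and "FanOut" are assumed from the review summary, and `resolveStrategy` is a hypothetical helper, not part of the driver.

```go
package main

import "fmt"

// PipeliningStrategy mirrors the string-backed config type.
type PipeliningStrategy string

const (
	SerialPipelining PipeliningStrategy = "Serial" // assumed value
	FanOutPipelining PipeliningStrategy = "FanOut" // assumed value
)

// resolveStrategy (hypothetical) defaults an empty value to FanOut
// and rejects anything unknown, naming the offending value.
func resolveStrategy(s PipeliningStrategy) (PipeliningStrategy, error) {
	switch s {
	case "":
		return FanOutPipelining, nil
	case SerialPipelining, FanOutPipelining:
		return s, nil
	default:
		return "", fmt.Errorf("unknown consumer pipelining_strategy: %q", string(s))
	}
}

func main() {
	for _, in := range []PipeliningStrategy{"", "Serial", "serial"} {
		got, err := resolveStrategy(in)
		fmt.Printf("%q -> %q, err=%v\n", string(in), string(got), err)
	}
}
```

Quoting the value with %q makes typos such as a lowercase "serial" or stray whitespace visible in the error message.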
175-182: Use the same propagator for extract/inject to avoid global dependency.
Prefer d.prop for both Extract and Inject for consistency.
-func (d *Driver) insertTracedItem(item *Item) {
-	ctxT, span := d.tracer.Tracer(tracerName).Start(otel.GetTextMapPropagator().Extract(context.Background(), propagation.HeaderCarrier(item.headers)), "kafka_listener")
-	d.prop.Inject(ctxT, propagation.HeaderCarrier(item.headers))
+func (d *Driver) insertTracedItem(item *Item) {
+	ctxT, span := d.tracer.Tracer(tracerName).Start(
+		d.prop.Extract(context.Background(), propagation.HeaderCarrier(item.headers)),
+		"kafka_listener",
+	)
+	d.prop.Inject(ctxT, propagation.HeaderCarrier(item.headers))
 	d.pq.Insert(item)
 	span.End()
 }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- go.work.sum is excluded by !**/*.sum
📒 Files selected for processing (11)
- kafkajobs/config.go (1 hunks)
- kafkajobs/item.go (4 hunks)
- kafkajobs/listener.go (3 hunks)
- kafkajobs/opts.go (1 hunks)
- schema.json (1 hunks)
- tests/configs/.rr-kafka-serial-consumption.yaml (1 hunks)
- tests/jobs_kafka_test.go (10 hunks)
- tests/mock/logger.go (2 hunks)
- tests/mock/observer.go (0 hunks)
- tests/php_test_files/jobs/jobs_ok_log_item_rand_sleep.php (1 hunks)
- tests/php_test_files/jobs/jobs_ok_pq.php (1 hunks)
💤 Files with no reviewable changes (1)
- tests/mock/observer.go
🧰 Additional context used
🧬 Code graph analysis (6)
kafkajobs/config.go (1)
- kafkajobs/opts.go (3): ConsumerOpts (127-135), PipeliningStrategy (137-137), FanOutPipelining (141-141)
kafkajobs/opts.go (1)
- kafkajobs/driver.go (2): FromPipeline (145-256), mu (32-57)
kafkajobs/item.go (1)
- kafkajobs/driver.go (2): d (436-496), mu (32-57)
kafkajobs/listener.go (3)
- kafkajobs/opts.go (4): ConsumerOpts (127-135), PipeliningStrategy (137-137), SerialPipelining (140-140), FanOutPipelining (141-141)
- kafkajobs/driver.go (8): Driver (32-57), FromPipeline (145-256), d (436-496), d (258-283), d (285-303), d (506-513), d (320-349), d (351-389)
- kafkajobs/item.go (1): Item (20-39)
tests/jobs_kafka_test.go (2)
- tests/mock/logger.go (1): ZapTestLogger (20-51)
- tests/helpers/helpers.go (3): PausePipelines (78-97), ResumePipes (25-44), DestroyPipelines (129-155)
tests/mock/logger.go (1)
- tests/mock/observer.go (6): ObservedLogs (48-51), LoggedEntry (33-36), LevelEnabler (167-171), New (159-165), co (188-195), co (180-186)
🪛 YAMLlint (1.37.1)
tests/configs/.rr-kafka-serial-consumption.yaml
[error] 74-74: no new line character at the end of file
(new-line-at-end-of-file)
🔇 Additional comments (5)
kafkajobs/opts.go (2)
132-135: LGTM: config surface extended safely.
Adding PipeliningStrategy to ConsumerOpts with proper tags looks good.
137-143: LGTM: enum constants are clear and public.
The string-backed type with explicit constants is appropriate for config binding and JSON.
tests/configs/.rr-kafka-serial-consumption.yaml (1)
55-73: Nice: tests exercise Serial across multiple topics.
Config correctly pins Serial and blocks rebalances; good coverage for the new option.
tests/jobs_kafka_test.go (1)
261-280: Quicker test teardown via GracefulShutdownTimeout looks good.
Reducing endure shutdown latency should speed up the suite without altering semantics.
Also applies to: 41-41, 151-151, 389-391, 495-497, 594-596, 693-695, 782-784, 922-924, 950-952
kafkajobs/listener.go (1)
143-168: Validate WaitGroup completion and insert backpressure
- Ack()/NackWithOptions() each invoke doneWg.Done() exactly once when doneWg ≠ nil.
- Confirm insertTracedItem (and its underlying pq.Insert) cannot block indefinitely (e.g. uses buffered or non-blocking sends) to avoid serial‐pipeline deadlock.
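One way to make the "exactly once" property above hold by construction is to route every release through sync.Once. This is only a sketch under that assumption: `gate`, `newGate`, `release`, and `wait` are hypothetical names, not part of the PR, which calls doneWg.Done() directly from Ack() and NackWithOptions().

```go
package main

import (
	"fmt"
	"sync"
)

// gate is a hypothetical wrapper that makes releasing a WaitGroup
// idempotent. A nil gate or nil wg means no gating (fan-out).
type gate struct {
	wg   *sync.WaitGroup
	once sync.Once
}

// newGate returns a gate that blocks wait until release is called.
func newGate() *gate {
	wg := &sync.WaitGroup{}
	wg.Add(1)
	return &gate{wg: wg}
}

// release calls Done at most once, however often it is invoked.
func (g *gate) release() {
	if g == nil || g.wg == nil {
		return
	}
	g.once.Do(g.wg.Done)
}

// wait blocks until the gate has been released.
func (g *gate) wait() {
	if g != nil && g.wg != nil {
		g.wg.Wait()
	}
}

func main() {
	g := newGate()
	g.release() // for example from Ack
	g.release() // a stray second Ack or Nack does not panic
	g.wait()
	fmt.Println("released")

	var none *gate // fan-out: nothing to release or wait for
	none.release()
	none.wait()
}
```

The cost is one sync.Once per item; whether that is acceptable next to reusing WaitGroups to cut allocations is a design decision for the PR, not something this sketch settles.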
e84a53c to 739b396
739b396 to 19862e9
Hey @adamsnoah98 👋🏻 Thank you for the PR 👍🏻
Codecov Report
✅ All modified and coverable lines are covered by tests.
Pull Request Overview
This PR adds configurable pipelining strategies for Kafka job consumption to support message ordering while preserving existing performance characteristics. The default behavior remains unchanged (fan-out), but users can now opt for serial processing per partition to maintain Kafka's ordering guarantees.
Key changes:
- Added pipelining_strategy configuration option with "Serial" and "FanOut" modes
- Implemented synchronization mechanisms for serial processing using WaitGroups
- Enhanced test infrastructure with improved logging hooks and graceful shutdown timeouts
Reviewed Changes
Copilot reviewed 11 out of 12 changed files in this pull request and generated 3 comments.
Show a summary per file
File | Description |
---|---|
kafkajobs/opts.go | Adds PipeliningStrategy type and configuration constants |
kafkajobs/config.go | Implements validation and default setting for pipelining strategy |
kafkajobs/listener.go | Core logic implementing serial vs fan-out consumption patterns |
kafkajobs/item.go | Enhanced Item struct with WaitGroup support for serial processing |
schema.json | JSON schema definition for new pipelining_strategy option |
tests/mock/logger.go | Improved test logging with JSON parsing hooks |
tests/jobs_kafka_test.go | Comprehensive test for pipelining strategies |
tests/configs/.rr-kafka-serial-consumption.yaml | Test configuration for serial consumption |
tests/php_test_files/jobs/jobs_ok_log_item_rand_sleep.php | PHP worker script with variable processing time |
I could achieve ordering with one worker, but the messages are still parallelizable per-partition and per-topic. This preserves only the ordering Kafka can offer (to the extent that keeps the polling simple) without further bottlenecking the process. Conversely, yes, the rdkafka driver does achieve this, but it would need to happen completely out of process. Allowing this in the driver provides a single deployable process with higher average utilization (especially since each PHP worker takes up an unfortunately large amount of memory). This PR adds utility to RoadRunner; I am not claiming it is achievable nowhere else. It provides a super simple single config to consume all the events relevant to a service:
I mean, rdkafka not in the RR worker, but just a usual PHP script: executed, died, and so on. Regarding the PR: I understand what you're doing by adding a WaitGroup plus a different method on fetch. A WaitGroup looks logical here, but it should be tested extensively to cover every case of calling Done on a WaitGroup whose counter may already be 0 (because that would result in a panic). You may also think about a channel-based approach, because it would support early cancellation with a timeout plus context-based cancellation handling.
Yes, I wanted to get a draft to talk through the high level concept, before doing some refinement on the branch. I can take it out of draft after adding some additional tests + addressing the pool comments (I agree on both counts) if you are accepting of the new config option conceptually.
@adamsnoah98 Yeah, I have 0 concerns about configuration, good job 👍🏻 Conceptually, everything else is also looking good to me.
…in serial pipelining Reuse waitgroups within listener to reduce allocations
1ff5d46 to 7153500
…on consumption
Reason for This PR
Partitions are kafka's primary mechanism to allow message ordering. Roadrunner's Jobs plugin practices fan out via a priority queue. The same-priority instability in this queue and the fluctuating latency in PHP workers handling jobs undermine this attribute of partitions. This PR adds configuration to the driver which allows synchronizing job creation for each consumed topic. This comes at the cost of throughput, which is why the default behaviour has been left as is, and because not all topics are order sensitive. Multiple kafka driver pipelines may be used if the service has both order-sensitive and order-insensitive topics to consume.
As is, this change also synchronizes the fetches to the slowest consumed partition for simplicity. I chose to value this simplicity, rather than trying to manage multiple layers of queuing or interacting more with the committed offsets. If we want more control here, I think including FetchMaxPartitionBytes configuration and bumping maxPollRecords is the simple improvement here.
Description of Changes
consumer_options.pipelining_strategy option. FanOut preserves existing eager queuing behaviour, while Serial synchronizes consumption of.
License Acceptance
By submitting this pull request, I confirm that my contribution is made under
the terms of the MIT license.