Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
module github.com/zeromicro/go-queue
module github.com/zhuud/go-queue
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: Module path should remain 'github.com/zeromicro/go-queue' as this is a PR to the original repository. This change would break all existing imports.


go 1.20

Expand Down
26 changes: 19 additions & 7 deletions kq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ func newKafkaQueue(c KqConf, handler ConsumeHandler, options queueOptions) queue
}
if c.CommitInOrder {
q.commitRunner = threading.NewStableRunner(func(msg kafka.Message) kafka.Message {
if err := q.consumeOne(context.Background(), string(msg.Key), string(msg.Value)); err != nil {
ctx := extractCtxFromMsg(msg)

if err := q.consumeOne(ctx, string(msg.Key), string(msg.Value)); err != nil {
if q.errorHandler != nil {
q.errorHandler(context.Background(), msg, err)
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: using context.Background() here loses trace context - should use ctx instead

}
Expand Down Expand Up @@ -208,6 +210,11 @@ func (q *kafkaQueue) Stop() {
}

func (q *kafkaQueue) consumeOne(ctx context.Context, key, val string) error {
defer func() {
if err := recover(); err != nil {
logc.Errorf(ctx, "consumeOne failed recover, error: %v", err)
}
}()
Comment on lines +213 to +217
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

style: recover handler should re-panic on system errors like out of memory that shouldn't be caught

startTime := timex.Now()
err := q.handler.Consume(ctx, key, val)
q.metrics.Add(stat.Task{
Expand All @@ -220,12 +227,7 @@ func (q *kafkaQueue) startConsumers() {
for i := 0; i < q.c.Processors; i++ {
q.consumerRoutines.Run(func() {
for msg := range q.channel {
// wrap message into message carrier
mc := internal.NewMessageCarrier(internal.NewMessage(&msg))
// extract trace context from message
ctx := otel.GetTextMapPropagator().Extract(context.Background(), mc)
// remove deadline and error control
ctx = contextx.ValueOnlyFrom(ctx)
ctx := extractCtxFromMsg(msg)

if err := q.consumeOne(ctx, string(msg.Key), string(msg.Value)); err != nil {
if q.errorHandler != nil {
Expand Down Expand Up @@ -365,3 +367,13 @@ func ensureQueueOptions(c KqConf, options *queueOptions) {
}
}
}

func extractCtxFromMsg(msg kafka.Message) context.Context {
// wrap message into message carrier
mc := internal.NewMessageCarrier(internal.NewMessage(&msg))
// extract trace context from message
ctx := otel.GetTextMapPropagator().Extract(context.Background(), mc)
// remove deadline and error control
ctx = contextx.ValueOnlyFrom(ctx)
return ctx
}