Skip to content

Commit 5254156

Browse files
authored
Fix eventhub subscription for basic tier (#2352)
1 parent 21ed961 commit 5254156

File tree

3 files changed

+213
-55
lines changed

3 files changed

+213
-55
lines changed

docs/advanced-guide/using-publisher-subscriber/page.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,7 +344,7 @@ Azure Event Hub is supported as an external PubSub provider such that if you are
344344
Import the external driver for `eventhub` using the following command.
345345

346346
```bash
347-
go get gofr.dev/pkg/gofr/datasources/pubsub/eventhub
347+
go get gofr.dev/pkg/gofr/datasource/pubsub/eventhub
348348
```
349349

350350
Use the `AddPubSub` method of GoFr's app to connect
@@ -359,6 +359,7 @@ app := gofr.New()
359359
StorageServiceURL: "https://gofrdev.windows.net/",
360360
StorageContainerName: "test",
361361
EventhubName: "test1",
362+
ConsumerGroup: "$Default",
362363
}))
363364
```
364365

pkg/gofr/datasource/pubsub/eventhub/eventhub.go

Lines changed: 201 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@ package eventhub
33
import (
44
"context"
55
"errors"
6+
"fmt"
7+
"os"
68
"strings"
79
"time"
810

11+
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
912
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
1013
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
1114
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
@@ -25,7 +28,12 @@ var (
2528
errEmptyTopic = errors.New("topic name cannot be empty")
2629
)
2730

28-
const defaultQueryTimeout = 30 * time.Second
31+
const (
32+
defaultQueryTimeout = 30 * time.Second
33+
eventHubPropsTimeout = 2 * time.Second
34+
basicTierMaxPartitions = 2
35+
basicTierReceiveTimeout = 3 * time.Second
36+
)
2937

3038
type Config struct {
3139
ConnectionString string
@@ -128,12 +136,19 @@ func (c *Client) UseTracer(tracer any) {
128136
}
129137
}
130138

131-
// Connect establishes a connection to Cassandra and registers metrics using the provided configuration when the client was Created.
139+
// Connect establishes a connection to Event Hub and registers metrics using the provided configuration when the client was created.
132140
func (c *Client) Connect() {
133141
if !c.validConfigs(c.cfg) {
134142
return
135143
}
136144

145+
if c.cfg.ConsumerGroup == "" {
146+
c.cfg.ConsumerGroup = c.generateUniqueConsumerGroup()
147+
c.logger.Debugf("Auto-generated consumer group: %s", c.cfg.ConsumerGroup)
148+
} else {
149+
c.logger.Debugf("Using provided consumer group: %s", c.cfg.ConsumerGroup)
150+
}
151+
137152
c.logger.Debug("Event Hub connection started using connection string")
138153

139154
producerClient, err := azeventhubs.NewProducerClientFromConnectionString(c.cfg.ConnectionString,
@@ -205,6 +220,28 @@ func (c *Client) Connect() {
205220
c.producer = producerClient
206221
c.consumer = consumerClient
207222
c.checkPoint = checkpointStore
223+
224+
c.logger.Debug("Event Hub client initialization complete")
225+
}
226+
227+
// generateUniqueConsumerGroup creates a unique consumer group for this instance.
228+
func (*Client) generateUniqueConsumerGroup() string {
229+
// Try environment variables first (Kubernetes/Docker)
230+
if podName := os.Getenv("POD_NAME"); podName != "" {
231+
return fmt.Sprintf("gofr-%s", podName)
232+
}
233+
234+
if hostname := os.Getenv("HOSTNAME"); hostname != "" {
235+
return fmt.Sprintf("gofr-%s", hostname)
236+
}
237+
238+
// Fallback to system hostname + process ID.
239+
if hostname, err := os.Hostname(); err == nil {
240+
return fmt.Sprintf("gofr-%s-%d", hostname, os.Getpid())
241+
}
242+
243+
// Final fallback.
244+
return fmt.Sprintf("gofr-instance-%d", time.Now().UnixNano())
208245
}
209246

210247
// Subscribe checks all partitions for the first available event and returns it.
@@ -213,85 +250,185 @@ func (c *Client) Subscribe(ctx context.Context, topic string) (*pubsub.Message,
213250
return nil, errClientNotConnected
214251
}
215252

216-
var (
217-
msg *pubsub.Message
218-
err error
219-
)
253+
// Try processor approach first
254+
partitionClient := c.processor.NextPartitionClient(ctx)
255+
if partitionClient != nil {
256+
return c.processEventsFromPartitionClient(ctx, topic, partitionClient)
257+
}
258+
259+
// Fallback to direct consumer approach if processor doesn't have partition clients ready
260+
return c.subscribeDirectFromConsumer(ctx, topic)
261+
}
262+
263+
// processEventsFromPartitionClient processes events using the processor partition client.
264+
func (c *Client) processEventsFromPartitionClient(ctx context.Context, topic string,
265+
partitionClient *azeventhubs.ProcessorPartitionClient) (*pubsub.Message, error) {
266+
defer closePartitionResources(ctx, partitionClient)
267+
268+
timeout := c.getReceiveTimeout()
220269

221-
// for each partition in the event hub, create a partition client with processEvents as the function to process events
222-
for {
223-
partitionClient := c.processor.NextPartitionClient(ctx)
270+
receiveCtx, cancel := context.WithTimeout(ctx, timeout)
271+
defer cancel()
224272

225-
if partitionClient == nil {
226-
break
273+
c.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_total_count", "topic", topic, "subscription_name", partitionClient.PartitionID())
274+
275+
start := time.Now()
276+
277+
// ReceiveEvents signature: ReceiveEvents(ctx context.Context, count int, options *ReceiveEventsOptions) ([]*ReceivedEventData, error)
278+
// Note: ReceiveEventsOptions is nil for default behavior.
279+
events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)
280+
if err != nil {
281+
if !errors.Is(err, context.DeadlineExceeded) {
282+
c.logger.Debugf("Error receiving events from partition %s: %v", partitionClient.PartitionID(), err)
227283
}
228284

229-
c.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_total_count", "topic", topic, "subscription_name", partitionClient.PartitionID())
285+
return nil, nil
286+
}
287+
288+
if len(events) == 0 {
289+
return nil, nil
290+
}
291+
292+
// Create message from the first event
293+
msg := pubsub.NewMessage(ctx)
294+
msg.Value = events[0].Body
295+
msg.Committer = &Message{
296+
event: events[0],
297+
processor: partitionClient,
298+
logger: c.logger,
299+
}
300+
msg.Topic = topic
301+
msg.MetaData = events[0].EventData
230302

231-
start := time.Now()
303+
end := time.Since(start)
304+
c.logger.Debug(&Log{
305+
Mode: "SUB",
306+
MessageValue: strings.Join(strings.Fields(string(msg.Value)), " "),
307+
Topic: topic,
308+
Host: c.cfg.EventhubName + ":" + c.cfg.ConsumerGroup + ":" + partitionClient.PartitionID(),
309+
PubSubBackend: "EVHUB",
310+
Time: end.Microseconds(),
311+
})
232312

233-
select {
234-
case <-ctx.Done():
235-
return nil, nil
236-
default:
237-
msg, err = c.processEvents(ctx, partitionClient)
238-
if errors.Is(err, ErrNoMsgReceived) {
239-
// If no message is received, we don't achieve anything by returning error rather check in a different partition.
240-
// This logic may change if we remove the timeout while receiving a message. However, waiting on just one partition
241-
// might lead to missing data, so spawning one go-routine or having a worker pool can be an option to do this operation faster.
242-
continue
243-
}
313+
c.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_success_count", "topic", topic, "subscription_name", partitionClient.PartitionID())
244314

245-
end := time.Since(start)
315+
return msg, nil
316+
}
246317

247-
c.logger.Debug(&Log{
248-
Mode: "SUB",
249-
MessageValue: strings.Join(strings.Fields(string(msg.Value)), " "),
250-
Topic: topic,
251-
Host: c.cfg.EventhubName + ":" + c.cfg.ConsumerGroup + ":" + partitionClient.PartitionID(),
252-
PubSubBackend: "EVHUB",
253-
Time: end.Microseconds(),
254-
})
318+
// subscribeDirectFromConsumer uses consumer client directly as fallback.
319+
func (c *Client) subscribeDirectFromConsumer(ctx context.Context, topic string) (*pubsub.Message, error) {
320+
// Get partition information
321+
props, err := c.consumer.GetEventHubProperties(ctx, nil)
322+
if err != nil {
323+
c.logger.Errorf("Failed to get Event Hub properties: %v", err)
324+
return nil, err
325+
}
255326

256-
c.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_success_count", "topic", topic, "subscription_name", partitionClient.PartitionID())
327+
// Try each partition for available messages - use LATEST to avoid old messages
328+
for _, partitionID := range props.PartitionIDs {
329+
msg, err := c.tryReadFromPartition(ctx, partitionID, topic)
330+
if err != nil {
331+
c.logger.Debugf("Error reading from partition %s: %v", partitionID, err)
332+
continue
333+
}
257334

335+
if msg != nil {
258336
return msg, nil
259337
}
260338
}
261339

262340
return nil, nil
263341
}
264342

265-
func (*Client) processEvents(ctx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient) (*pubsub.Message, error) {
266-
defer closePartitionResources(ctx, partitionClient)
343+
// tryReadFromPartition attempts to read a single message from specified partition.
344+
func (c *Client) tryReadFromPartition(ctx context.Context, partitionID, topic string) (*pubsub.Message, error) {
345+
// Create partition client for direct read with LATEST position to avoid old messages.
346+
partitionClient, err := c.consumer.NewPartitionClient(partitionID, &azeventhubs.PartitionClientOptions{
347+
StartPosition: azeventhubs.StartPosition{
348+
Latest: to.Ptr(true), // Use Latest to only get new messages
349+
},
350+
})
267351

268-
receiveCtx, receiveCtxCancel := context.WithTimeout(ctx, time.Second)
269-
events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)
352+
if err != nil {
353+
return nil, err
354+
}
355+
356+
defer partitionClient.Close(ctx)
357+
358+
timeout := c.getReceiveTimeout()
359+
360+
receiveCtx, cancel := context.WithTimeout(ctx, timeout)
361+
defer cancel()
270362

271-
receiveCtxCancel()
363+
c.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_total_count", "topic", topic, "subscription_name", partitionID)
364+
365+
start := time.Now()
366+
367+
events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)
272368

273369
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
274370
return nil, err
275371
}
276372

277373
if len(events) == 0 {
278-
return nil, ErrNoMsgReceived
374+
return nil, nil // No message available in this partition
279375
}
280376

377+
// Create message from event
281378
msg := pubsub.NewMessage(ctx)
282379

283380
msg.Value = events[0].Body
284381
msg.Committer = &Message{
285382
event: events[0],
286-
processor: partitionClient,
383+
processor: nil, // Not using processor for direct reads
384+
logger: c.logger,
287385
}
288-
289-
msg.Topic = partitionClient.PartitionID()
386+
msg.Topic = topic
290387
msg.MetaData = events[0].EventData
291388

389+
end := time.Since(start)
390+
c.logger.Debug(&Log{
391+
Mode: "SUB",
392+
MessageValue: strings.Join(strings.Fields(string(msg.Value)), " "),
393+
Topic: topic,
394+
Host: c.cfg.EventhubName + ":" + c.cfg.ConsumerGroup + ":" + partitionID,
395+
PubSubBackend: "EVHUB",
396+
Time: end.Microseconds(),
397+
})
398+
399+
c.metrics.IncrementCounter(ctx, "app_pubsub_subscribe_success_count", "topic", topic, "subscription_name", partitionID)
400+
292401
return msg, nil
293402
}
294403

404+
// getReceiveTimeout returns appropriate timeout based on Event Hub characteristics.
405+
func (c *Client) getReceiveTimeout() time.Duration {
406+
// Check if this might be basic tier by examining partition count
407+
if c.isLikelyBasicTier() {
408+
return basicTierReceiveTimeout
409+
}
410+
411+
return time.Second
412+
}
413+
414+
// isLikelyBasicTier detects basic tier characteristics.
415+
func (c *Client) isLikelyBasicTier() bool {
416+
if c.consumer == nil {
417+
return false
418+
}
419+
420+
ctx, cancel := context.WithTimeout(context.Background(), eventHubPropsTimeout)
421+
defer cancel()
422+
423+
props, err := c.consumer.GetEventHubProperties(ctx, nil)
424+
if err != nil {
425+
return false // Default to standard behavior on error
426+
}
427+
428+
// Basic tier typically has fewer partitions
429+
return len(props.PartitionIDs) <= basicTierMaxPartitions
430+
}
431+
295432
func closePartitionResources(ctx context.Context, partitionClient *azeventhubs.ProcessorPartitionClient) {
296433
partitionClient.Close(ctx)
297434
}
@@ -401,18 +538,31 @@ func (c *Client) GetEventHubName() string {
401538
return c.cfg.EventhubName
402539
}
403540

541+
// Close safely closes all Event Hub clients and resources.
404542
func (c *Client) Close() error {
405-
err := c.producer.Close(context.Background())
406-
if err != nil {
407-
c.logger.Errorf("failed to close Event Hub producer %v", err)
543+
var lastErr error
544+
545+
// Close producer if it exists
546+
if c.producer != nil {
547+
if err := c.producer.Close(context.Background()); err != nil {
548+
c.logger.Errorf("failed to close Event Hub producer: %v", err)
549+
lastErr = err
550+
}
408551
}
409552

410-
err = c.consumer.Close(context.Background())
411-
if err != nil {
412-
c.logger.Errorf("failed to close Event Hub consumer %v", err)
553+
// Close consumer if it exists
554+
if c.consumer != nil {
555+
if err := c.consumer.Close(context.Background()); err != nil {
556+
c.logger.Errorf("failed to close Event Hub consumer: %v", err)
557+
lastErr = err
558+
}
413559
}
414560

415-
c.processorCtx()
561+
// Cancel processor context if it exists
562+
if c.processorCtx != nil {
563+
c.processorCtx()
564+
c.logger.Debug("Event Hub processor context canceled")
565+
}
416566

417-
return err
567+
return lastErr
418568
}

pkg/gofr/datasource/pubsub/eventhub/message.go

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,15 @@ type Message struct {
1414

1515
func (a *Message) Commit() {
1616
// Update the checkpoint with the latest event received
17-
err := a.processor.UpdateCheckpoint(context.Background(), a.event, nil)
18-
if err != nil {
19-
a.logger.Errorf("failed to acknowledge event with eventID %v", a.event.MessageID)
17+
if a.processor != nil {
18+
err := a.processor.UpdateCheckpoint(context.Background(), a.event, nil)
19+
if err != nil {
20+
a.logger.Errorf("failed to acknowledge event with eventID %v: %v", a.event.MessageID, err)
21+
return
22+
}
23+
24+
a.logger.Debugf("Message committed via processor checkpoint (MessageID: %v)", a.event.MessageID)
25+
} else {
26+
a.logger.Debugf("Message acknowledged (direct read mode)")
2027
}
2128
}

0 commit comments

Comments
 (0)