Skip to content

Commit

Permalink
added broker rebuilding
Browse files Browse the repository at this point in the history
  • Loading branch information
VarthanV committed Dec 1, 2024
1 parent 1a06264 commit ba299f1
Showing 1 changed file with 134 additions and 51 deletions.
185 changes: 134 additions & 51 deletions broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/gorilla/websocket"
"github.com/sirupsen/logrus"
"gorm.io/gorm"
"gorm.io/gorm/clause"

"github.com/VarthanV/pub-sub/exchange"
"github.com/VarthanV/pub-sub/queue"
Expand Down Expand Up @@ -68,14 +69,17 @@ func New(db *gorm.DB, cfg *config.Config) Broker {
cfg.WorkerConfig.MaxWorkersAllowedConcurrentlyForRealtimeUpdates),
cfg: cfg,
}

return b
}

// Start starts the instance of a new broker , advisable to
// pass a context with timeout to meet the expectations
func (b *broker) Start(ctx context.Context) error {
var (
errorChan = make(chan error, 1)
isStaredChan = make(chan bool, 1)
startDeadline <-chan time.Time
errorChan = make(chan error, 1) // will exit on first error occured
isStaredChan = make(chan bool, 1)
wg sync.WaitGroup
)

if b.currentState.Load() == int32(BrokerStateRunning) {
Expand All @@ -84,12 +88,13 @@ func (b *broker) Start(ctx context.Context) error {
logrus.Fatal("broker already running")
}

// Start routine
go func() {
defer close(isStaredChan)
defer close(errorChan)
b.db = b.db.WithContext(ctx)

// Rebuild broker
wg.Add(2)

// Start configuing semaphores
go func() {
defer wg.Done()

// Setup tokens in sem for configuring workers
for i := 0; i <
Expand All @@ -101,6 +106,61 @@ func (b *broker) Start(ctx context.Context) error {
b.syncToDbSem <- struct{}{}
}

}()

// Rebuild broker
go func() {
logrus.Info("Rebuilding broker")
var (
exchanges []models.Exchange
)

b.currentState.Store(BrokerStateSettingUp)
defer wg.Done()
// Get all exchanges
err := b.db.Model(&models.Exchange{}).
Preload("Bindings.Queues"). // Preload both Bindings and associated Queues
Where(&models.Exchange{}).
Find(&exchanges).Error

if err != nil {
logrus.Error("error in getting exchanges from db: ", err)
errorChan <- err
return
}

// Iterate over the exchanges and configure them
for _, exchange := range exchanges {
// Create the exchange
if err := b.CreateExchange(ctx, exchange.Name, exchange.ExchangeType); err != nil {
logrus.Errorf("error creating exchange %s: %v", exchange.Name, err)
errorChan <- err
continue
}

// Iterate over bindings for the exchange
for _, binding := range exchange.Bindings {
for _, queue := range binding.Queues {
// Create the queue
if err := b.CreateQueue(ctx, queue.Name, queue.Durable); err != nil {
logrus.Errorf("error creating queue %s: %v", queue.Name, err)
errorChan <- err
continue
}

// Bind the queue to the exchange
if err := b.BindQueue(ctx, queue.Name, exchange.Name, binding.Key); err != nil {
logrus.Errorf("error binding queue %s to exchange %s with key %s: %v",
queue.Name, exchange.Name, binding.Key, err)
errorChan <- err
}
}
}
}
}()

go func() {
wg.Wait()
isStaredChan <- true
}()

Expand Down Expand Up @@ -157,16 +217,6 @@ func (b *broker) Start(ctx context.Context) error {
}
}()

// See if deadline already there in the context else we derive a custom deadline for starting
// and make it done

_, ok := ctx.Deadline()
if !ok {
// FIXME: take the deadline from config
logrus.Infof("deadline not enforced in base ctx ,setting deadline for %d minutes", 1)
startDeadline = time.After(time.Minute * 1)
}

for {
select {
case e, ok := <-errorChan:
Expand All @@ -186,10 +236,6 @@ func (b *broker) Start(ctx context.Context) error {
logrus.Error("context done")
return fmt.Errorf("context done")

case <-startDeadline:
logrus.Error("start deadline exceeded")
return fmt.Errorf("start deadline exceeded")

}
}
}
Expand All @@ -206,13 +252,16 @@ func (b *broker) CreateExchange(ctx context.Context, name string, exchangeType e

b.exchanges[name] = exchange.New(name, exchangeType)
logrus.Info("Created exchange ", name)
err := b.db.WithContext(ctx).Create(&models.Exchange{
Name: name,
ExchangeType: exchangeType,
}).Error
if err != nil {
logrus.Error("error in creating exchange ", err)
return err

if b.currentState.Load() != BrokerStateSettingUp {
err := b.db.Clauses(clause.OnConflict{DoNothing: true}).Create(&models.Exchange{
Name: name,
ExchangeType: exchangeType,
}).Error
if err != nil {
logrus.Error("error in creating exchange ", err)
return err
}
}
return nil
}
Expand All @@ -229,21 +278,24 @@ func (b *broker) CreateQueue(ctx context.Context, name string, durable bool) err
}
b.queues[name] = queue.New(name, durable)
logrus.Info("Created queue ", name)
err := b.db.WithContext(ctx).Create(&models.Queue{
Name: name,
Durable: durable,
}).Error
if err != nil {
logrus.Error("error in creating queue ", err)
return err
if b.currentState.Load() != BrokerStateSettingUp {
err := b.db.Clauses(clause.OnConflict{DoNothing: true}).Create(&models.Queue{
Name: name,
Durable: durable,
}).Error
if err != nil {
logrus.Error("error in creating queue ", err)
return err
}
}
return nil
}

// BindQueue binds a queue to an exchange by the given binding key
// BindQueue binds a queue to an exchange by the given binding key
func (b *broker) BindQueue(ctx context.Context, queueName, exchangeName, bindingKey string) error {
b.mu.Lock()
defer b.mu.Unlock()

exchange, exists := b.exchanges[exchangeName]
if !exists {
errors.Handle(errors.ErrExchangeDoesnotExist)
Expand Down Expand Up @@ -273,22 +325,53 @@ func (b *broker) BindQueue(ctx context.Context, queueName, exchangeName, binding
}
bi.Queues = append(bi.Queues, q)

// Get queue with the name
modelQ := &models.Queue{}
if b.currentState.Load() != BrokerStateSettingUp {
// Fetch the exchange from the database to ensure its ID is available
var exchangeModel models.Exchange
if err := b.db.Where("name = ?", exchangeName).First(&exchangeModel).Error; err != nil {
logrus.Errorf("Error fetching exchange %s: %v", exchangeName, err)
return err
}

err := b.db.Where(&models.Queue{Name: queueName}).
Last(&modelQ).Error
if err != nil {
logrus.Error("error in getting q ", err)
return err
}
// Fetch the queue from the database to ensure its ID is available
var queueModel models.Queue
if err := b.db.Where("name = ?", queueName).First(&queueModel).Error; err != nil {
logrus.Errorf("Error fetching queue %s: %v", queueName, err)
return err
}

err = b.db.Create(&models.Binding{
Key: bindingKey,
ExchangeName: exchangeName,
Queues: []models.Queue{{Base: models.Base{ID: modelQ.ID}}},
}).Error
return err
// Create the binding in the database
binding := models.Binding{
Key: bindingKey,
ExchangeName: exchangeName,
Queues: []models.Queue{{Base: models.Base{ID: queueModel.ID}}},
}

tx := b.db.Begin()
if err := tx.Clauses(clause.OnConflict{DoNothing: true}).Create(&binding).Error; err != nil {
logrus.Errorf("Error creating binding: %v", err)
tx.Rollback()
return err
}

// Associate the binding to the exchange
if err := tx.Model(&exchangeModel).
Association("Bindings").
Append(&binding); err != nil {
logrus.Errorf("Error associating binding to exchange %s: %v", exchangeName, err)
tx.Rollback()
return err
}

// Commit the transaction
if err := tx.Commit().Error; err != nil {
logrus.Errorf("Error committing transaction: %v", err)
return err
}

}
logrus.Infof("Successfully bound queue %s to exchange %s with key %s", queueName, exchangeName, bindingKey)
return nil
}

// Subscribe subscribes a websocket connection to a topic for exchange of realtime updates.
Expand Down

0 comments on commit ba299f1

Please sign in to comment.