Skip to content

Commit

Permalink
adding direct publish method and unsubscribe method
Browse files Browse the repository at this point in the history
  • Loading branch information
VarthanV committed Dec 1, 2024
1 parent ca5d272 commit ff0424a
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 0 deletions.
8 changes: 8 additions & 0 deletions broker/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package broker

func deleteElement[T any](slice []T, index int) []T {
if index < 0 || index >= len(slice) {
return slice
}
return append(slice[:index], slice[index+1:]...)
}
62 changes: 62 additions & 0 deletions broker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Broker interface {
CreateQueue(ctx context.Context, name string, durable bool) error
BindQueue(ctx context.Context, queueName, exchangeName, bindingKey string) error
Subscribe(ctx context.Context, queueName string, conn *websocket.Conn) error
Unsubscribe(ctx context.Context, queueName string, conn *websocket.Conn) error
PublishMessage(ctx context.Context, exchangeName string, routingKey string, message interface{}) error
}

Expand Down Expand Up @@ -276,9 +277,70 @@ func (b *broker) PublishMessage(ctx context.Context, exchangeName string, routin
}

}

}

case exchange.ExchangeTypeDirect:
logrus.Info("Directing to excatly bound routing key")
binding, exists := e.Bindings[routingKey]
if !exists {
// Drop messages
logrus.Warnf("No binding found for routing key: %s in exchange: %s", routingKey, exchangeName)
return nil
}

for _, q := range binding.Queues {
// Enqueue the message to the queue
msg := messages.Message{
ID: uuid.NewString(),
Body: message,
}
q.Enqueue(msg)

logrus.Info("found connections ", len(b.subscriptions[q.Name]))

for _, conn := range b.subscriptions[q.Name] {
go func() {
<-b.realtimeUpdatesSem
conn.SetWriteDeadline(time.Now().Add(time.Minute))
err := conn.WriteJSON(msg)
if err != nil {
logrus.Error("error in writing to conn ", err)
}
b.realtimeUpdatesSem <- struct{}{}
}()
}

}
}

return nil

}

// Unsubscribe unsubscribes a subscriber from the relatime updates of the queue.
func (b *broker) Unsubscribe(ctx context.Context, queueName string, conn *websocket.Conn) error {
connections, ok := b.subscriptions[queueName]
if !ok {
if err := errors.Handle(errors.ErrSubscriptionDoesnotExist); err != nil {
return err
}
}

indexToDelete := -1
for i, c := range connections {
if c.NetConn() == conn.NetConn() {
indexToDelete = i
err := c.Close()
if err != nil {
logrus.Error("error in closing connection ", err)
}
}
}

if indexToDelete >= 0 {
b.subscriptions[queueName] = deleteElement(connections, indexToDelete)
}

return nil
}
6 changes: 6 additions & 0 deletions errors/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ var (
ErrQueueDoesnotExist = Error{Code: "Q002", Message: "Queue doesn't exist"}
)

// Subscription related errors

var (
ErrSubscriptionDoesnotExist = Error{Code: "S001", Message: "Subscription doesnot exist"}
)

func Handle(err error) error {
logrus.Error(err)
return err
Expand Down

0 comments on commit ff0424a

Please sign in to comment.