Skip to content

Commit

Permalink
refactor: align naming with mqtt terminology
Browse files Browse the repository at this point in the history
  • Loading branch information
neurosnap committed Oct 3, 2024
1 parent 242caa6 commit ebd8a69
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 39 deletions.
24 changes: 12 additions & 12 deletions connector.go → broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@ import (
"github.com/antoniomika/syncmap"
)

type Connector interface {
type Broker interface {
GetChannels() iter.Seq2[string, *Channel]
GetClients() iter.Seq2[string, *Client]
Connect(*Client, []*Channel) (error, error)
}

type BaseConnector struct {
type BaseBroker struct {
Channels *syncmap.Map[string, *Channel]
}

func (b *BaseConnector) Cleanup() {
func (b *BaseBroker) Cleanup() {
toRemove := []string{}
for _, channel := range b.GetChannels() {
count := 0
Expand All @@ -31,7 +31,7 @@ func (b *BaseConnector) Cleanup() {

if count == 0 {
channel.Cleanup()
toRemove = append(toRemove, channel.ID)
toRemove = append(toRemove, channel.Topic)
}
}

Expand All @@ -40,25 +40,25 @@ func (b *BaseConnector) Cleanup() {
}
}

func (b *BaseConnector) GetChannels() iter.Seq2[string, *Channel] {
func (b *BaseBroker) GetChannels() iter.Seq2[string, *Channel] {
return b.Channels.Range
}

func (b *BaseConnector) GetClients() iter.Seq2[string, *Client] {
func (b *BaseBroker) GetClients() iter.Seq2[string, *Client] {
return func(yield func(string, *Client) bool) {
for _, channel := range b.GetChannels() {
channel.Clients.Range(yield)
}
}
}

func (b *BaseConnector) Connect(client *Client, channels []*Channel) (error, error) {
func (b *BaseBroker) Connect(client *Client, channels []*Channel) (error, error) {
for _, channel := range channels {
dataChannel := b.ensureChannel(channel)
dataChannel.Clients.Store(client.ID, client)
client.Channels.Store(dataChannel.ID, dataChannel)
client.Channels.Store(dataChannel.Topic, dataChannel)
defer func() {
client.Channels.Delete(channel.ID)
client.Channels.Delete(channel.Topic)
dataChannel.Clients.Delete(client.ID)

client.Cleanup()
Expand Down Expand Up @@ -186,10 +186,10 @@ func (b *BaseConnector) Connect(client *Client, channels []*Channel) (error, err
return inputErr, outputErr
}

func (b *BaseConnector) ensureChannel(channel *Channel) *Channel {
dataChannel, _ := b.Channels.LoadOrStore(channel.ID, channel)
func (b *BaseBroker) ensureChannel(channel *Channel) *Channel {
dataChannel, _ := b.Channels.LoadOrStore(channel.Topic, channel)
dataChannel.Handle()
return dataChannel
}

var _ Connector = (*BaseConnector)(nil)
var _ Broker = (*BaseBroker)(nil)
6 changes: 3 additions & 3 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ type ChannelMessage struct {
Action ChannelAction
}

func NewChannel(name string) *Channel {
func NewChannel(topic string) *Channel {
return &Channel{
ID: name,
Topic: topic,
Done: make(chan struct{}),
Data: make(chan ChannelMessage),
Clients: syncmap.New[string, *Client](),
}
}

type Channel struct {
ID string
Topic string
Done chan struct{}
Data chan ChannelMessage
Clients *syncmap.Map[string, *Client]
Expand Down
12 changes: 6 additions & 6 deletions cmd/authorized_keys/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,12 +109,12 @@ func main() {
keyPath := GetEnv("SSH_AUTHORIZED_KEYS", "./ssh_data/authorized_keys")
cfg := &pubsub.Cfg{
Logger: logger,
PubSub: &pubsub.PubSubMulticast{
Logger: logger,
Connector: &pubsub.BaseConnector{
PubSub: pubsub.NewMulticast(
&pubsub.BaseBroker{
Channels: syncmap.New[string, *pubsub.Channel](),
},
},
logger,
),
}

s, err := wish.NewServer(
Expand Down Expand Up @@ -150,9 +150,9 @@ func main() {
select {
case <-time.After(5 * time.Second):
for _, channel := range cfg.PubSub.GetChannels() {
slog.Info("channel online", slog.Any("channel", channel.ID))
slog.Info("channel online", slog.Any("channel topic", channel.Topic))
for _, client := range channel.GetClients() {
slog.Info("client online", slog.Any("channel", channel.ID), slog.Any("client", client.ID), slog.String("direction", client.Direction.String()))
slog.Info("client online", slog.Any("channel topic", channel.Topic), slog.Any("client", client.ID), slog.String("direction", client.Direction.String()))
}
}
case <-done:
Expand Down
29 changes: 18 additions & 11 deletions multicast.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@ import (
"log/slog"
)

type PubSubMulticast struct {
Connector
type Multicast struct {
Broker
Logger *slog.Logger
}

func (p *PubSubMulticast) getClients(direction ChannelDirection) iter.Seq2[string, *Client] {
func NewMulticast(broker Broker, logger *slog.Logger) *Multicast {
return &Multicast{
Broker: broker,
Logger: logger,
}
}

func (p *Multicast) getClients(direction ChannelDirection) iter.Seq2[string, *Client] {
return func(yield func(string, *Client) bool) {
for clientID, client := range p.GetClients() {
if client.Direction == direction {
Expand All @@ -23,19 +30,19 @@ func (p *PubSubMulticast) getClients(direction ChannelDirection) iter.Seq2[strin
}
}

func (p *PubSubMulticast) GetPipes() iter.Seq2[string, *Client] {
func (p *Multicast) GetPipes() iter.Seq2[string, *Client] {
return p.getClients(ChannelDirectionInputOutput)
}

func (p *PubSubMulticast) GetPubs() iter.Seq2[string, *Client] {
func (p *Multicast) GetPubs() iter.Seq2[string, *Client] {
return p.getClients(ChannelDirectionInput)
}

func (p *PubSubMulticast) GetSubs() iter.Seq2[string, *Client] {
func (p *Multicast) GetSubs() iter.Seq2[string, *Client] {
return p.getClients(ChannelDirectionOutput)
}

func (p *PubSubMulticast) connect(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, direction ChannelDirection, blockWrite bool, replay, keepAlive bool) (error, error) {
func (p *Multicast) connect(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, direction ChannelDirection, blockWrite bool, replay, keepAlive bool) (error, error) {
client := NewClient(ID, rw, direction, blockWrite, replay, keepAlive)

go func() {
Expand All @@ -46,16 +53,16 @@ func (p *PubSubMulticast) connect(ctx context.Context, ID string, rw io.ReadWrit
return p.Connect(client, channels)
}

func (p *PubSubMulticast) Pipe(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, replay bool) (error, error) {
func (p *Multicast) Pipe(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, replay bool) (error, error) {
return p.connect(ctx, ID, rw, channels, ChannelDirectionInputOutput, false, replay, false)
}

func (p *PubSubMulticast) Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel) error {
func (p *Multicast) Pub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel) error {
return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionInput, true, false, false))
}

func (p *PubSubMulticast) Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error {
func (p *Multicast) Sub(ctx context.Context, ID string, rw io.ReadWriter, channels []*Channel, keepAlive bool) error {
return errors.Join(p.connect(ctx, ID, rw, channels, ChannelDirectionOutput, false, false, keepAlive))
}

var _ PubSub = (*PubSubMulticast)(nil)
var _ = (*Multicast)(nil)
12 changes: 6 additions & 6 deletions multicast_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@ func TestMulticastSubBlock(t *testing.T) {
name := "test-channel"
syncer := make(chan int)

cast := &PubSubMulticast{
cast := &Multicast{
Logger: slog.Default(),
Connector: &BaseConnector{
Broker: &BaseBroker{
Channels: syncmap.New[string, *Channel](),
},
}
Expand Down Expand Up @@ -85,9 +85,9 @@ func TestMulticastPubBlock(t *testing.T) {
name := "test-channel"
syncer := make(chan int)

cast := &PubSubMulticast{
cast := &Multicast{
Logger: slog.Default(),
Connector: &BaseConnector{
Broker: &BaseBroker{
Channels: syncmap.New[string, *Channel](),
},
}
Expand Down Expand Up @@ -131,9 +131,9 @@ func TestMulticastMultSubs(t *testing.T) {
name := "test-channel"
syncer := make(chan int)

cast := &PubSubMulticast{
cast := &Multicast{
Logger: slog.Default(),
Connector: &BaseConnector{
Broker: &BaseBroker{
Channels: syncmap.New[string, *Channel](),
},
}
Expand Down
2 changes: 1 addition & 1 deletion pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
)

type PubSub interface {
Connector
Broker
GetPubs() iter.Seq2[string, *Client]
GetSubs() iter.Seq2[string, *Client]
GetPipes() iter.Seq2[string, *Client]
Expand Down

0 comments on commit ebd8a69

Please sign in to comment.