Skip to content

Commit

Permalink
refactor fanin (#88)
Browse files Browse the repository at this point in the history
simplify fanin code: drop tomb package and don't use reflection
  • Loading branch information
jandelgado authored Jun 22, 2024
1 parent 5c9956d commit c13ef2e
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 159 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Install Go
uses: actions/setup-go@v4
uses: actions/setup-go@v5
with:
go-version: ${{ env.GO_VERSION }}
- name: Checkout code
uses: actions/checkout@v4
- name: Run linters
uses: golangci/golangci-lint-action@v3
uses: golangci/golangci-lint-action@v6
with:
version: ${{ env.GO_LANG_CI_LINT_VERSION }}

Expand Down
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
# Changelog for rabtap

## v1.39.2 (2024-06-089
## v1.39.3 (2024-06-22)

* simplify code (fanin) and reduce dependencies

## v1.39.2 (2024-06-08)

* fix consumer incorrectly displayed in info command
* update dependencies & go
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ require (
github.com/stretchr/testify v1.9.0
golang.org/x/net v0.26.0
golang.org/x/sync v0.7.0
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637
)

require github.com/stealthrocket/net v0.2.1
Expand Down
29 changes: 1 addition & 28 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -5,66 +5,39 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815 h1:bWDMxwH3px2JBh6AyO7hdCn/PkvCZXii8TGj7sbtEbQ=
github.com/docopt/docopt-go v0.0.0-20180111231733-ee0de3bc6815/go.mod h1:WwZ+bS3ebgob9U8Nd0kOddGdZWjyMGR8Wziv+TBNwSE=
github.com/fatih/color v1.15.0 h1:kOqh6YHBtK8aywxGerMG2Eq3H6Qgoqeo13Bk2Mv/nBs=
github.com/fatih/color v1.15.0/go.mod h1:0h5ZqXfHYED7Bhv2ZJamyIOUej9KtShiJESRwBDUSsw=
github.com/fatih/color v1.17.0 h1:GlRw1BRJxkpqUCBKzKOw098ed57fEsKeNjpTe3cSjK4=
github.com/fatih/color v1.17.0/go.mod h1:YZ7TlrGPkiz6ku9fK3TLD/pl3CpsiFyu8N92HLgmosI=
github.com/google/uuid v1.4.0 h1:MtMxsa51/r9yyhkyLsVeVt0B+BGQZzpQiTQ4eHZ8bc4=
github.com/google/uuid v1.4.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
github.com/rabbitmq/amqp091-go v1.9.0/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
github.com/sirupsen/logrus v1.9.4-0.20230606125235-dd1b4c2e81af h1:Sp5TG9f7K39yfB+If0vjp97vuT74F72r8hfRpP8jLU0=
github.com/sirupsen/logrus v1.9.4-0.20230606125235-dd1b4c2e81af/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stealthrocket/net v0.2.1 h1:PehPGAAjuV46zaeHGlNgakFV7QDGUAREMcEQsZQ8NLo=
github.com/stealthrocket/net v0.2.1/go.mod h1:VvoFod9pYC9mo+bEg2NQB/D+KVOjxfhZjZ5zyvozq7M=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws=
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 h1:yiW+nvdHb9LVqSHQBXfZCieqV4fzYhNBql77zY0ykqs=
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637/go.mod h1:BHsqpu/nsuzkT5BpiH1EMZPLyqSMM8JbIavyFACoFNk=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
109 changes: 39 additions & 70 deletions pkg/fanin.go
Original file line number Diff line number Diff line change
@@ -1,83 +1,52 @@
package rabtap

import (
"reflect"

tomb "gopkg.in/tomb.v2"
"context"
"sync"
)

// Fanin allows to do a select ("fan-in") on an set of channels
type Fanin struct {
Ch chan interface{}
channels []reflect.SelectCase
t tomb.Tomb
}

// NewFanin creates a new Fanin object
func NewFanin(channels []interface{}) *Fanin {
fanin := Fanin{Ch: make(chan interface{})}
fanin.add(fanin.t.Dying())
for _, c := range channels {
fanin.add(c)
}

fanin.t.Go(fanin.loop)
return &fanin
}

func (s *Fanin) add(c interface{}) {
s.channels = append(s.channels,
reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(c)})
}

// Stop stops the fanin go-routine
func (s *Fanin) Stop() error {
s.t.Kill(nil)
return s.t.Wait()
}

// Alive returns true if the fanin is running
func (s *Fanin) Alive() bool {
return s.t.Alive()
}

// Select wait for activity on any of the channels
func (s *Fanin) loop() error {

for {
chosen, message, ok := reflect.Select(s.channels)

// channels[0] is always the tomb Dying() chan. Request to end fanin.
if chosen == 0 {
close(s.Ch) // note: sends nil message on s.Ch
return nil
// WrapChan takes a channel of any type and returns a new channel of type interface{}
func WrapChan[T any](c <-chan T) <-chan interface{} {
wrapped := make(chan interface{})
go func() {
for m := range c {
wrapped <- m
}
close(wrapped)
}()
return wrapped
}

if !ok {
// The chosen channel has been closed, so zero
// out the channel to disable the case (happens on normal shutdown)
s.channels[chosen].Chan = reflect.ValueOf(nil)
// Fanin selects sumultanously from an array of channels and sends received
// messages to a new channel ("fan-in" of channels)
func Fanin(ctx context.Context, channels []<-chan interface{}) chan interface{} {
var wg sync.WaitGroup
out := make(chan interface{})

// if no channels are left, end the fanin.
active := false
for i := 1; i < len(s.channels); i++ {
if s.channels[i].Chan != reflect.ValueOf(nil) {
active = true
break
}
}
if !active {
close(s.Ch)
return nil
}
} else {
// allow a blocking write to s.Ch to be terminated
receiver := func(ctx context.Context, c <-chan interface{}) {
defer wg.Done()
for {
select {
case s.Ch <- message.Interface():
case <-s.channels[0].Chan.Interface().(<-chan struct{}):
case <-ctx.Done():
return
case val, ok := <-c:
if !ok {
return
}
out <- val
}
}
}

wg.Add(len(channels))
for _, c := range channels {
go receiver(ctx, c)
}

go func() {
wg.Wait()
close(out)
}()

return out
}
66 changes: 19 additions & 47 deletions pkg/fanin_test.go
Original file line number Diff line number Diff line change
@@ -1,75 +1,47 @@
package rabtap

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func expectIntOnChan(t *testing.T, val int, ch <-chan interface{}) {
func expectOnChan[T any](t *testing.T, val T, ch <-chan interface{}) {
t.Helper()
select {
case message := <-ch:
assert.Equal(t, val, message.(int))
case <-time.After(3 * time.Second):
assert.Fail(t, "did not receive message in expected time")
}
}

func expectNilOnChan(t *testing.T, ch <-chan interface{}) {
select {
case message := <-ch:
assert.Nil(t, message)
case <-time.After(3 * time.Second):
assert.Equal(t, val, message.(T))
case <-time.After(1 * time.Second):
assert.Fail(t, "did not receive message in expected time")
}
}

func TestFaninReceivesFromMultipleChannels(t *testing.T) {

// create fanin of 3 int channels
chan1 := make(chan int)
chan2 := make(chan int)
chan3 := make(chan int)
fanin := NewFanin([]interface{}{chan1, chan2, chan3})

assert.True(t, fanin.Alive())

go func() {
chan1 <- 99
chan2 <- 100
chan3 <- 101
}()
chan1 := make(chan int, 1)
defer close(chan1)
chan2 := make(chan string, 1)
defer close(chan2)
fanin := Fanin(context.TODO(), []<-chan interface{}{WrapChan(chan1), WrapChan(chan2)})

expectIntOnChan(t, 99, fanin.Ch)
expectIntOnChan(t, 100, fanin.Ch)
expectIntOnChan(t, 101, fanin.Ch)

// fanin.Stop() closes fanin channel which in turn sends nil message
assert.Nil(t, fanin.Stop())
expectNilOnChan(t, fanin.Ch)

assert.False(t, fanin.Alive())
chan1 <- 99
expectOnChan(t, 99, fanin)
chan2 <- "hello"
expectOnChan(t, "hello", fanin)
}

func TestFaninClosesChanWhenAllInputsAreClosed(t *testing.T) {

chan1 := make(chan int)
chan2 := make(chan int)
fanin := NewFanin([]interface{}{chan1, chan2})

go func() {
chan1 <- 99
chan2 <- 100
close(chan1)
close(chan2)
}()
fanin := Fanin(context.TODO(), []<-chan interface{}{WrapChan(chan1), WrapChan(chan2)})

expectIntOnChan(t, 99, fanin.Ch)
expectIntOnChan(t, 100, fanin.Ch)
close(chan1)
close(chan2)

// close of last channel closes fanin channel which in turn sends nil message
expectNilOnChan(t, fanin.Ch)
_, ok := <-fanin

assert.False(t, fanin.Alive())
assert.False(t, ok)
}
6 changes: 2 additions & 4 deletions pkg/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ type TapChannel chan TapMessage
//
// queueName is the queue to subscribe to. tapCh is where the consumed messages
// are sent to. errCh is the channel where errors are sent to.
//
func (s *AmqpSubscriber) EstablishSubscription(
ctx context.Context,
queueName string,
Expand All @@ -106,9 +105,8 @@ func (s *AmqpSubscriber) createWorkerFunc(

// also subscribe to channel close notifications
amqpErrorCh := session.Channel.NotifyClose(make(chan *amqp.Error, 1))
fanin := NewFanin([]interface{}{ch, amqpErrorCh})

return amqpMessageLoop(ctx, outCh, errOutCh, fanin.Ch)
fanin := Fanin(ctx, []<-chan interface{}{WrapChan(ch), WrapChan(amqpErrorCh)})
return amqpMessageLoop(ctx, outCh, errOutCh, fanin)
}
}

Expand Down
16 changes: 10 additions & 6 deletions pkg/tap.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,21 @@ func (s *AmqpTap) createWorkerFunc(
// also subscribe to channel close notifications
amqpErrorCh := session.Channel.NotifyClose(make(chan *amqp.Error, 1))

fanin := NewFanin(append(tappedChs, amqpErrorCh))
defer func() { _ = fanin.Stop() }()
return amqpMessageLoop(ctx, outCh, errOutCh, fanin.Ch)
chans := []<-chan interface{}{WrapChan(amqpErrorCh)}
for _, c := range tappedChs {
chans = append(chans, WrapChan(c))
}

fanin := Fanin(ctx, chans)
return amqpMessageLoop(ctx, outCh, errOutCh, fanin)
}
}

func (s *AmqpTap) setupTapsForExchanges(
session Session,
exchangeConfigList []ExchangeConfiguration) ([]interface{}, error) {
exchangeConfigList []ExchangeConfiguration) ([]<-chan amqp.Delivery, error) {

var channels []interface{}
var channels []<-chan amqp.Delivery

for _, exchangeConfig := range exchangeConfigList {
exchange, queue, err := s.setupTap(session, exchangeConfig)
Expand Down Expand Up @@ -134,7 +138,7 @@ func (s *AmqpTap) setupTap(session Session,
// and must be set to
// - '#' on topic exchanges
// - a binding-key on direct exchanges (i.e. no wildcards)
// - '' on fanout or headers exchanges
// - on fanout or headers exchanges
// On errors delete prior created exchanges and/or queues to make sure
// that there are no leftovers lying around on the broker.
// TODO error handling must be improved - does not work if connection is lost
Expand Down

0 comments on commit c13ef2e

Please sign in to comment.