Skip to content

Commit 1b89826

Browse files
committed
replaced topic lock by atomic boolean to avoid lock contention
1 parent 60cf380 commit 1b89826

File tree

3 files changed

+18
-26
lines changed

3 files changed

+18
-26
lines changed

.vscode/dryrun.log

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
make --dry-run --always-make --keep-going --print-directory
2+
make: Entering directory '/home/aratz/github.com/aratz-lasa/go-libp2p-pubsub'
3+
make: Leaving directory '/home/aratz/github.com/aratz-lasa/go-libp2p-pubsub'
4+
5+
make: *** No targets specified and no makefile found. Stop.
6+

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@ require (
1313
github.com/libp2p/go-msgio v0.2.0
1414
github.com/multiformats/go-multiaddr v0.5.0
1515
github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee
16+
go.uber.org/atomic v1.7.0
1617
)

topic.go

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
pb "github.com/libp2p/go-libp2p-pubsub/pb"
1212

1313
"github.com/libp2p/go-libp2p-core/peer"
14+
"go.uber.org/atomic"
1415
)
1516

1617
// ErrTopicClosed is returned if a Topic is utilized after it has been closed
@@ -30,8 +31,7 @@ type Topic struct {
3031
evtHandlerMux sync.RWMutex
3132
evtHandlers map[*TopicEventHandler]struct{}
3233

33-
mux sync.RWMutex
34-
closed bool
34+
closed atomic.Bool
3535
}
3636

3737
// String returns the topic associated with t
@@ -47,10 +47,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
4747
return fmt.Errorf("invalid topic score parameters: %w", err)
4848
}
4949

50-
t.mux.Lock()
51-
defer t.mux.Unlock()
52-
53-
if t.closed {
50+
if t.closed.Load() {
5451
return ErrTopicClosed
5552
}
5653

@@ -84,9 +81,7 @@ func (t *Topic) SetScoreParams(p *TopicScoreParams) error {
8481
// EventHandler creates a handle for topic specific events
8582
// Multiple event handlers may be created and will operate independently of each other
8683
func (t *Topic) EventHandler(opts ...TopicEventHandlerOpt) (*TopicEventHandler, error) {
87-
t.mux.RLock()
88-
defer t.mux.RUnlock()
89-
if t.closed {
84+
if t.closed.Load() {
9085
return nil, ErrTopicClosed
9186
}
9287

@@ -141,9 +136,7 @@ func (t *Topic) sendNotification(evt PeerEvent) {
141136
// Note that subscription is not an instantaneous operation. It may take some time
142137
// before the subscription is processed by the pubsub main loop and propagated to our peers.
143138
func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
144-
t.mux.RLock()
145-
defer t.mux.RUnlock()
146-
if t.closed {
139+
if t.closed.Load() {
147140
return nil, ErrTopicClosed
148141
}
149142

@@ -184,9 +177,7 @@ func (t *Topic) Subscribe(opts ...SubOpt) (*Subscription, error) {
184177
// cancel function. Subsequent calls increase the reference counter.
185178
// To completely disable the relay, all references must be cancelled.
186179
func (t *Topic) Relay() (RelayCancelFunc, error) {
187-
t.mux.RLock()
188-
defer t.mux.RUnlock()
189-
if t.closed {
180+
if t.closed.Load() {
190181
return nil, ErrTopicClosed
191182
}
192183

@@ -215,16 +206,14 @@ type ProvideKey func() (crypto.PrivKey, peer.ID)
215206
type PublishOptions struct {
216207
ready RouterReady
217208
customKey ProvideKey
218-
local bool
209+
local bool
219210
}
220211

221212
type PubOpt func(pub *PublishOptions) error
222213

223214
// Publish publishes data to topic.
224215
func (t *Topic) Publish(ctx context.Context, data []byte, opts ...PubOpt) error {
225-
t.mux.RLock()
226-
defer t.mux.RUnlock()
227-
if t.closed {
216+
if t.closed.Load() {
228217
return ErrTopicClosed
229218
}
230219

@@ -347,9 +336,7 @@ func WithSecretKeyAndPeerId(key crypto.PrivKey, pid peer.ID) PubOpt {
347336
// Close closes down the topic. Will return an error unless there are no active event handlers or subscriptions.
348337
// Does not error if the topic is already closed.
349338
func (t *Topic) Close() error {
350-
t.mux.Lock()
351-
defer t.mux.Unlock()
352-
if t.closed {
339+
if t.closed.Load() {
353340
return nil
354341
}
355342

@@ -364,17 +351,15 @@ func (t *Topic) Close() error {
364351
err := <-req.resp
365352

366353
if err == nil {
367-
t.closed = true
354+
t.closed.Swap(true)
368355
}
369356

370357
return err
371358
}
372359

373360
// ListPeers returns a list of peers we are connected to in the given topic.
374361
func (t *Topic) ListPeers() []peer.ID {
375-
t.mux.RLock()
376-
defer t.mux.RUnlock()
377-
if t.closed {
362+
if t.closed.Load() {
378363
return []peer.ID{}
379364
}
380365

0 commit comments

Comments
 (0)