Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 36 additions & 4 deletions p2p/simulations/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,10 +234,11 @@ func (t *testService) Snapshot() ([]byte, error) {
// * get and increment a counter
// * subscribe to counter increment events
type TestAPI struct {
state *atomic.Value
peerCount *int64
counter int64
feed event.Feed
state *atomic.Value
peerCount *int64
counter int64
activeSubscriptions int64
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you want sync.WaitGroup here. Also maybe just subCount, no need to be overly verbose.

feed event.Feed
}

func (t *TestAPI) PeerCount() int64 {
Expand Down Expand Up @@ -273,14 +274,17 @@ func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) {
events := make(chan int64)
sub := t.feed.Subscribe(events)
defer sub.Unsubscribe()
atomic.AddInt64(&t.activeSubscriptions, 1)

for {
select {
case event := <-events:
notifier.Notify(rpcSub.ID, event)
case <-sub.Err():
atomic.AddInt64(&t.activeSubscriptions, -1)
return
case <-rpcSub.Err():
atomic.AddInt64(&t.activeSubscriptions, -1)
return
}
}
Expand All @@ -289,6 +293,10 @@ func (t *TestAPI) Events(ctx context.Context) (*rpc.Subscription, error) {
return rpcSub, nil
}

func (t *TestAPI) GetNumActiveSubscriptions() int64 {
return atomic.LoadInt64(&t.activeSubscriptions)
}

var testServices = adapters.LifecycleConstructors{
"test": newTestService,
}
Expand Down Expand Up @@ -557,6 +565,14 @@ func TestHTTPNodeRPC(t *testing.T) {
t.Fatalf("error getting node RPC client: %s", err)
}

// get the number of subscriptions before subscribing to know what number to
// expect once it becomes active
var expectedActiveSubscriptions int64
if err := rpcClient1.CallContext(ctx, &expectedActiveSubscriptions, "test_getNumActiveSubscriptions"); err != nil {
t.Fatalf("error calling RPC method: %s", err)
}
expectedActiveSubscriptions += 1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you not able to know statically the expected number of active subs? (1) ?


// subscribe to events using client 1
events := make(chan int64, 1)
sub, err := rpcClient1.Subscribe(ctx, "test", events, "events")
Expand All @@ -565,6 +581,22 @@ func TestHTTPNodeRPC(t *testing.T) {
}
defer sub.Unsubscribe()

// make sure the subscription becomes active
var numActiveSubscriptions int64
for i := 0; i < 3; i++ {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you should do a for and select loop with a timeout here. You're basically saying "timeout after 300 milliseconds" here, but in a more complicated way.

err := rpcClient1.CallContext(ctx, &numActiveSubscriptions, "test_getNumActiveSubscriptions")
if err != nil {
t.Fatalf("error calling RPC method: %s", err)
}
if numActiveSubscriptions > 0 {
break
}
time.Sleep(100 * time.Millisecond)
}
if numActiveSubscriptions != expectedActiveSubscriptions {
t.Fatalf("subscription never became active")
}

// call some RPC methods using client 2
if err := rpcClient2.CallContext(ctx, nil, "test_add", 10); err != nil {
t.Fatalf("error calling RPC method: %s", err)
Expand Down