Skip to content

Commit a782d33

Browse files
committed
chore: find with specific criteria
1 parent d86d0d6 commit a782d33

File tree

2 files changed

+34
-97
lines changed

2 files changed

+34
-97
lines changed

main_test.go

+14-30
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"sync"
66
"time"
77

8-
"github.com/stretchr/testify/require"
8+
"github.com/ethereum/go-ethereum/common/hexutil"
99
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
1010
)
1111

@@ -21,47 +21,31 @@ var nodeList = []string{
2121
// If using vscode, go to Preferences > Settings, and edit Go: Test Timeout to at least 60s
2222

2323
func (s *StoreSuite) TestBasic() {
24-
numMsgToSend := 100
24+
// TODO: search criteria
2525
pubsubTopic := relay.DefaultWakuTopic
26-
contentTopic := "test1"
26+
contentTopics := []string{"test1"}
27+
envelopeHash := "0x" // Use "0x" to find all messages that match the pubsub topic, content topic and start/end time
28+
startTime := time.Now().Add(-20 * time.Second)
29+
endTime := time.Now()
30+
31+
// =========================================================
2732

2833
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // Test shouldnt take more than 60s
2934
defer cancel()
3035

31-
// Connecting to nodes
32-
// ================================================================
33-
34-
log.Info("Connecting to nodes...")
35-
36-
connectToNodes(ctx, s.node)
37-
38-
time.Sleep(2 * time.Second) // Required so Identify protocol is executed
39-
40-
s.NotZero(len(s.node.Relay().PubSub().ListPeers(relay.DefaultWakuTopic)), "no peers available")
41-
42-
// Sending messages
43-
// ================================================================
44-
startTime := s.node.Timesource().Now()
45-
46-
// err := sendMessages( to send the msgs sequentially
47-
err := sendMessagesConcurrent(ctx, s.node, numMsgToSend, pubsubTopic, contentTopic)
48-
require.NoError(s.T(), err)
49-
50-
endTime := s.node.Timesource().Now()
51-
52-
// Store
53-
// ================================================================
54-
55-
time.Sleep(5 * time.Second) // Adding a delay to guarantee that messages are inserted (needed with sqlite)
36+
addNodes(ctx, s.node)
37+
hash, err := hexutil.Decode(envelopeHash)
38+
if err != nil {
39+
panic("invalid envelope hash id")
40+
}
5641

5742
wg := sync.WaitGroup{}
5843
for _, addr := range nodeList {
5944
wg.Add(1)
6045
func(addr string) {
6146
defer wg.Done()
62-
cnt, err := queryNode(ctx, s.node, addr, pubsubTopic, contentTopic, startTime, endTime)
47+
_, err := queryNode(ctx, s.node, addr, pubsubTopic, contentTopics, startTime, endTime, hash)
6348
s.NoError(err)
64-
s.Equal(numMsgToSend, cnt)
6549
}(addr)
6650
}
6751
wg.Wait()

utils_test.go

+20-67
Original file line numberDiff line numberDiff line change
@@ -1,92 +1,37 @@
11
package main
22

33
import (
4+
"bytes"
45
"context"
5-
"crypto/rand"
66
"fmt"
7-
"sync"
87
"time"
98

109
"github.com/libp2p/go-libp2p/core/peer"
1110
"github.com/multiformats/go-multiaddr"
11+
"github.com/waku-org/go-waku/logging"
1212
"github.com/waku-org/go-waku/waku/v2/node"
13-
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
13+
"github.com/waku-org/go-waku/waku/v2/peers"
1414
"github.com/waku-org/go-waku/waku/v2/protocol/store"
1515
"go.uber.org/zap"
1616
)
1717

18-
func connectToNodes(ctx context.Context, node *node.WakuNode) {
19-
wg := sync.WaitGroup{}
18+
func addNodes(ctx context.Context, node *node.WakuNode) {
2019
for _, addr := range nodeList {
21-
wg.Add(1)
22-
go func(addr string) {
23-
wg.Done()
24-
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
25-
defer cancel()
26-
err := node.DialPeer(ctx, addr)
27-
if err != nil {
28-
log.Error("could not connect to peer", zap.String("addr", addr), zap.Error(err))
29-
}
30-
}(addr)
31-
}
32-
wg.Wait()
33-
}
34-
35-
func sendMessages(ctx context.Context, node *node.WakuNode, numMsgToSend int, topic string, contentTopic string) error {
36-
for i := 0; i < numMsgToSend; i++ {
37-
payload := make([]byte, 128)
38-
_, err := rand.Read(payload)
20+
ma, err := multiaddr.NewMultiaddr(addr)
3921
if err != nil {
40-
panic(err)
22+
log.Error("invalid multiaddress", zap.Error(err), zap.String("addr", addr))
23+
continue
4124
}
4225

43-
msg := &pb.WakuMessage{
44-
Payload: payload,
45-
Version: 0,
46-
ContentTopic: contentTopic,
47-
Timestamp: node.Timesource().Now().UnixNano(),
48-
}
49-
50-
_, err = node.Relay().Publish(ctx, msg)
26+
_, err = node.AddPeer(ma, peers.Static, store.StoreID_v20beta4)
5127
if err != nil {
52-
panic(err)
28+
log.Error("could not add peer", zap.Error(err), zap.Stringer("addr", ma))
29+
continue
5330
}
54-
time.Sleep(10 * time.Millisecond)
5531
}
56-
return nil
5732
}
5833

59-
func sendMessagesConcurrent(ctx context.Context, node *node.WakuNode, numMsgToSend int, topic string, contentTopic string) error {
60-
wg := sync.WaitGroup{}
61-
for i := 0; i < numMsgToSend; i++ {
62-
wg.Add(1)
63-
go func() {
64-
wg.Done()
65-
payload := make([]byte, 128)
66-
_, err := rand.Read(payload)
67-
if err != nil {
68-
panic(err)
69-
}
70-
71-
msg := &pb.WakuMessage{
72-
Payload: payload,
73-
Version: 0,
74-
ContentTopic: contentTopic,
75-
Timestamp: node.Timesource().Now().UnixNano(),
76-
}
77-
78-
_, err = node.Relay().Publish(ctx, msg)
79-
if err != nil {
80-
panic(err)
81-
}
82-
}()
83-
time.Sleep(10 * time.Millisecond)
84-
}
85-
wg.Wait()
86-
return nil
87-
}
88-
89-
func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopic string, contentTopic string, startTime time.Time, endTime time.Time) (int, error) {
34+
func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopic string, contentTopics []string, startTime time.Time, endTime time.Time, envelopeHash []byte) (int, error) {
9035
p, err := multiaddr.NewMultiaddr(addr)
9136
if err != nil {
9237
return -1, err
@@ -102,7 +47,7 @@ func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopi
10247

10348
result, err := node.Store().Query(ctx, store.Query{
10449
Topic: pubsubTopic,
105-
ContentTopics: []string{contentTopic},
50+
ContentTopics: contentTopics,
10651
StartTime: startTime.UnixNano(),
10752
EndTime: endTime.UnixNano(),
10853
}, store.WithPeer(info.ID), store.WithPaging(false, 100), store.WithRequestId([]byte{1, 2, 3, 4, 5, 6, 7, 8}))
@@ -121,6 +66,14 @@ func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopi
12166
}
12267
cursorIterations += 1
12368

69+
// uncomment to find message by ID
70+
for _, m := range result.GetMessages() {
71+
if len(envelopeHash) != 0 && bytes.Equal(m.Hash(pubsubTopic), envelopeHash) {
72+
log.Info("MESSAGE FOUND!", logging.HexString("envelopeHash", envelopeHash), logging.HostID("peerID", info.ID))
73+
return 0, nil
74+
}
75+
}
76+
12477
cnt += len(result.GetMessages())
12578
}
12679

0 commit comments

Comments
 (0)