-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathutils_test.go
83 lines (69 loc) · 2.08 KB
/
utils_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package main
import (
"bytes"
"context"
"fmt"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/waku-org/go-waku/logging"
"github.com/waku-org/go-waku/waku/v2/node"
"github.com/waku-org/go-waku/waku/v2/peers"
"github.com/waku-org/go-waku/waku/v2/protocol/store"
"go.uber.org/zap"
)
func addNodes(ctx context.Context, node *node.WakuNode) {
for _, addr := range nodeList {
ma, err := multiaddr.NewMultiaddr(addr)
if err != nil {
log.Error("invalid multiaddress", zap.Error(err), zap.String("addr", addr))
continue
}
_, err = node.AddPeer(ma, peers.Static, store.StoreID_v20beta4)
if err != nil {
log.Error("could not add peer", zap.Error(err), zap.Stringer("addr", ma))
continue
}
}
}
func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopic string, contentTopics []string, startTime time.Time, endTime time.Time, envelopeHash []byte) (int, error) {
p, err := multiaddr.NewMultiaddr(addr)
if err != nil {
return -1, err
}
info, err := peer.AddrInfoFromP2pAddr(p)
if err != nil {
return -1, err
}
cnt := 0
cursorIterations := 0
result, err := node.Store().Query(ctx, store.Query{
Topic: pubsubTopic,
ContentTopics: contentTopics,
StartTime: startTime.UnixNano(),
EndTime: endTime.UnixNano(),
}, store.WithPeer(info.ID), store.WithPaging(false, 100), store.WithRequestId([]byte{1, 2, 3, 4, 5, 6, 7, 8}))
if err != nil {
return -1, err
}
for {
hasNext, err := result.Next(ctx)
if err != nil {
return -1, err
}
if !hasNext { // No more messages available
break
}
cursorIterations += 1
// uncomment to find message by ID
for _, m := range result.GetMessages() {
if len(envelopeHash) != 0 && bytes.Equal(m.Hash(pubsubTopic), envelopeHash) {
log.Info("MESSAGE FOUND!", logging.HexString("envelopeHash", envelopeHash), logging.HostID("peerID", info.ID))
return 0, nil
}
}
cnt += len(result.GetMessages())
}
log.Info(fmt.Sprintf("%d messages found in %s (Used cursor %d times)\n", cnt, info.ID, cursorIterations))
return cnt, nil
}