Skip to content

Commit 0957ab3

Browse files
committed
query for missing messages
1 parent 552f7e7 commit 0957ab3

File tree

4 files changed

+296
-153
lines changed

4 files changed

+296
-153
lines changed

go/main_test.go

+61-90
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,17 @@
11
package main
22

33
import (
4+
"bufio"
45
"context"
56
"fmt"
7+
"os"
68
"strconv"
7-
"sync"
9+
"strings"
810
"time"
9-
10-
"github.com/stretchr/testify/require"
11-
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
1211
)
1312

1413
var nodeList = []string{
15-
"/dns4/node-01.ac-cn-hongkong-c.status.prod.statusim.net/tcp/30303/p2p/16Uiu2HAkvEZgh3KLwhLwXg95e5ojM8XykJ4Kxi2T7hk22rnA7pJC",
16-
"/dns4/node-01.do-ams3.status.prod.statusim.net/tcp/30303/p2p/16Uiu2HAm6HZZr7aToTvEBPpiys4UxajCTU97zj5v7RNR2gbniy1D",
17-
"/dns4/node-01.gc-us-central1-a.status.prod.statusim.net/tcp/30303/p2p/16Uiu2HAkwBp8T6G77kQXSNMnxgaMky1JeyML5yqoTHRM8dbeCBNb",
18-
"/dns4/node-02.ac-cn-hongkong-c.status.prod.statusim.net/tcp/30303/p2p/16Uiu2HAmFy8BrJhCEmCYrUfBdSNkrPw6VHExtv4rRp1DSBnCPgx8",
19-
"/dns4/node-02.do-ams3.status.prod.statusim.net/tcp/30303/p2p/16Uiu2HAmSve7tR5YZugpskMv2dmJAsMUKmfWYEKRXNUxRaTCnsXV",
20-
"/dns4/node-02.gc-us-central1-a.status.prod.statusim.net/tcp/30303/p2p/16Uiu2HAmDQugwDHM3YeUp86iGjrUvbdw3JPRgikC7YoGBsT2ymMg",
14+
"/dns4/store-01.do-ams3.shards.test.status.im/tcp/30303/p2p/16Uiu2HAmAUdrQ3uwzuE4Gy4D56hX6uLKEeerJAnhKEHZ3DxF1EfT",
2115
}
2216

2317
// If using vscode, go to Preferences > Settings, and edit Go: Test Timeout to at least 60s
@@ -30,13 +24,32 @@ func parseTime(s string) time.Time {
3024
return time.Unix(i, 0)
3125
}
3226

33-
func (s *StoreSuite) TestBasic() {
34-
numMsgToSend := 100
35-
pubsubTopic := relay.DefaultWakuTopic
36-
contentTopic := "test1"
27+
func getValue(param string) string {
28+
x := strings.Split(param, "=")
29+
return x[1]
30+
}
31+
32+
func getIntValue(param string) int64 {
33+
x := strings.Split(param, "=")
34+
35+
num, err := strconv.ParseInt(x[1], 10, 64)
36+
if err != nil {
37+
panic(err)
38+
}
39+
return num
40+
}
3741

38-
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // Test shouldnt take more than 60s
39-
defer cancel()
42+
func getArrValue(param string) []string {
43+
x := strings.ReplaceAll(param, "\"", "")
44+
x = strings.ReplaceAll(param, "[", "")
45+
x = strings.ReplaceAll(x, "]", "")
46+
47+
x2 := strings.Split(x, " ")
48+
return x2
49+
}
50+
51+
func (s *StoreSuite) TestBasic() {
52+
ctx := context.Background()
4053

4154
// Connecting to nodes
4255
// ================================================================
@@ -47,94 +60,52 @@ func (s *StoreSuite) TestBasic() {
4760

4861
time.Sleep(2 * time.Second) // Required so Identify protocol is executed
4962

50-
s.NotZero(len(s.node.Relay().PubSub().ListPeers(relay.DefaultWakuTopic)), "no peers available")
51-
52-
// Sending messages
53-
// ================================================================
54-
startTime := s.node.Timesource().Now()
55-
56-
// err := sendMessages( to send the msgs sequentially
57-
err := sendMessagesConcurrent(ctx, s.node, numMsgToSend, pubsubTopic, contentTopic)
58-
require.NoError(s.T(), err)
63+
// Open the file
64+
file, err := os.Open("missing_messages.txt")
65+
if err != nil {
66+
panic(err)
67+
}
68+
defer file.Close()
5969

60-
endTime := s.node.Timesource().Now()
70+
scanner := bufio.NewScanner(file)
6171

6272
// Store
6373
// ================================================================
6474

65-
time.Sleep(5 * time.Second) // Adding a delay to guarantee that messages are inserted (needed with sqlite)
66-
67-
wg := sync.WaitGroup{}
68-
for _, addr := range nodeList {
69-
wg.Add(1)
70-
func(addr string) {
71-
defer wg.Done()
72-
cnt, err := queryNode(ctx, s.node, addr, pubsubTopic, contentTopic, startTime, endTime)
73-
s.NoError(err)
74-
s.Equal(numMsgToSend, cnt)
75-
}(addr)
76-
}
77-
wg.Wait()
78-
}
75+
// Read each line and process it
7976

80-
func (s *StoreSuite) TestCompareDatabasesPerformance() {
81-
// The next settings might be to be adapted depending on the databases content.
82-
// We seek to pick times windows so that the number of returned rows is ~1000.
83-
expectedNumMsgs := 966
84-
pubsubTopic := "/waku/2/default-waku/proto"
85-
startTime := parseTime("1695992040")
86-
endTime := parseTime("1695992056")
87-
contentTopic := "/waku/2/default-content/proto"
77+
contentTopc := make(map[string]struct{})
8878

89-
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // Test shouldnt take more than 60s
90-
defer cancel()
79+
for scanner.Scan() {
80+
line := scanner.Text()
9181

92-
// Connecting to nodes
93-
// ================================================================
94-
95-
log.Info("Connecting to nodes...")
82+
params := strings.Split(line, ";")
9683

97-
connectToNodes(ctx, s.node)
98-
99-
time.Sleep(2 * time.Second) // Required so Identify protocol is executed
100-
101-
// Store
102-
// ================================================================
84+
// startTime := time.Unix(0, getIntValue(params[1]))
85+
// endTime := time.Unix(0, getIntValue(params[2]))
86+
contentTopics := getArrValue(getValue(params[3]))
87+
// pubsubTopic := getValue(params[4])
88+
for _, x := range contentTopics {
89+
contentTopc[x] = struct{}{}
90+
}
10391

104-
timeSpentMap := make(map[string]time.Duration)
105-
numUsers := int64(10)
92+
// cnt, err := queryNode(ctx, s.node, nodeList[0], pubsubTopic, contentTopics, startTime, endTime)
93+
// if err != nil {
94+
// fmt.Println("COULD NOT QUERY STORENODE: ", err)
95+
// } else {
96+
// fmt.Println(cnt, "MESSAGES FOUND FOR - ", startTime, endTime, (contentTopics), pubsubTopic)
97+
// if cnt != 0 {
98+
// fmt.Println("!!!!!!!!!!!!!!!!")
99+
// }
100+
// }
106101

107-
peers := []string{
108-
// Postgres peer
109-
"/ip4/127.0.0.1/tcp/30303/p2p/16Uiu2HAmJyLCRhiErTRFcW5GKPrpoMjGbbWdFMx4GCUnnhmxeYhd",
110-
// SQLite peer
111-
"/ip4/127.0.0.1/tcp/30304/p2p/16Uiu2HAkxj3WzLiqBximSaHc8wV9Co87GyRGRYLVGsHZrzi3TL5W",
112102
}
113103

114-
wg := sync.WaitGroup{}
115-
for _, addr := range peers {
116-
for userIndex := 0; userIndex < int(numUsers); userIndex++ {
117-
wg.Add(1)
118-
go func(addr string) {
119-
defer wg.Done()
120-
fmt.Println("Querying node", addr)
121-
start := time.Now()
122-
cnt, err := queryNode(ctx, s.node, addr, pubsubTopic, contentTopic, startTime, endTime)
123-
timeSpent := time.Since(start)
124-
fmt.Printf("\n%s took %v\n\n", addr, timeSpent)
125-
s.NoError(err)
126-
s.Equal(expectedNumMsgs, cnt)
127-
timeSpentMap[addr] += timeSpent
128-
}(addr)
129-
}
104+
result := ""
105+
for k := range contentTopc {
106+
result += "\"" + k + "\", "
130107
}
131108

132-
wg.Wait()
133-
134-
timeSpentNanos := timeSpentMap[peers[0]].Nanoseconds() / numUsers
135-
fmt.Println("\n\nAverage time spent: ", peers[0], time.Duration(timeSpentNanos))
109+
fmt.Println(result)
136110

137-
timeSpentNanos = timeSpentMap[peers[1]].Nanoseconds() / numUsers
138-
fmt.Println("\n\nAverage time spent:", peers[1], time.Duration(timeSpentNanos))
139-
fmt.Println("")
140111
}

0 commit comments

Comments
 (0)