diff --git a/main_test.go b/main_test.go index 30e0ae0..133f0ea 100644 --- a/main_test.go +++ b/main_test.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/stretchr/testify/require" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/waku-org/go-waku/waku/v2/protocol/relay" ) @@ -21,47 +21,31 @@ var nodeList = []string{ // If using vscode, go to Preferences > Settings, and edit Go: Test Timeout to at least 60s func (s *StoreSuite) TestBasic() { - numMsgToSend := 100 + // TODO: search criteria pubsubTopic := relay.DefaultWakuTopic - contentTopic := "test1" + contentTopics := []string{"test1"} + envelopeHash := "0x" // Use "0x" to find all messages that match the pubsub topic, content topic and start/end time + startTime := time.Now().Add(-20 * time.Second) + endTime := time.Now() + + // ========================================================= ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // Test shouldnt take more than 60s defer cancel() - // Connecting to nodes - // ================================================================ - - log.Info("Connecting to nodes...") - - connectToNodes(ctx, s.node) - - time.Sleep(2 * time.Second) // Required so Identify protocol is executed - - s.NotZero(len(s.node.Relay().PubSub().ListPeers(relay.DefaultWakuTopic)), "no peers available") - - // Sending messages - // ================================================================ - startTime := s.node.Timesource().Now() - - // err := sendMessages( to send the msgs sequentially - err := sendMessagesConcurrent(ctx, s.node, numMsgToSend, pubsubTopic, contentTopic) - require.NoError(s.T(), err) - - endTime := s.node.Timesource().Now() - - // Store - // ================================================================ - - time.Sleep(5 * time.Second) // Adding a delay to guarantee that messages are inserted (needed with sqlite) + addNodes(ctx, s.node) + hash, err := hexutil.Decode(envelopeHash) + if err != nil { + panic("invalid envelope hash id") + } wg := sync.WaitGroup{} for _, addr := range nodeList { wg.Add(1) func(addr string) { defer wg.Done() - cnt, err := queryNode(ctx, s.node, addr, pubsubTopic, contentTopic, startTime, endTime) + _, err := queryNode(ctx, s.node, addr, pubsubTopic, contentTopics, startTime, endTime, hash) s.NoError(err) - s.Equal(numMsgToSend, cnt) }(addr) } wg.Wait() diff --git a/utils_test.go b/utils_test.go index c8ad43a..b9960d7 100644 --- a/utils_test.go +++ b/utils_test.go @@ -1,92 +1,37 @@ package main import ( + "bytes" "context" - "crypto/rand" "fmt" - "sync" "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" + "github.com/waku-org/go-waku/logging" "github.com/waku-org/go-waku/waku/v2/node" - "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/peers" "github.com/waku-org/go-waku/waku/v2/protocol/store" "go.uber.org/zap" ) -func connectToNodes(ctx context.Context, node *node.WakuNode) { - wg := sync.WaitGroup{} +func addNodes(ctx context.Context, node *node.WakuNode) { for _, addr := range nodeList { - wg.Add(1) - go func(addr string) { - wg.Done() - ctx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - err := node.DialPeer(ctx, addr) - if err != nil { - log.Error("could not connect to peer", zap.String("addr", addr), zap.Error(err)) - } - }(addr) - } - wg.Wait() -} - -func sendMessages(ctx context.Context, node *node.WakuNode, numMsgToSend int, topic string, contentTopic string) error { - for i := 0; i < numMsgToSend; i++ { - payload := make([]byte, 128) - _, err := rand.Read(payload) + ma, err := multiaddr.NewMultiaddr(addr) if err != nil { - panic(err) + log.Error("invalid multiaddress", zap.Error(err), zap.String("addr", addr)) + continue } - msg := &pb.WakuMessage{ - Payload: payload, - Version: 0, - ContentTopic: contentTopic, - Timestamp: node.Timesource().Now().UnixNano(), - } - - _, err = node.Relay().Publish(ctx, msg) + _, err = node.AddPeer(ma, peers.Static, store.StoreID_v20beta4) if err != nil { - panic(err) + log.Error("could not add peer", zap.Error(err), zap.Stringer("addr", ma)) + continue } - time.Sleep(10 * time.Millisecond) } - return nil } -func sendMessagesConcurrent(ctx context.Context, node *node.WakuNode, numMsgToSend int, topic string, contentTopic string) error { - wg := sync.WaitGroup{} - for i := 0; i < numMsgToSend; i++ { - wg.Add(1) - go func() { - wg.Done() - payload := make([]byte, 128) - _, err := rand.Read(payload) - if err != nil { - panic(err) - } - - msg := &pb.WakuMessage{ - Payload: payload, - Version: 0, - ContentTopic: contentTopic, - Timestamp: node.Timesource().Now().UnixNano(), - } - - _, err = node.Relay().Publish(ctx, msg) - if err != nil { - panic(err) - } - }() - time.Sleep(10 * time.Millisecond) - } - wg.Wait() - return nil -} - -func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopic string, contentTopic string, startTime time.Time, endTime time.Time) (int, error) { +func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopic string, contentTopics []string, startTime time.Time, endTime time.Time, envelopeHash []byte) (int, error) { p, err := multiaddr.NewMultiaddr(addr) if err != nil { return -1, err @@ -102,7 +47,7 @@ func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopi result, err := node.Store().Query(ctx, store.Query{ Topic: pubsubTopic, - ContentTopics: []string{contentTopic}, + ContentTopics: contentTopics, StartTime: startTime.UnixNano(), EndTime: endTime.UnixNano(), }, store.WithPeer(info.ID), store.WithPaging(false, 100), store.WithRequestId([]byte{1, 2, 3, 4, 5, 6, 7, 8})) @@ -121,6 +66,14 @@ func queryNode(ctx context.Context, node *node.WakuNode, addr string, pubsubTopi } cursorIterations += 1 + // uncomment to find message by ID + for _, m := range result.GetMessages() { + if len(envelopeHash) != 0 && bytes.Equal(m.Hash(pubsubTopic), envelopeHash) { + log.Info("MESSAGE FOUND!", logging.HexString("envelopeHash", envelopeHash), logging.HostID("peerID", info.ID)) + return 0, nil + } + } + cnt += len(result.GetMessages()) }