diff --git a/README.md b/README.md index ffbaad04..def16dba 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ [![Based on TON][ton-svg]][ton] [![Telegram Channel][tgc-svg]][tg-channel] -![Coverage](https://img.shields.io/badge/Coverage-70.4%25-brightgreen) +![Coverage](https://img.shields.io/badge/Coverage-70.1%25-brightgreen) Golang library for interacting with TON blockchain. diff --git a/adnl/gateway.go b/adnl/gateway.go index fe72435d..e7e07b1d 100644 --- a/adnl/gateway.go +++ b/adnl/gateway.go @@ -123,12 +123,23 @@ func NewGatewayWithNetManager(key ed25519.PrivateKey, reader NetManager) *Gatewa } var PacketsBufferSize = 128 * 1024 +var DefaultUDPBufferSize = 32 << 20 var DefaultListener = func(addr string) (net.PacketConn, error) { lp, err := net.ListenPacket("udp", addr) if err != nil { return nil, err } + + if conn, ok := lp.(*net.UDPConn); ok { + if err := conn.SetReadBuffer(DefaultUDPBufferSize); err != nil { + Logger("[ADNL] failed to set read buffer:", err) + } + if err := conn.SetWriteBuffer(DefaultUDPBufferSize); err != nil { + Logger("[ADNL] failed to set write buffer:", err) + } + } + return NewSyncConn(lp, PacketsBufferSize), nil } diff --git a/adnl/rldp/bbr2.go b/adnl/rldp/bbr2.go index c4d7c35a..490cc14f 100644 --- a/adnl/rldp/bbr2.go +++ b/adnl/rldp/bbr2.go @@ -160,7 +160,8 @@ func NewBBRv2Controller(l *TokenBucket, o BBRv2Options) *BBRv2Controller { } c.btlbw.Store(start) c.pacingRate.Store(start) - c.inflight.Store(rateToInflight(start, c.minRTT.Load())) + initialInflight := rateToInflight(start, c.minRTT.Load()) + c.applyInflightLimit(initialInflight) c.hiInflight.Store(c.inflight.Load()) c.loInflight.Store(0) @@ -201,6 +202,28 @@ func (c *BBRv2Controller) SetAppLimited(v bool) { c.appLimited.Store(v) } func (c *BBRv2Controller) markActive() { c.lastActive.Store(nowMs()) } +func (c *BBRv2Controller) applyInflightLimit(inflight int64) { + if inflight < 0 { + inflight = 0 + } + c.inflight.Store(inflight) + + burst := inflight + inflight/4 + const minBurst = 32 * 1024 + if burst < minBurst { + burst = minBurst + } + + if maxRate := c.opts.MaxRate; maxRate > 0 { + maxBurst := rateToInflight(maxRate, max64(c.minRTT.Load(), 1)) + if maxBurst > 0 && burst > maxBurst { + burst = maxBurst + } + } + + c.limiter.SetCapacityBytes(burst) +} + func (c *BBRv2Controller) OnNewSendBurst() { if c.appLimited.Swap(false) { c.fullBW.Store(0) @@ -236,7 +259,7 @@ func (c *BBRv2Controller) ObserveRTT(rttMs int64) { c.lastRTT.Store(rttMs) if btl := c.btlbw.Load(); btl > 0 { - c.inflight.Store(rateToInflight(btl, c.minRTT.Load())) + c.applyInflightLimit(rateToInflight(btl, c.minRTT.Load())) } } @@ -325,6 +348,7 @@ func (c *BBRv2Controller) resetForNewFlow(now int64) { if model <= 0 { model = rateToInflight(c.opts.MinRate, max64(c.minRTT.Load(), 1)) } + c.applyInflightLimit(model) c.hiInflight.Store(max64(model+model/4, 2*1500)) c.lastAckTs.Store(now) c.appLimited.Store(false) @@ -337,15 +361,12 @@ func (c *BBRv2Controller) primeTokenBucketForRate(now int64, rate int64) { } rtt := max64(c.minRTT.Load(), 1) bdp := rateToInflight(rate, rtt) - burst := max64(64*1024, min64(8*bdp, 4*1024*1024)) - type tbIface interface { - SetBurst(int64) - AddTokens(int64) - } - if tb, ok := any(c.limiter).(tbIface); ok { - tb.SetBurst(burst) - tb.AddTokens(min64(bdp, burst)) + target := bdp + if cur := c.inflight.Load(); cur > target { + target = cur } + c.applyInflightLimit(target) + c.limiter.AddTokens(min64(bdp, target)) } func humanBps(bps int64) string { @@ -445,7 +466,10 @@ func (c *BBRv2Controller) InflightAllowance(currentBytes int64) int64 { } func (c *BBRv2Controller) CurrentMinRTT() int64 { - return c.minRTT.Load() + if !c.minRTTProvisional.Load() { + return c.minRTT.Load() + } + return -1 } func (c *BBRv2Controller) CurrentRTT() int64 { @@ -479,7 +503,7 @@ func (c *BBRv2Controller) updateModelAndRate(now int64) float64 { if inflight <= 0 { inflight = 2 * 1500 // at least two MSS-equivalents } - c.inflight.Store(inflight) + c.applyInflightLimit(inflight) // Losses in the last window → decide whether to lower inflight_hi var lossRate float64 diff --git a/adnl/rldp/bucket.go b/adnl/rldp/bucket.go index 61a7b9b0..462d778c 100644 --- a/adnl/rldp/bucket.go +++ b/adnl/rldp/bucket.go @@ -1,6 +1,7 @@ package rldp import ( + "math" "sync/atomic" "time" ) @@ -36,7 +37,18 @@ func (tb *TokenBucket) SetCapacityBytes(burstBytes int64) { if burstBytes < 0 { burstBytes = 0 } - atomic.StoreInt64(&tb.capacity, burstBytes*1000) + capMicro := toMicroBytes(burstBytes) + atomic.StoreInt64(&tb.capacity, capMicro) + + for { + curr := atomic.LoadInt64(&tb.tokens) + if curr <= capMicro { + break + } + if atomic.CompareAndSwapInt64(&tb.tokens, curr, capMicro) { + break + } + } } func (tb *TokenBucket) SetRate(bps int64) { @@ -124,6 +136,33 @@ func (tb *TokenBucket) ConsumePackets(maxPackets, partSize int) int { return gotBytes / partSize } +// SetBurst implements the interface used by the BBR controller for seeding +// the token bucket with a specific burst size (in bytes). +func (tb *TokenBucket) SetBurst(burstBytes int64) { + tb.SetCapacityBytes(burstBytes) +} + +// AddTokens adds up to n bytes worth of tokens into the bucket. +// The total amount of tokens is always capped at the configured capacity. +func (tb *TokenBucket) AddTokens(n int64) { + if n <= 0 { + return + } + + add := toMicroBytes(n) + for { + curr := atomic.LoadInt64(&tb.tokens) + cp := atomic.LoadInt64(&tb.capacity) + newVal := curr + add + if cp > 0 && newVal > cp { + newVal = cp + } + if atomic.CompareAndSwapInt64(&tb.tokens, curr, newVal) { + return + } + } +} + func (tb *TokenBucket) TryConsumeBytes(n int) bool { return tb.ConsumeUpTo(n) == n } @@ -134,3 +173,13 @@ func abs64(x int64) int64 { } return x } + +func toMicroBytes(v int64) int64 { + if v >= math.MaxInt64/1000 { + return math.MaxInt64 + } + if v <= math.MinInt64/1000 { + return math.MinInt64 + } + return v * 1000 +} diff --git a/adnl/rldp/client.go b/adnl/rldp/client.go index 28c41ee2..da82640f 100644 --- a/adnl/rldp/client.go +++ b/adnl/rldp/client.go @@ -33,7 +33,7 @@ type ADNL interface { var Logger = func(a ...any) {} -var PartSize = uint32(256 << 10) +var PartSize = uint32(256<<10) + 1024 var MultiFECMode = false // TODO: activate after some versions var RoundRobinFECLimit = 50 * DefaultSymbolSize @@ -58,6 +58,7 @@ type activeTransferPart struct { nextRecoverDelay int64 fastSeqnoTill uint32 + startedAt time.Time recoveryReady atomic.Bool lossEWMA atomic.Uint64 drrDeficit int64 @@ -134,15 +135,20 @@ type decoderStreamPart struct { maxSeqno uint32 receivedMask uint32 receivedNum uint32 + receivedFastNum uint32 receivedNumConfirmed uint32 + recvOrder []uint32 + lastConfirmAt time.Time lastCompleteAt time.Time + startedAt time.Time } type decoderStream struct { finishedAt *time.Time lastMessageAt time.Time + startedAt time.Time currentPart decoderStreamPart /// messages chan *MessagePart @@ -275,25 +281,27 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error { return fmt.Errorf("too big transfer size %d, max allowed %d", m.TotalSize, maxTransferSize) } - qsz := int(m.FecType.GetSymbolsCount()) + 3 + qsz := int(m.FecType.GetSymbolsCount()) + 32 if qsz > 1024 { qsz = 1024 } - stream = &decoderStream{ - lastMessageAt: tm, - msgBuf: NewQueue(qsz), - currentPart: decoderStreamPart{ - index: 0, - }, - totalSize: m.TotalSize, - } - r.mx.Lock() // check again because of possible concurrency if r.recvStreams[id] != nil { stream = r.recvStreams[id] } else { + stream = &decoderStream{ + lastMessageAt: tm, + startedAt: tm, + msgBuf: NewQueue(qsz), + currentPart: decoderStreamPart{ + index: 0, + startedAt: tm, + }, + totalSize: m.TotalSize, + } + r.recvStreams[id] = stream } r.mx.Unlock() @@ -391,6 +399,10 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error { stream.lastMessageAt = tm stream.currentPart.receivedNum++ + if part.Seqno < stream.currentPart.fecSymbolsCount { + stream.currentPart.receivedFastNum++ + } + stream.currentPart.recvOrder = append(stream.currentPart.recvOrder, part.Seqno) if canTryDecode { tmd := time.Now() @@ -401,10 +413,13 @@ func (r *RLDP) handleMessage(msg *adnl.MessageCustom) error { // it may not be decoded due to an unsolvable math system, it means we need more symbols if decoded { - Logger("[RLDP] v2:", isV2, "part", part.Part, "decoded on seqno", part.Seqno, "symbols:", stream.currentPart.fecSymbolsCount, "decode took", time.Since(tmd).String()) + took := tmd.Sub(stream.currentPart.startedAt) + Logger("[RLDP] v2:", isV2, "part", part.Part, "part sz", stream.currentPart.fecDataSize, "decoded on seqno", part.Seqno, "symbols:", stream.currentPart.fecSymbolsCount, "received:", stream.currentPart.receivedNum, "fast", stream.currentPart.receivedFastNum, + "decode took", time.Since(tmd).String(), "took", took.String()) stream.currentPart = decoderStreamPart{ - index: stream.currentPart.index + 1, + index: stream.currentPart.index + 1, + startedAt: tmd, } if len(data) > 0 { @@ -756,9 +771,7 @@ func (r *RLDP) recoverySender() { continue } - if atomic.LoadUint32(&part.seqno) < part.fastSeqnoTill || - ms-part.lastRecoverAt > part.nextRecoverDelay || - part.lastRecoverAt < atomic.LoadInt64(&part.lastConfirmAt) { + if ms-part.lastRecoverAt > part.nextRecoverDelay { transfersToProcess = append(transfersToProcess, part) } } @@ -798,27 +811,51 @@ func (r *RLDP) recoverySender() { for i := 0; i < n; i++ { idx := (start + i) % n part := transfersToProcess[idx] + ms = time.Now().UnixMilli() seqno := atomic.LoadUint32(&part.seqno) - quantum := int64(1) - if sc := part.fecSymbolsCount / 100; sc > 1 { - quantum = int64(sc) + // can be -1 when unknown + rtt := r.rateCtrl.CurrentMinRTT() + if rtt >= 0 && rtt < 8 { + rtt = 8 + } + + var delay = int64(5) + if rtt > 0 { + delay = rtt / 4 } + quantum := int64(1) + if seqno < part.fastSeqnoTill { fastDiff := int64(part.fastSeqnoTill - seqno) if fastDiff > quantum { quantum = fastDiff } - } + } else if rtt > 0 { + perFrame := int64(part.fecSymbolsCount / 4) + if perFrame < 2 { + perFrame = 2 + } - /*part.drrDeficit += quantum - if part.drrDeficit <= 0 { - continue - }*/ + prevTm := int64(0) + if part.lastRecoverAt > 0 { + prevTm = part.lastRecoverAt - part.startedAt.UnixMilli() + } - ms = time.Now().UnixMilli() + tmOfFrame := ms - part.startedAt.UnixMilli() + amt := float64(perFrame) * (float64(tmOfFrame-prevTm) / float64(rtt)) + quantum = int64(amt) + if quantum > perFrame/2 { + quantum = perFrame / 2 + } + + // smooth recovery to decrease bursts + delay = 0 + } else if sc := part.fecSymbolsCount / 100; sc > 1 { + quantum = int64(sc) + } requested := int(quantum) consumed := r.rateLimit.ConsumePackets(requested, int(part.fecSymbolSize)) @@ -853,37 +890,11 @@ func (r *RLDP) recoverySender() { } if seqno > prevSeqno { - /*sent := int64(seqno - prevSeqno) - part.drrDeficit -= sent - if part.drrDeficit < 0 { - part.drrDeficit = 0 - }*/ - - base := r.rateCtrl.CurrentMinRTT() - if base <= 0 { - base = r.rateCtrl.opts.DefaultRTTMs - if base <= 0 { - base = 25 - } - } - - if seqno > part.fastSeqnoTill { - minGap := max64(8, base/4) - maxGap := max64(20, base/2) - - if consumed > 0 { - part.nextRecoverDelay = minGap - } else { - part.nextRecoverDelay = maxGap - } - } else { - part.nextRecoverDelay = 0 - } - + part.nextRecoverDelay = delay part.lastRecoverAt = ms lastServedIdx = idx + atomic.StoreUint32(&part.seqno, seqno) } - atomic.StoreUint32(&part.seqno, seqno) if consumed < requested { drained = true @@ -1043,16 +1054,14 @@ func (t *activeTransfer) prepareNextPart() (bool, error) { fec: fec, fecSymbolsCount: fec.GetSymbolsCount(), fecSymbolSize: fec.GetSymbolSize(), - nextRecoverDelay: 5, - fastSeqnoTill: cnt + cnt/20 + 1, // +5% + nextRecoverDelay: 4, + fastSeqnoTill: cnt + cnt/33 + 1, // +3% transfer: t, } - pt := uint32(1) << uint32(math.Ceil(math.Log2(float64(part.fecSymbolsCount)*1.2)+50)) + pt := uint32(1) << uint32(math.Ceil(math.Log2(float64(part.fecSymbolsCount)*3+50))) if pt > 16<<10 { pt = 16 << 10 - } else if pt < 64 { - pt = 64 } part.sendClock = NewSendClock(int(pt)) @@ -1108,6 +1117,7 @@ func (r *RLDP) sendFastSymbols(ctx context.Context, transfer *activeTransfer) er atomic.StoreUint32(&part.seqno, seqno) part.recoveryReady.Store(true) + part.startedAt = time.Now() select { case r.activateRecoverySender <- true: @@ -1126,6 +1136,12 @@ func (r *RLDP) DoQuery(ctx context.Context, maxAnswerSize uint64, query, result res := make(chan AsyncQueryResult, 1) + if _, ok := ctx.Deadline(); !ok { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, 15*time.Second) + defer cancel() + } + if err = r.DoQueryAsync(ctx, maxAnswerSize, qid, query, res); err != nil { return fmt.Errorf("failed to do query: %w", err) } diff --git a/adnl/rldp/client_test.go b/adnl/rldp/client_test.go index f0c7bcc4..3111cdbf 100644 --- a/adnl/rldp/client_test.go +++ b/adnl/rldp/client_test.go @@ -825,9 +825,9 @@ func BenchmarkRLDP_ClientServer(b *testing.B) { { // it requires some time to speedup by bbr, so will show a low rate name: "netem_loss_raptorq", - sizes: []uint32{4 << 20}, + sizes: []uint32{256 << 10}, setup: func(tb *testing.B) (*RLDP, func()) { - return setupNetemBenchmark(tb, 0.02, 50*time.Millisecond, 5*time.Millisecond) + return setupNetemBenchmark(tb, 0.01, 50*time.Millisecond, 0*time.Millisecond) }, withParallel: true, }, @@ -846,14 +846,26 @@ func BenchmarkRLDP_ClientServer(b *testing.B) { func runRLDPBenchSizes(b *testing.B, client *RLDP, sizes []uint32, withParallel bool) { for _, sz := range sizes { b.Run(fmt.Sprintf("resp=%dKB", sz>>10), func(b *testing.B) { + var resp benchResponse + ctx, cancel := context.WithTimeout(context.Background(), 7*time.Second) + err := client.DoQuery(ctx, 1<<30, benchRequest{ + WantLen: sz, + }, &resp) + cancel() + if err != nil { + b.Fatalf("client exec err: %v", err) + } + b.SetBytes(int64(sz)) b.ResetTimer() for i := 0; i < b.N; i++ { - var resp benchResponse - if err := client.DoQuery(context.Background(), 1<<30, benchRequest{ + ctx, cancel = context.WithTimeout(context.Background(), 7*time.Second) + err = client.DoQuery(ctx, 1<<30, benchRequest{ WantLen: sz, - }, &resp); err != nil { + }, &resp) + cancel() + if err != nil { b.Fatalf("client exec err: %v", err) } } @@ -861,6 +873,16 @@ func runRLDPBenchSizes(b *testing.B, client *RLDP, sizes []uint32, withParallel if withParallel { b.Run(fmt.Sprintf("resp=%dKB/parallel", sz>>10), func(b *testing.B) { + var resp benchResponse + ctx, cancel := context.WithTimeout(context.Background(), 7*time.Second) + err := client.DoQuery(ctx, 1<<30, benchRequest{ + WantLen: sz, + }, &resp) + cancel() + if err != nil { + b.Fatalf("client exec err: %v", err) + } + b.SetBytes(int64(sz)) b.SetParallelism(runtime.NumCPU()) @@ -868,9 +890,12 @@ func runRLDPBenchSizes(b *testing.B, client *RLDP, sizes []uint32, withParallel b.RunParallel(func(pb *testing.PB) { for pb.Next() { var resp benchResponse - if err := client.DoQuery(context.Background(), 1<<30, benchRequest{ + ctx, cancel := context.WithTimeout(context.Background(), 7*time.Second) + err := client.DoQuery(ctx, 1<<30, benchRequest{ WantLen: sz, - }, &resp); err != nil { + }, &resp) + cancel() + if err != nil { b.Fatalf("client exec err: %v", err) } } diff --git a/example/rldp-test/main.go b/example/rldp-test/main.go new file mode 100644 index 00000000..e5bd8cac --- /dev/null +++ b/example/rldp-test/main.go @@ -0,0 +1,370 @@ +package main + +import ( + "context" + "crypto/ed25519" + "crypto/sha256" + "encoding/hex" + "errors" + "flag" + "fmt" + "log" + "math" + "net" + "os/signal" + "sync" + "sync/atomic" + "syscall" + "time" + + "github.com/xssnick/tonutils-go/adnl" + "github.com/xssnick/tonutils-go/adnl/address" + "github.com/xssnick/tonutils-go/adnl/dht" + "github.com/xssnick/tonutils-go/adnl/keys" + "github.com/xssnick/tonutils-go/adnl/rldp" + "github.com/xssnick/tonutils-go/tl" +) + +const ( + defaultConfigURL = "https://ton-blockchain.github.io/global.config.json" + defaultListenAddr = "0.0.0.0:30000" + defaultParallel = 4 + defaultPayloadSize = 256 << 10 // 1 MiB + dhtTTL = 10 * time.Minute +) + +type speedRequest struct { + Size uint64 `tl:"long"` +} + +type speedResponse struct { + Payload []byte `tl:"bytes"` +} + +func init() { + tl.Register(speedRequest{}, "rldp.speedTestRequest size:long = rldp.speedTestRequest") + tl.Register(speedResponse{}, "rldp.speedTestResponse payload:bytes = rldp.speedTestResponse") +} + +func main() { + configURL := flag.String("config", defaultConfigURL, "URL to global config for DHT") + adnlIDHex := flag.String("adnl", "", "remote ADNL ID (hex), if empty runs in server mode") + listenAddr := flag.String("listen", defaultListenAddr, "listen address for server mode") + publicIP := flag.String("public", "", "public IPv4 address to publish in DHT (server mode)") + payload := flag.Uint64("payload", defaultPayloadSize, "payload size in bytes for each request (client mode)") + parallel := flag.Int("parallel", defaultParallel, "number of parallel queries (client mode)") + timeout := flag.Duration("timeout", 30*time.Second, "per-request timeout") + flag.Parse() + + if *adnlIDHex == "" { + runServer(*listenAddr, *publicIP, *configURL) + return + } + + runClient(*adnlIDHex, *payload, *parallel, *timeout, *configURL) +} + +func runServer(listenAddr, publicIP, configURL string) { + log.Println("Starting RLDP speed test server") + sd := sha256.Sum256([]byte(listenAddr + publicIP)) + + priv := ed25519.NewKeyFromSeed(sd[:]) + var err error + + gateway := adnl.NewGateway(priv) + defer func() { _ = gateway.Close() }() + if err = gateway.StartServer(listenAddr); err != nil { + log.Fatalf("failed to start ADNL server: %v", err) + } + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + var bytesSent atomic.Uint64 + + gateway.SetConnectionHandler(func(peer adnl.Peer) error { + conn := rldp.NewClientV2(peer) + conn.SetOnQuery(func(transferID []byte, query *rldp.Query) error { + req, ok := query.Data.(speedRequest) + if !ok { + return fmt.Errorf("unexpected query type %T", query.Data) + } + + respSize := req.Size + if respSize > query.MaxAnswerSize { + respSize = query.MaxAnswerSize + } + + if respSize > uint64(math.MaxInt) { + return fmt.Errorf("requested size is too large: %d", respSize) + } + + payload := make([]byte, int(respSize)) + + sendCtx, cancel := context.WithTimeout(ctx, 2*time.Minute) + defer cancel() + + err := conn.SendAnswer(sendCtx, query.MaxAnswerSize, query.Timeout, query.ID, transferID, speedResponse{Payload: payload}) + if err != nil { + return fmt.Errorf("failed to send answer: %w", err) + } + + bytesSent.Add(respSize) + return nil + }) + return nil + }) + + go reportServerSpeed(ctx, &bytesSent) + + adnlID, err := tl.Hash(keys.PublicKeyED25519{Key: priv.Public().(ed25519.PublicKey)}) + if err != nil { + log.Fatalf("failed to compute ADNL ID: %v", err) + } + log.Printf("Server ADNL ID: %s", hex.EncodeToString(adnlID)) + + if err := publishAddress(ctx, gateway, priv, listenAddr, publicIP, configURL); err != nil { + log.Printf("failed to publish address in DHT: %v", err) + } + + <-ctx.Done() + log.Println("Shutting down server") +} + +func publishAddress(ctx context.Context, gateway *adnl.Gateway, priv ed25519.PrivateKey, listenAddr, publicIP, configURL string) error { + host, portStr, err := net.SplitHostPort(listenAddr) + if err != nil { + return fmt.Errorf("invalid listen address: %w", err) + } + + if publicIP == "" { + if host != "" && host != "0.0.0.0" && host != "::" { + publicIP = host + } else { + return errors.New("public IP is required to publish address in DHT") + } + } + + ip := net.ParseIP(publicIP) + if ip == nil { + return fmt.Errorf("invalid public IP: %s", publicIP) + } + if ip.To4() == nil { + return fmt.Errorf("only IPv4 is supported for publication: %s", publicIP) + } + + port, err := net.LookupPort("udp", portStr) + if err != nil { + return fmt.Errorf("invalid listen port: %w", err) + } + + gateway.SetAddressList([]*address.UDP{{ + IP: ip, + Port: int32(port), + }}) + + log.Println("Starting DHT client for publication") + dhtClient, err := dht.NewClientFromConfigUrl(ctx, gateway, configURL) + if err != nil { + return fmt.Errorf("failed to create DHT client: %w", err) + } + + go func() { + defer dhtClient.Close() + ticker := time.NewTicker(dhtTTL / 2) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + default: + } + + _, _, err := dhtClient.StoreAddress(ctx, gateway.GetAddressList(), dhtTTL, priv, 5) + if err != nil { + log.Printf("DHT publish failed: %v", err) + } else { + log.Println("Address published to DHT") + } + + select { + case <-ctx.Done(): + return + case <-ticker.C: + } + } + }() + + return nil +} + +func reportServerSpeed(ctx context.Context, total *atomic.Uint64) { + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + + var last uint64 + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + current := total.Load() + diff := current - last + last = current + mbps := float64(diff*8) / 1e6 + mib := float64(diff) / (1024 * 1024) + log.Printf("Server throughput: %.2f MiB/s (%.2f Mbit/s)", mib, mbps) + } + } +} + +func runClient(adnlIDHex string, payload uint64, parallel int, timeout time.Duration, configURL string) { + if parallel <= 0 { + log.Fatalf("parallel must be positive") + } + if payload == 0 { + log.Fatalf("payload must be positive") + } + + adnlID, err := hex.DecodeString(adnlIDHex) + if err != nil || len(adnlID) != 32 { + log.Fatalf("invalid ADNL ID: %v", err) + } + + log.Println("Starting RLDP speed test client") + + _, priv, err := ed25519.GenerateKey(nil) + if err != nil { + log.Fatalf("failed to generate key: %v", err) + } + + gateway := adnl.NewGateway(priv) + defer func() { _ = gateway.Close() }() + if err = gateway.StartClient(); err != nil { + log.Fatalf("failed to start ADNL client: %v", err) + } + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer cancel() + + dhtClient, err := dht.NewClientFromConfigUrl(ctx, gateway, configURL) + if err != nil { + log.Fatalf("failed to create DHT client: %v", err) + } + defer dhtClient.Close() + + addrList, pubKey, err := dhtClient.FindAddresses(ctx, adnlID) + if err != nil { + log.Fatalf("failed to resolve address: %v", err) + } + + if len(addrList.Addresses) == 0 { + log.Fatalf("no addresses found for %s", adnlIDHex) + } + + var peer adnl.Peer + for _, addr := range addrList.Addresses { + remote := addr.IP.String() + ":" + fmt.Sprint(addr.Port) + log.Printf("Trying address %s", remote) + peer, err = gateway.RegisterClient(remote, pubKey) + if err != nil { + log.Printf("failed to register peer %s: %v", remote, err) + continue + } + break + } + + if peer == nil { + log.Fatalf("failed to connect to any resolved address") + } + + defer peer.Close() + + conn := rldp.NewClientV2(peer) + + var totalReceived atomic.Uint64 + bytesCh := make(chan uint64, parallel*4) + var wg sync.WaitGroup + for i := 0; i < parallel; i++ { + wg.Add(1) + go func() { + defer wg.Done() + worker(ctx, conn, payload, timeout, bytesCh) + }() + } + + go func() { + wg.Wait() + close(bytesCh) + }() + + go aggregateClientSpeed(ctx, bytesCh, &totalReceived) + + <-ctx.Done() + log.Println("Client shutting down") +} + +func worker(ctx context.Context, conn *rldp.RLDP, payload uint64, timeout time.Duration, bytesCh chan<- uint64) { + for { + select { + case <-ctx.Done(): + return + default: + } + + var resp speedResponse + req := speedRequest{Size: payload} + + maxAnswer := payload + 4096 + if maxAnswer < payload { + maxAnswer = ^uint64(0) + } + + reqCtx, cancel := context.WithTimeout(ctx, timeout) + err := conn.DoQuery(reqCtx, maxAnswer, req, &resp) + cancel() + if err != nil { + log.Printf("query failed: %v", err) + time.Sleep(500 * time.Millisecond) + continue + } + + bytesCh <- uint64(len(resp.Payload)) + } +} + +func aggregateClientSpeed(ctx context.Context, bytesCh <-chan uint64, total *atomic.Uint64) { + ticker := time.NewTicker(1000 * time.Millisecond) + defer ticker.Stop() + + var last uint64 + lastTime := time.Now() + ctxDone := false + + for { + select { + case b, ok := <-bytesCh: + if !ok { + return + } + total.Add(b) + case <-ticker.C: + current := total.Load() + diff := current - last + now := time.Now() + elapsed := now.Sub(lastTime).Seconds() + if elapsed == 0 { + elapsed = 1e-9 + } + if !ctxDone { + mib := float64(diff) / (1024 * 1024) / elapsed + mbits := float64(diff*8) / 1e6 / elapsed + log.Printf("Client throughput: %.2f MiB/s (%.2f Mbit/s)", mib, mbits) + } + last = current + lastTime = now + case <-ctx.Done(): + ctxDone = true + } + } +} diff --git a/ton/integration_test.go b/ton/integration_test.go index b08756cd..e7c9db5e 100644 --- a/ton/integration_test.go +++ b/ton/integration_test.go @@ -596,6 +596,7 @@ func Test_LSErrorCase(t *testing.T) { } } +/* func TestAccountStorage_LoadFromCell_ExtraCurrencies(t *testing.T) { client := liteclient.NewConnectionPool() @@ -635,7 +636,7 @@ func TestAccountStorage_LoadFromCell_ExtraCurrencies(t *testing.T) { t.Fatal("expected extra currencies dict") } }) -} +}*/ func TestAPIClient_GetBlockProofForward(t *testing.T) { cfg, err := liteclient.GetConfigFromUrl(context.Background(), "https://ton-blockchain.github.io/global.config.json")