Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

[![Based on TON][ton-svg]][ton]
[![Telegram Channel][tgc-svg]][tg-channel]
![Coverage](https://img.shields.io/badge/Coverage-70.4%25-brightgreen)
![Coverage](https://img.shields.io/badge/Coverage-70.1%25-brightgreen)

Golang library for interacting with TON blockchain.

Expand Down
11 changes: 11 additions & 0 deletions adnl/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,23 @@ func NewGatewayWithNetManager(key ed25519.PrivateKey, reader NetManager) *Gatewa
}

var PacketsBufferSize = 128 * 1024
var DefaultUDPBufferSize = 32 << 20

var DefaultListener = func(addr string) (net.PacketConn, error) {
lp, err := net.ListenPacket("udp", addr)
if err != nil {
return nil, err
}

if conn, ok := lp.(*net.UDPConn); ok {
if err := conn.SetReadBuffer(DefaultUDPBufferSize); err != nil {
Logger("[ADNL] failed to set read buffer:", err)
}
if err := conn.SetWriteBuffer(DefaultUDPBufferSize); err != nil {
Logger("[ADNL] failed to set write buffer:", err)
}
}

return NewSyncConn(lp, PacketsBufferSize), nil
}

Expand Down
48 changes: 36 additions & 12 deletions adnl/rldp/bbr2.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ func NewBBRv2Controller(l *TokenBucket, o BBRv2Options) *BBRv2Controller {
}
c.btlbw.Store(start)
c.pacingRate.Store(start)
c.inflight.Store(rateToInflight(start, c.minRTT.Load()))
initialInflight := rateToInflight(start, c.minRTT.Load())
c.applyInflightLimit(initialInflight)
c.hiInflight.Store(c.inflight.Load())
c.loInflight.Store(0)

Expand Down Expand Up @@ -201,6 +202,28 @@ func (c *BBRv2Controller) SetAppLimited(v bool) { c.appLimited.Store(v) }

func (c *BBRv2Controller) markActive() { c.lastActive.Store(nowMs()) }

func (c *BBRv2Controller) applyInflightLimit(inflight int64) {
if inflight < 0 {
inflight = 0
}
c.inflight.Store(inflight)

burst := inflight + inflight/4
const minBurst = 32 * 1024
if burst < minBurst {
burst = minBurst
}

if maxRate := c.opts.MaxRate; maxRate > 0 {
maxBurst := rateToInflight(maxRate, max64(c.minRTT.Load(), 1))
if maxBurst > 0 && burst > maxBurst {
burst = maxBurst
}
}

c.limiter.SetCapacityBytes(burst)
}

func (c *BBRv2Controller) OnNewSendBurst() {
if c.appLimited.Swap(false) {
c.fullBW.Store(0)
Expand Down Expand Up @@ -236,7 +259,7 @@ func (c *BBRv2Controller) ObserveRTT(rttMs int64) {
c.lastRTT.Store(rttMs)

if btl := c.btlbw.Load(); btl > 0 {
c.inflight.Store(rateToInflight(btl, c.minRTT.Load()))
c.applyInflightLimit(rateToInflight(btl, c.minRTT.Load()))
}
}

Expand Down Expand Up @@ -325,6 +348,7 @@ func (c *BBRv2Controller) resetForNewFlow(now int64) {
if model <= 0 {
model = rateToInflight(c.opts.MinRate, max64(c.minRTT.Load(), 1))
}
c.applyInflightLimit(model)
c.hiInflight.Store(max64(model+model/4, 2*1500))
c.lastAckTs.Store(now)
c.appLimited.Store(false)
Expand All @@ -337,15 +361,12 @@ func (c *BBRv2Controller) primeTokenBucketForRate(now int64, rate int64) {
}
rtt := max64(c.minRTT.Load(), 1)
bdp := rateToInflight(rate, rtt)
burst := max64(64*1024, min64(8*bdp, 4*1024*1024))
type tbIface interface {
SetBurst(int64)
AddTokens(int64)
}
if tb, ok := any(c.limiter).(tbIface); ok {
tb.SetBurst(burst)
tb.AddTokens(min64(bdp, burst))
target := bdp
if cur := c.inflight.Load(); cur > target {
target = cur
}
c.applyInflightLimit(target)
c.limiter.AddTokens(min64(bdp, target))
}

func humanBps(bps int64) string {
Expand Down Expand Up @@ -445,7 +466,10 @@ func (c *BBRv2Controller) InflightAllowance(currentBytes int64) int64 {
}

func (c *BBRv2Controller) CurrentMinRTT() int64 {
return c.minRTT.Load()
if !c.minRTTProvisional.Load() {
return c.minRTT.Load()
}
return -1
}

func (c *BBRv2Controller) CurrentRTT() int64 {
Expand Down Expand Up @@ -479,7 +503,7 @@ func (c *BBRv2Controller) updateModelAndRate(now int64) float64 {
if inflight <= 0 {
inflight = 2 * 1500 // at least two MSS-equivalents
}
c.inflight.Store(inflight)
c.applyInflightLimit(inflight)

// Losses in the last window → decide whether to lower inflight_hi
var lossRate float64
Expand Down
51 changes: 50 additions & 1 deletion adnl/rldp/bucket.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rldp

import (
"math"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -36,7 +37,18 @@ func (tb *TokenBucket) SetCapacityBytes(burstBytes int64) {
if burstBytes < 0 {
burstBytes = 0
}
atomic.StoreInt64(&tb.capacity, burstBytes*1000)
capMicro := toMicroBytes(burstBytes)
atomic.StoreInt64(&tb.capacity, capMicro)

for {
curr := atomic.LoadInt64(&tb.tokens)
if curr <= capMicro {
break
}
if atomic.CompareAndSwapInt64(&tb.tokens, curr, capMicro) {
break
}
}
}

func (tb *TokenBucket) SetRate(bps int64) {
Expand Down Expand Up @@ -124,6 +136,33 @@ func (tb *TokenBucket) ConsumePackets(maxPackets, partSize int) int {
return gotBytes / partSize
}

// SetBurst implements the interface used by the BBR controller for seeding
// the token bucket with a specific burst size (in bytes).
func (tb *TokenBucket) SetBurst(burstBytes int64) {
tb.SetCapacityBytes(burstBytes)
}

// AddTokens adds up to n bytes worth of tokens into the bucket.
// The total amount of tokens is always capped at the configured capacity.
func (tb *TokenBucket) AddTokens(n int64) {
if n <= 0 {
return
}

add := toMicroBytes(n)
for {
curr := atomic.LoadInt64(&tb.tokens)
cp := atomic.LoadInt64(&tb.capacity)
newVal := curr + add
if cp > 0 && newVal > cp {
newVal = cp
}
if atomic.CompareAndSwapInt64(&tb.tokens, curr, newVal) {
return
}
}
}

func (tb *TokenBucket) TryConsumeBytes(n int) bool {
return tb.ConsumeUpTo(n) == n
}
Expand All @@ -134,3 +173,13 @@ func abs64(x int64) int64 {
}
return x
}

func toMicroBytes(v int64) int64 {
if v >= math.MaxInt64/1000 {
return math.MaxInt64
}
if v <= math.MinInt64/1000 {
return math.MinInt64
}
return v * 1000
}
Loading
Loading