Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 197 additions & 22 deletions addrmgr/addrmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@
package addrmgr

import (
"encoding/base32"
"encoding/binary"
"encoding/json"
"fmt"
"net"
"os"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -53,6 +55,14 @@ type AddrManager struct {
// Tried addresses are addresses that have been tested.
addrTried [triedBucketCount][]*KnownAddress

// addrNewStats maintains statistics about addresses in each
// new bucket.
addrNewStats [newBucketCount]bucketStats

// addrTriedStats maintains statistics about addresses in each
// tried bucket.
addrTriedStats [triedBucketCount]bucketStats

// addrChanged signals whether the address manager needs to have its state
// serialized and saved to the file system.
addrChanged bool
Expand Down Expand Up @@ -95,6 +105,21 @@ type AddrManager struct {
triedBucketSize int
}

// addrTypeFilter specifies the wanted network address types for address
// selection.
type addrTypeFilter struct {
wantIPv4 bool
wantIPv6 bool
wantTORv3 bool
}

// bucketStats tracks the number of addresses by type within a single bucket.
type bucketStats struct {
numIPv4 uint16
numIPv6 uint16
numTORv3 uint16
}

// serializedKnownAddress is used to represent the serializable state of a
// known address. It excludes convenience fields that can be derived from the
// address manager's state.
Expand Down Expand Up @@ -217,6 +242,59 @@ const (
serialisationVersion = 1
)

// increment increases the count for the given address type in the bucket counts.
func (bs *bucketStats) increment(addrType NetAddressType) {
switch addrType {
case IPv4Address:
bs.numIPv4++
case IPv6Address:
bs.numIPv6++
case TORv3Address:
bs.numTORv3++
}
}

// decrement decreases the count for the given address type in the bucket counts.
func (bs *bucketStats) decrement(addrType NetAddressType) {
switch addrType {
case IPv4Address:
bs.numIPv4--
case IPv6Address:
bs.numIPv6--
case TORv3Address:
bs.numTORv3--
}
}

// total returns the sum of address counts matching the filter.
func (bs *bucketStats) total(filter addrTypeFilter) int {
sum := 0
if filter.wantIPv4 {
sum += int(bs.numIPv4)
}
if filter.wantIPv6 {
sum += int(bs.numIPv6)
}
if filter.wantTORv3 {
sum += int(bs.numTORv3)
}
return sum
}

// matches returns true if the bucket statistics have any addresses matching the filter.
func (bs *bucketStats) matches(filter addrTypeFilter) bool {
return (filter.wantIPv4 && bs.numIPv4 > 0) ||
(filter.wantIPv6 && bs.numIPv6 > 0) ||
(filter.wantTORv3 && bs.numTORv3 > 0)
}

// matches returns true if the address type matches the filter criteria.
func (f addrTypeFilter) matches(addrType NetAddressType) bool {
return (f.wantIPv4 && addrType == IPv4Address) ||
(f.wantIPv6 && addrType == IPv6Address) ||
(f.wantTORv3 && addrType == TORv3Address)
}

// addOrUpdateAddress is a helper function to either update an address already known
// to the address manager, or to add the address if not already known.
func (a *AddrManager) addOrUpdateAddress(netAddr, srcAddr *NetAddress) {
Expand Down Expand Up @@ -290,6 +368,7 @@ func (a *AddrManager) addOrUpdateAddress(netAddr, srcAddr *NetAddress) {
// Add to new bucket.
ka.refs++
a.addrNew[bucket][addrKey] = ka
a.addrNewStats[bucket].increment(netAddr.Type)
a.addrChanged = true

log.Tracef("Added new address %s for a total of %d addresses", addrKey,
Expand All @@ -309,6 +388,7 @@ func (a *AddrManager) expireNew(bucket int) {
if v.isBad() {
log.Tracef("expiring bad address %v", k)
delete(a.addrNew[bucket], k)
a.addrNewStats[bucket].decrement(v.na.Type)
a.addrChanged = true
v.refs--
if v.refs == 0 {
Expand All @@ -329,6 +409,7 @@ func (a *AddrManager) expireNew(bucket int) {
log.Tracef("expiring oldest address %v", key)

delete(a.addrNew[bucket], key)
a.addrNewStats[bucket].decrement(oldest.na.Type)
a.addrChanged = true
oldest.refs--
if oldest.refs == 0 {
Expand Down Expand Up @@ -567,6 +648,7 @@ func (a *AddrManager) deserializePeers(filePath string) error {
}
ka.refs++
a.addrNew[i][val] = ka
a.addrNewStats[i].increment(ka.na.Type)
}
}
for i := range sam.TriedBuckets {
Expand All @@ -580,6 +662,7 @@ func (a *AddrManager) deserializePeers(filePath string) error {
ka.tried = true
a.nTried++
a.addrTried[i] = append(a.addrTried[i], ka)
a.addrTriedStats[i].increment(ka.na.Type)
}
}

Expand Down Expand Up @@ -730,6 +813,8 @@ func (a *AddrManager) reset() {
for i := range a.addrTried {
a.addrTried[i] = nil
}
a.addrTriedStats = [triedBucketCount]bucketStats{}
a.addrNewStats = [newBucketCount]bucketStats{}
a.addrChanged = true
a.getNewBucket = func(netAddr, srcAddr *NetAddress) int {
return getNewBucket(a.key, netAddr, srcAddr)
Expand All @@ -744,6 +829,20 @@ func (a *AddrManager) reset() {
// returns the result. If the host string is not recognized as any known type,
// then an unknown address type is returned without error.
func EncodeHost(host string) (NetAddressType, []byte) {
// Check if this is a valid TORv3 address.
if len(host) == 62 && strings.HasSuffix(host, ".onion") {
// TORv3 addresses tend to be lowercase by convention, but
// Go's base32.StdEncoding.DecodeString expects uppercase
// input. Convert to uppercase for successful decoding.
torAddressBytes, err := base32.StdEncoding.DecodeString(
strings.ToUpper(host[:56]))
if err == nil {
if pubkey, valid := isTORv3(torAddressBytes); valid {
return TORv3Address, pubkey[:]
}
}
}

// Look for IPv4 or IPv6 addresses
if ip := net.ParseIP(host); ip != nil {
if isIPv4(ip) {
Expand All @@ -757,34 +856,81 @@ func EncodeHost(host string) (NetAddressType, []byte) {
}

// GetAddress returns a single address that should be routable. It picks a
// random one from the possible addresses with preference given to ones that
// have not been used recently and should not pick 'close' addresses
// consecutively.
// random one from the possible addresses that satisfy the provided filter
// with preference given to ones that have not been used recently and should
// not pick 'close' addresses consecutively.
//
// This function is safe for concurrent access.
func (a *AddrManager) GetAddress() *KnownAddress {
func (a *AddrManager) GetAddress(filterFn NetAddressTypeFilter) *KnownAddress {
a.mtx.Lock()
defer a.mtx.Unlock()

if a.numAddresses() == 0 {
return nil
}

filter := addrTypeFilter{
wantIPv4: filterFn(IPv4Address),
wantIPv6: filterFn(IPv6Address),
wantTORv3: filterFn(TORv3Address),
}

if !filter.wantIPv4 && !filter.wantIPv6 && !filter.wantTORv3 {
return nil
}

// Collect indices of tried and new buckets that match the filter.
var triedBucketIdxsBuf [triedBucketCount]int
var newBucketIdxsBuf [newBucketCount]int
triedBucketIdxs := triedBucketIdxsBuf[:0]
newBucketIdxs := newBucketIdxsBuf[:0]
for i := range a.addrTriedStats {
if a.addrTriedStats[i].matches(filter) {
triedBucketIdxs = append(triedBucketIdxs, i)
}
}
for i := range a.addrNewStats {
if a.addrNewStats[i].matches(filter) {
newBucketIdxs = append(newBucketIdxs, i)
}
}

numTried := len(triedBucketIdxs)
numNew := len(newBucketIdxs)

// Return early if no buckets match the filter.
if numTried == 0 && numNew == 0 {
return nil
}

// Use a 50% chance for choosing between tried and new table entries.
large := 1 << 30
factor := 1.0
if a.nTried > 0 && (a.nNew == 0 || rand.IntN(2) == 0) {
if numTried > 0 && (numNew == 0 || rand.IntN(2) == 0) {
// Tried entry.
for {
// Pick a random bucket.
bucket := rand.IntN(len(a.addrTried))
if len(a.addrTried[bucket]) == 0 {
continue
}
// Pick a random bucket from buckets matching the filter.
bucketIdx := triedBucketIdxs[rand.IntN(numTried)]
bucket := a.addrTried[bucketIdx]

// Then, a random entry in the list.
randEntry := rand.IntN(len(a.addrTried[bucket]))
ka := a.addrTried[bucket][randEntry]
// Calculate total number of tried addresses
// matching the filter, then pick a random entry.
counts := a.addrTriedStats[bucketIdx]
totalMatching := counts.total(filter)
nth := rand.IntN(totalMatching)

// Find the nth address matching the filter.
var ka *KnownAddress
for _, addr := range bucket {
if !filter.matches(addr.na.Type) {
continue
}
if nth == 0 {
ka = addr
break
}
nth--
}

randval := rand.IntN(large)
if float64(randval) < (factor * ka.chance() * float64(large)) {
Expand All @@ -796,22 +942,29 @@ func (a *AddrManager) GetAddress() *KnownAddress {
} else {
// New node.
for {
// Pick a random bucket.
bucket := rand.IntN(len(a.addrNew))
if len(a.addrNew[bucket]) == 0 {
continue
}
// Pick a random bucket from the buckets matching the filter.
bucketIdx := newBucketIdxs[rand.IntN(numNew)]
bucket := a.addrNew[bucketIdx]

// Then, a random entry in it.
// Calculate total number of new addresses
// matching the filter, then pick a random entry.
bucketStats := a.addrNewStats[bucketIdx]
totalMatching := bucketStats.total(filter)
nth := rand.IntN(totalMatching)

// Find the nth address matching the filter.
var ka *KnownAddress
nth := rand.IntN(len(a.addrNew[bucket]))
for _, value := range a.addrNew[bucket] {
for _, addr := range bucket {
if !filter.matches(addr.na.Type) {
continue
}
if nth == 0 {
ka = value
ka = addr
break
}
nth--
}

randval := rand.IntN(large)
if float64(randval) < (factor * ka.chance() * float64(large)) {
log.Tracef("Selected %s from new bucket", ka.na)
Expand Down Expand Up @@ -917,6 +1070,7 @@ func (a *AddrManager) Good(addr *NetAddress) error {
// we check for existence so we can record the first one
if _, ok := a.addrNew[i][addrKey]; ok {
delete(a.addrNew[i], addrKey)
a.addrNewStats[i].decrement(ka.na.Type)
a.addrChanged = true
ka.refs--
if addrNewAvailableIndex == -1 {
Expand All @@ -938,6 +1092,7 @@ func (a *AddrManager) Good(addr *NetAddress) error {
if len(a.addrTried[bucket]) < a.triedBucketSize {
ka.tried = true
a.addrTried[bucket] = append(a.addrTried[bucket], ka)
a.addrTriedStats[bucket].increment(ka.na.Type)
a.addrChanged = true
a.nTried++
return nil
Expand All @@ -960,6 +1115,8 @@ func (a *AddrManager) Good(addr *NetAddress) error {
// Replace oldest tried address in bucket with ka.
ka.tried = true
a.addrTried[bucket][oldestTriedIndex] = ka
a.addrTriedStats[bucket].decrement(rmka.na.Type)
a.addrTriedStats[bucket].increment(ka.na.Type)

rmka.tried = false
rmka.refs++
Expand All @@ -975,6 +1132,7 @@ func (a *AddrManager) Good(addr *NetAddress) error {

// We made sure there is space here just above.
a.addrNew[newBucket][rmkey] = rmka
a.addrNewStats[newBucket].increment(rmka.na.Type)
return nil
}

Expand Down Expand Up @@ -1082,6 +1240,9 @@ const (

// Ipv6Strong represents a connection state between two IPv6 addresses.
Ipv6Strong

// Private represents a connection state between two TORv3 addresses.
Private
)

// getRemoteReachabilityFromLocal returns the type of connection reachability
Expand All @@ -1093,6 +1254,16 @@ func getRemoteReachabilityFromLocal(localAddr, remoteAddr *NetAddress) NetAddres
case !remoteAddr.IsRoutable():
return Unreachable

case remoteAddr.Type == TORv3Address:
switch {
case localAddr.Type == TORv3Address:
return Private
case localAddr.IsRoutable() && localAddr.Type == IPv4Address:
return Ipv4
default:
return Default
}

case isRFC4380(remoteAddr.IP):
switch {
case !localAddr.IsRoutable():
Expand All @@ -1109,6 +1280,8 @@ func getRemoteReachabilityFromLocal(localAddr, remoteAddr *NetAddress) NetAddres
switch {
case localAddr.IsRoutable() && localAddr.Type == IPv4Address:
return Ipv4
case localAddr.Type == TORv3Address:
return Ipv4
default:
return Unreachable
}
Expand All @@ -1121,6 +1294,8 @@ func getRemoteReachabilityFromLocal(localAddr, remoteAddr *NetAddress) NetAddres
return Teredo
case localAddr.Type == IPv4Address:
return Ipv4
case localAddr.Type == TORv3Address:
return Ipv6Strong

// Is our IPv6 tunneled?
case isRFC3964(localAddr.IP) || isRFC6052(localAddr.IP) ||
Expand Down
Loading