diff --git a/network/forward/forward.go b/network/forward/forward.go new file mode 100644 index 0000000000..28094f4d3d --- /dev/null +++ b/network/forward/forward.go @@ -0,0 +1,160 @@ +package forward + +import ( + "errors" + "fmt" + "sync" + + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/network" +) + +var ( + NoMorePeers = errors.New("no more peers") +) + +// Session encapsulates one single peer iteration query +type Session struct { + kademlia *network.KademliaLoadBalancer // kademlia backend + base []byte // base address to use for iteration + id int // id of session + capabilityIndex string // kademlia capabilityIndex in use + nextC chan struct{} // triggered to output one single peer from iterator + getC chan *network.Peer // receives peer from iterator +} + +// Id returns the session id +func (s *Session) Id() int { + return s.id +} + +// Get returns up to numPeers peers from the current position of the iterator +// If no further peers are available a NoMorePeers error will be returned +func (s *Session) Get(numPeers int) ([]*network.Peer, error) { + var result []*network.Peer + select { + case <-s.getC: + return result, NoMorePeers + default: + } + for i := 0; i < numPeers; i++ { + s.nextC <- struct{}{} + p, ok := <-s.getC + if !ok { + break + } + result = append(result, p) + } + return result, nil +} + +// starts the iterator and blocks for request for next peer through Get() +func (s *Session) load() error { + err := s.kademlia.EachBinFiltered(s.base, s.capabilityIndex, func(bin network.LBBin) bool { + for _, p := range bin.LBPeers { + _, ok := <-s.nextC + if !ok { + return false + } + s.getC <- p.Peer + } + return true + }) + close(s.getC) + return err +} + +// frees resources +func (s *Session) destroy() { + close(s.nextC) +} + +// SessionManager is the Session object factory +type SessionManager struct { + kademlia *network.KademliaLoadBalancer // underlying kademlia backend + sessions map[int]*Session // index of active sessions, mapped by session id + lastId int // last assigned id for session, starts at 1 to make create from context easier + mu sync.Mutex // protects sessions map +} + +// NewSessionManager is the SessionManager constructor +// Sessions created with the SessionManager will use the provided kademlia backend +// TODO: argument should be network.KademliaBackend, but needs KademliaLoadBalancer to implement this +func NewSessionManager(kademlia *network.KademliaLoadBalancer) *SessionManager { + return &SessionManager{ + sessions: make(map[int]*Session), + kademlia: kademlia, + } +} + +// New creates a new Session object with the given capabilityindex and base address +// if capabilityIndex is empty, the global kademlia database will be used +// if base is nil, the kademlia base address will be used as comparator for the iteration +func (m *SessionManager) New(capabilityIndex string, base []byte) *Session { + s := &Session{ + capabilityIndex: capabilityIndex, + kademlia: m.kademlia, + nextC: make(chan struct{}), + getC: make(chan *network.Peer), + } + if base == nil { + s.base = m.kademlia.BaseAddr() + } else { + s.base = base + } + go s.load() + return m.add(s) +} + +// Reap frees the Session object resources and removes it from the session index +func (m *SessionManager) Reap(sessionId int) { + s, ok := m.sessions[sessionId] + if !ok { + return + } + s.destroy() +} + +// ToContext creates a SessionContext from the existing Session matching the provided id +// if the session does not exist an error is returned +func (m *SessionManager) ToContext(id int) (*SessionContext, error) { + s, ok := m.sessions[id] + if !ok { + return nil, fmt.Errorf("No such session %d", id) + } + return &SessionContext{ + CapabilityIndex: s.capabilityIndex, + SessionId: s.id, + Address: s.base, + }, nil +} + +// FromContext retrieves or creates a Session from a provided context +// If the context has the "id" value set, the corresponding Session is returned, or error if it does not exist +// Otherwise, a new Session is created and returned, optionally with the "address" and/or "capability" values provided in the context +func (m *SessionManager) FromContext(sctx *SessionContext) (*Session, error) { + + sessionId, ok := sctx.Value("id").(int) + if ok { + s, ok := m.sessions[sessionId] + if !ok { + return nil, fmt.Errorf("No such session %d", sessionId) + } + return s, nil + } + + addr, _ := sctx.Value("address").([]byte) + capabilityIndex, _ := sctx.Value("capability").(string) + return m.New(capabilityIndex, addr), nil +} + +// adds a new session to the sessionmanager +func (m *SessionManager) add(s *Session) *Session { + m.mu.Lock() + defer m.mu.Unlock() + m.lastId++ + log.Trace("adding session", "id", m.lastId) + s.id = m.lastId + m.sessions[m.lastId] = s + return s +} diff --git a/network/forward/forward_test.go b/network/forward/forward_test.go new file mode 100644 index 0000000000..70f84b8796 --- /dev/null +++ b/network/forward/forward_test.go @@ -0,0 +1,153 @@ +package forward + +import ( + "bytes" + "testing" + + "github.com/ethersphere/swarm/log" + "github.com/ethersphere/swarm/network" + "github.com/ethersphere/swarm/network/capability" + "github.com/ethersphere/swarm/pot" + "github.com/ethersphere/swarm/testutil" +) + +func init() { + testutil.Init() +} + +// TestNew tests that the SessionManager constructor creates Session object with expected values +func TestNew(t *testing.T) { + addr := make([]byte, 32) + addr[31] = 0x01 + kadParams := network.NewKadParams() + kad := network.NewKademlia(addr, kadParams) + kadLB := network.NewKademliaLoadBalancer(kad, false) + defer kadLB.Stop() + + mgr := NewSessionManager(kadLB) + fwdBase := mgr.New("", nil) + defer mgr.Reap(fwdBase.Id()) + if !bytes.Equal(fwdBase.base, addr) { + t.Fatalf("base base; expected %x, got %x", addr, fwdBase.base) + } + if fwdBase.id != 1 { + t.Fatalf("sessionId; expected %d, got %d", 1, fwdBase.id) + } + + bytesNear := pot.NewAddressFromString("00000001") + capabilityIndex := "foo" + fwdExplicit := mgr.New(capabilityIndex, bytesNear) + if !bytes.Equal(fwdExplicit.base, bytesNear) { + t.Fatalf("base explicit; expected %x, got %x", bytesNear, fwdExplicit.base) + } + if fwdExplicit.id != 2 { + t.Fatalf("sessionId; expected %d, got %d", 2, fwdExplicit.id) + } + if fwdExplicit.capabilityIndex != capabilityIndex { + t.Fatalf("capabilityindex, expected %s, got %s", capabilityIndex, fwdExplicit.capabilityIndex) + } + if len(mgr.sessions) != 2 { + t.Fatalf("sessions array; expected %d, got %d", 2, len(mgr.sessions)) + } +} + +// TestManagerContext tests that the SessionManager's context translations creates Session objects with expected values, and retrieves existing matching Session objects +func TestManagerContext(t *testing.T) { + addr := make([]byte, 32) + addr[31] = 0x01 + kadParams := network.NewKadParams() + kad := network.NewKademlia(addr, kadParams) + kadLB := network.NewKademliaLoadBalancer(kad, false) + defer kadLB.Stop() + + mgr := NewSessionManager(kadLB) + fwdVoid := mgr.New("", nil) // id 1 + defer mgr.Reap(fwdVoid.Id()) + fwdOne := mgr.New("", nil) // id 2 + defer mgr.Reap(fwdOne.Id()) + if len(mgr.sessions) != 2 { + t.Fatalf("mgr session length; expected 2, got %d", len(mgr.sessions)) + } + if mgr.sessions[2] != fwdOne { + t.Fatalf("fromcontext; expected %p, got %p", fwdOne, mgr.sessions[2]) + } + + newAddr := make([]byte, 32) + newAddr[31] = 0x02 + fwdTwo := mgr.New("foo", newAddr) // id 3 + defer mgr.Reap(fwdTwo.Id()) + sctx, err := mgr.ToContext(3) + if err != nil { + t.Fatal(err) + } + if fwdTwo.id != sctx.SessionId { + t.Fatalf("to context id; expected %d, got %d", fwdTwo.id, sctx.SessionId) + } + if fwdTwo.capabilityIndex != sctx.CapabilityIndex { + t.Fatalf("to context id; expected %s, got %s", fwdTwo.capabilityIndex, sctx.CapabilityIndex) + } + if !bytes.Equal(fwdTwo.base, sctx.Address) { + t.Fatalf("to context id; expected %x, got %x", fwdTwo.base, sctx.Address) + } + + sctx = NewSessionContext("", nil) + sctx.SessionId = 3 + fwdThree, err := mgr.FromContext(sctx) + if err != nil { + t.Fatal(err) + } + if fwdThree != fwdTwo { + t.Fatalf("from new context; expected %p, got %p", fwdTwo, fwdThree) + } +} + +// TestGet verifies that the synchronous Get method retrieves peers in the correct order +func TestGet(t *testing.T) { + bytesOwn := pot.NewAddressFromString("00000000") + kadParams := network.NewKadParams() + kad := network.NewKademlia(bytesOwn, kadParams) + kadLB := network.NewKademliaLoadBalancer(kad, false) + defer kadLB.Stop() + cp := capability.NewCapability(4, 2) + kad.RegisterCapabilityIndex("foo", *cp) + + bytesFar := pot.NewAddressFromString("10000000") + bytesNear := pot.NewAddressFromString("00000001") + addrFar := network.NewBzzAddr(bytesFar, []byte{}) + addrNear := network.NewBzzAddr(bytesNear, []byte{}) + addrFar.Capabilities.Add(cp) + addrNear.Capabilities.Add(cp) + peerFar := network.NewPeer(&network.BzzPeer{BzzAddr: addrFar}, kad) + peerNear := network.NewPeer(&network.BzzPeer{BzzAddr: addrNear}, kad) + kad.Register(addrFar) + kad.Register(addrNear) + kad.On(peerFar) + kad.On(peerNear) + + mgr := NewSessionManager(kadLB) + fwd := mgr.New("foo", nil) + defer mgr.Reap(fwd.Id()) + p, err := fwd.Get(1) + if err != nil { + t.Fatal(err) + } + if len(p) != 1 { + t.Fatalf("get first count; expected 1, got %d", len(p)) + } + if !bytes.Equal(p[0].Address(), bytesNear) { + t.Fatalf("get first address; expected %x, got %x", bytesNear, p[0].Address()) + } + + p, err = fwd.Get(1) + if err != nil { + t.Fatal(err) + } + if len(p) != 1 { + t.Fatalf("get peers count; expected 1, got %d", len(p)) + } + if !bytes.Equal(p[0].Address(), bytesFar) { + t.Fatalf("get second address; expected %x, got %x", bytesFar, p[0].Address()) + } + log.Trace("peer", "peer", p) + +} diff --git a/network/forward/types.go b/network/forward/types.go new file mode 100644 index 0000000000..ec5a6597c7 --- /dev/null +++ b/network/forward/types.go @@ -0,0 +1,70 @@ +package forward + +import ( + "time" + + "github.com/ethersphere/swarm/network" +) + +var ( + zeroTime = time.Unix(0, 0) +) + +// SessionInterface provides an interface for an individual session object +type SessionInterface interface { + Subscribe() <-chan *network.Peer + Get(numberOfPeers int) ([]*network.Peer, error) +} + +// SessionContext is a context.Context that can be used to reference existing sessions or create new sessions +type SessionContext struct { + CapabilityIndex string + SessionId int + Address []byte +} + +// NewSessionContext creates a new SessionContext with the provided capabilityIndex and base address +func NewSessionContext(capabilityIndex string, base []byte) *SessionContext { + return &SessionContext{ + CapabilityIndex: capabilityIndex, + Address: base, + } +} + +// Deadline implements context.Context +func (c *SessionContext) Deadline() (time.Time, bool) { + return zeroTime, false +} + +// Done implements context.Context +func (c *SessionContext) Done() <-chan struct{} { + return nil +} + +// Err implements context.Context +func (c *SessionContext) Err() error { + return nil +} + +// Value implements context.Context +func (c *SessionContext) Value(k interface{}) interface{} { + ks, ok := k.(string) + if !ok { + return nil + } + switch ks { + case "address": + if c.Address == nil { + return nil + } + return c.Address + case "capability": + if c.CapabilityIndex == "" { + return nil + } + return c.CapabilityIndex + case "id": + return c.SessionId + } + return nil +} diff --git a/network/kademlia_load_balancer.go b/network/kademlia_load_balancer.go index a710554e60..35d8c44f33 100644 --- a/network/kademlia_load_balancer.go +++ b/network/kademlia_load_balancer.go @@ -24,6 +24,7 @@ import ( ) // KademliaBackend is the required interface of KademliaLoadBalancer. +// TODO: Consider if KademliaLoadBalancer itself should implement KademliaBackend type KademliaBackend interface { SubscribeToPeerChanges() *pubsubchannel.Subscription BaseAddr() []byte @@ -124,6 +125,11 @@ func (klb *KademliaLoadBalancer) EachBinDesc(base []byte, consumeBin LBBinConsum }) } +// BaseAddr returns the base address of the underlying kademlia backend +func (klb *KademliaLoadBalancer) BaseAddr() []byte { + return klb.kademlia.BaseAddr() +} + func (klb *KademliaLoadBalancer) peerBinToPeerList(bin *PeerBin) []LBPeer { resources := make([]resourceusestats.Resource, bin.Size) var i int diff --git a/pot/address.go b/pot/address.go index cc88c35d37..1ec916c2d5 100644 --- a/pot/address.go +++ b/pot/address.go @@ -79,6 +79,10 @@ func (a Address) Bytes() []byte { return a[:] } +func (a Address) Address() []byte { + return a[:] +} + // Distance returns the distance between address x and address y as a (comparable) big integer using the distance metric defined in the swarm specification // Fails if not all addresses are of equal length func Distance(x, y []byte) (*big.Int, error) {