Skip to content
This repository was archived by the owner on Apr 3, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/lwip.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (s *lwipStack) Close() error {
s.cancel()

// Abort and close all TCP and UDP connections.
tcpConns.Range(func(_, c interface{}) bool {
tcpConns.Range(func(c, _ interface{}) bool {
c.(*tcpConn).Abort()
return true
})
Expand Down
72 changes: 29 additions & 43 deletions core/tcp_callback_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,11 @@ func tcpRecvFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, p *C.struct_pbuf, err
}
}()

conn, ok := tcpConns.Load(getConnKeyVal(arg))
if !ok {
// The connection does not exists.
C.tcp_abort(tpcb)
return C.ERR_ABRT
}
var conn = (*tcpConn)(arg)

if p == nil {
// Peer closed, EOF.
err := conn.(TCPConn).LocalClosed()
err := conn.LocalClosed()
switch err.(*lwipError).Code {
case LWIP_ERR_ABRT:
return C.ERR_ABRT
Expand All @@ -86,7 +81,7 @@ func tcpRecvFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, p *C.struct_pbuf, err
C.pbuf_copy_partial(p, unsafe.Pointer(&buf[0]), p.tot_len, 0)
}

rerr := conn.(TCPConn).Receive(buf[:totlen])
rerr := conn.Receive(buf[:totlen])
if rerr != nil {
switch rerr.(*lwipError).Code {
case LWIP_ERR_ABRT:
Expand Down Expand Up @@ -114,52 +109,43 @@ func tcpRecvFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, p *C.struct_pbuf, err

//export tcpSentFn
func tcpSentFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, len C.u16_t) C.err_t {
if conn, ok := tcpConns.Load(getConnKeyVal(arg)); ok {
err := conn.(TCPConn).Sent(uint16(len))
switch err.(*lwipError).Code {
case LWIP_ERR_ABRT:
return C.ERR_ABRT
case LWIP_ERR_OK:
return C.ERR_OK
default:
panic("unexpected error")
}
} else {
C.tcp_abort(tpcb)
var conn = (*tcpConn)(arg)
err := conn.Sent(uint16(len))
switch err.(*lwipError).Code {
case LWIP_ERR_ABRT:
return C.ERR_ABRT
case LWIP_ERR_OK:
return C.ERR_OK
default:
panic("unexpected error")
}
}

//export tcpErrFn
func tcpErrFn(arg unsafe.Pointer, err C.err_t) {
if conn, ok := tcpConns.Load(getConnKeyVal(arg)); ok {
switch err {
case C.ERR_ABRT:
// Aborted through tcp_abort or by a TCP timer
conn.(TCPConn).Err(errors.New("connection aborted"))
case C.ERR_RST:
// The connection was reset by the remote host
conn.(TCPConn).Err(errors.New("connection reseted"))
default:
conn.(TCPConn).Err(errors.New(fmt.Sprintf("lwip error code %v", int(err))))
}
var conn = (*tcpConn)(arg)
switch err {
case C.ERR_ABRT:
// Aborted through tcp_abort or by a TCP timer
conn.Err(errors.New("connection aborted"))
case C.ERR_RST:
// The connection was reset by the remote host
conn.Err(errors.New("connection reseted"))
default:
conn.Err(errors.New(fmt.Sprintf("lwip error code %v", int(err))))
}
}

//export tcpPollFn
func tcpPollFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb) C.err_t {
if conn, ok := tcpConns.Load(getConnKeyVal(arg)); ok {
err := conn.(TCPConn).Poll()
switch err.(*lwipError).Code {
case LWIP_ERR_ABRT:
return C.ERR_ABRT
case LWIP_ERR_OK:
return C.ERR_OK
default:
panic("unexpected error")
}
} else {
C.tcp_abort(tpcb)
var conn = (*tcpConn)(arg)
err := conn.Poll()
switch err.(*lwipError).Code {
case LWIP_ERR_ABRT:
return C.ERR_ABRT
case LWIP_ERR_OK:
return C.ERR_OK
default:
panic("unexpected error")
}
}
27 changes: 10 additions & 17 deletions core/tcp_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@ package core
/*
#cgo CFLAGS: -I./c/include
#include "lwip/tcp.h"

void tcp_arg_cgo(struct tcp_pcb *pcb, uintptr_t ptr) {
tcp_arg(pcb, (void*)ptr);
}

*/
import "C"
import (
"errors"
"fmt"
"io"
"math/rand"
"net"
"sync"
"time"
Expand All @@ -18,6 +22,8 @@ import (

type tcpConnState uint

var tcpConns sync.Map

const (
// tcpNewConn is the initial state.
tcpNewConn tcpConnState = iota
Expand Down Expand Up @@ -62,8 +68,6 @@ type tcpConn struct {
handler TCPConnHandler
remoteAddr *net.TCPAddr
localAddr *net.TCPAddr
connKeyArg unsafe.Pointer
connKey uint32
canWrite *sync.Cond // Condition variable to implement TCP backpressure.
state tcpConnState
sndPipeReader *io.PipeReader
Expand All @@ -73,12 +77,6 @@ type tcpConn struct {
}

func newTCPConn(pcb *C.struct_tcp_pcb, handler TCPConnHandler) (TCPConn, error) {
connKeyArg := newConnKeyArg()
connKey := rand.Uint32()
setConnKeyVal(unsafe.Pointer(connKeyArg), connKey)

// Pass the key as arg for subsequent tcp callbacks.
C.tcp_arg(pcb, unsafe.Pointer(connKeyArg))

// Register callbacks.
setTCPRecvCallback(pcb)
Expand All @@ -92,16 +90,14 @@ func newTCPConn(pcb *C.struct_tcp_pcb, handler TCPConnHandler) (TCPConn, error)
handler: handler,
localAddr: ParseTCPAddr(ipAddrNTOA(pcb.remote_ip), uint16(pcb.remote_port)),
remoteAddr: ParseTCPAddr(ipAddrNTOA(pcb.local_ip), uint16(pcb.local_port)),
connKeyArg: connKeyArg,
connKey: connKey,
canWrite: sync.NewCond(&sync.Mutex{}),
state: tcpNewConn,
sndPipeReader: pipeReader,
sndPipeWriter: pipeWriter,
}

// Associate conn with key and save to the global map.
tcpConns.Store(connKey, conn)
C.tcp_arg_cgo(pcb, C.uintptr_t(uintptr(unsafe.Pointer(conn))))
tcpConns.Store(conn, true)

// Connecting remote host could take some time, do it in another goroutine
// to prevent blocking the lwip thread.
Expand Down Expand Up @@ -453,10 +449,7 @@ func (conn *tcpConn) LocalClosed() error {
}

func (conn *tcpConn) release() {
if _, found := tcpConns.Load(conn.connKey); found {
freeConnKeyArg(conn.connKeyArg)
tcpConns.Delete(conn.connKey)
}
tcpConns.Delete(conn)
conn.sndPipeWriter.Close()
conn.sndPipeReader.Close()
conn.state = tcpClosed
Expand Down
65 changes: 0 additions & 65 deletions core/tcp_conn_map.go

This file was deleted.