From 425fb2eb0eb166050d948408ecd498382bc5da82 Mon Sep 17 00:00:00 2001 From: eycorsican Date: Sat, 28 Nov 2020 05:56:18 +0800 Subject: [PATCH] Remove intermediate buffering and solve the Read blocking issue --- core/conn.go | 4 ++- core/tcp_callback_export.go | 32 +++++++++++------- core/tcp_conn.go | 65 +++++++++++++++++++------------------ go.mod | 4 +-- go.sum | 13 +++++--- 5 files changed, 68 insertions(+), 50 deletions(-) diff --git a/core/conn.go b/core/conn.go index f25974c8..487f03d6 100644 --- a/core/conn.go +++ b/core/conn.go @@ -15,7 +15,9 @@ type TCPConn interface { Sent(len uint16) error // Receive will be called when data arrives from TUN. - Receive(data []byte) error + Receive() (<-chan []byte, error) + + ReceiveDone(int) // Err will be called when a fatal error has occurred on the connection. // The corresponding pcb is already freed when this callback is called diff --git a/core/tcp_callback_export.go b/core/tcp_callback_export.go index e936fe81..85fbff4f 100644 --- a/core/tcp_callback_export.go +++ b/core/tcp_callback_export.go @@ -76,17 +76,7 @@ func tcpRecvFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, p *C.struct_pbuf, err } } - var buf []byte - var totlen = int(p.tot_len) - if p.tot_len == p.len { - buf = (*[1 << 30]byte)(unsafe.Pointer(p.payload))[:totlen:totlen] - } else { - buf = NewBytes(totlen) - defer FreeBytes(buf) - C.pbuf_copy_partial(p, unsafe.Pointer(&buf[0]), p.tot_len, 0) - } - - rerr := conn.(TCPConn).Receive(buf[:totlen]) + readCh, rerr := conn.(TCPConn).Receive() if rerr != nil { switch rerr.(*lwipError).Code { case LWIP_ERR_ABRT: @@ -109,6 +99,26 @@ func tcpRecvFn(arg unsafe.Pointer, tpcb *C.struct_tcp_pcb, p *C.struct_pbuf, err } } + select { + case buf, ok := <-readCh: + if !ok { + C.tcp_recved(tpcb, p.tot_len) + C.tcp_shutdown(tpcb, 1, 0) + return C.ERR_OK + } + if len(buf) < int(p.tot_len) { + conn.(TCPConn).ReceiveDone(-1) + shouldFreePbuf = false + return C.ERR_CONN + } + C.pbuf_copy_partial(p, unsafe.Pointer(&buf[0]), p.tot_len, 0) + conn.(TCPConn).ReceiveDone(int(p.tot_len)) + C.tcp_recved(tpcb, p.tot_len) + default: + shouldFreePbuf = false + return C.ERR_CONN + } + return C.ERR_OK } diff --git a/core/tcp_conn.go b/core/tcp_conn.go index 539f8549..fa515e12 100644 --- a/core/tcp_conn.go +++ b/core/tcp_conn.go @@ -66,9 +66,10 @@ type tcpConn struct { connKey uint32 canWrite *sync.Cond // Condition variable to implement TCP backpressure. state tcpConnState - sndPipeReader *io.PipeReader - sndPipeWriter *io.PipeWriter + readCh chan []byte + readDoneCh chan int closeOnce sync.Once + readCloseOnce sync.Once closeErr error } @@ -86,18 +87,17 @@ func newTCPConn(pcb *C.struct_tcp_pcb, handler TCPConnHandler) (TCPConn, error) setTCPErrCallback(pcb) setTCPPollCallback(pcb, C.u8_t(TCP_POLL_INTERVAL)) - pipeReader, pipeWriter := io.Pipe() conn := &tcpConn{ - pcb: pcb, - handler: handler, - localAddr: ParseTCPAddr(ipAddrNTOA(pcb.remote_ip), uint16(pcb.remote_port)), - remoteAddr: ParseTCPAddr(ipAddrNTOA(pcb.local_ip), uint16(pcb.local_port)), - connKeyArg: connKeyArg, - connKey: connKey, - canWrite: sync.NewCond(&sync.Mutex{}), - state: tcpNewConn, - sndPipeReader: pipeReader, - sndPipeWriter: pipeWriter, + pcb: pcb, + handler: handler, + localAddr: ParseTCPAddr(ipAddrNTOA(pcb.remote_ip), uint16(pcb.remote_port)), + remoteAddr: ParseTCPAddr(ipAddrNTOA(pcb.local_ip), uint16(pcb.local_port)), + connKeyArg: connKeyArg, + connKey: connKey, + canWrite: sync.NewCond(&sync.Mutex{}), + state: tcpNewConn, + readCh: make(chan []byte, 1), + readDoneCh: make(chan int, 1), } // Associate conn with key and save to the global map. @@ -180,16 +180,15 @@ func (conn *tcpConn) receiveCheck() error { return nil } -func (conn *tcpConn) Receive(data []byte) error { +func (conn *tcpConn) Receive() (<-chan []byte, error) { if err := conn.receiveCheck(); err != nil { - return err - } - n, err := conn.sndPipeWriter.Write(data) - if err != nil { - return NewLWIPError(LWIP_ERR_CLSD) + return nil, err } - C.tcp_recved(conn.pcb, C.u16_t(n)) - return NewLWIPError(LWIP_ERR_OK) + return conn.readCh, nil +} + +func (conn *tcpConn) ReceiveDone(n int) { + conn.readDoneCh <- n } func (conn *tcpConn) Read(data []byte) (int, error) { @@ -204,12 +203,12 @@ func (conn *tcpConn) Read(data []byte) (int, error) { } conn.Unlock() - // Handler should get EOF. - n, err := conn.sndPipeReader.Read(data) - if err == io.ErrClosedPipe { - err = io.EOF + conn.readCh <- data + n := <-conn.readDoneCh + if n == -1 { + return 0, errors.New("insufficient read buffer") } - return n, err + return n, nil } // writeInternal enqueues data to snd_buf, and treats ERR_MEM returned by tcp_write not an error, @@ -312,7 +311,8 @@ func (conn *tcpConn) CloseWrite() error { } func (conn *tcpConn) CloseRead() error { - return conn.sndPipeReader.Close() + conn.readCloseOnce.Do(conn.closeReadCh) + return nil } func (conn *tcpConn) Sent(len uint16) error { @@ -386,6 +386,11 @@ func (conn *tcpConn) close() { } } +func (conn *tcpConn) closeReadCh() { + close(conn.readCh) + close(conn.readDoneCh) +} + func (conn *tcpConn) setLocalClosed() error { conn.Lock() defer conn.Unlock() @@ -394,9 +399,6 @@ func (conn *tcpConn) setLocalClosed() error { return nil } - // Causes the read half of the pipe returns. - conn.sndPipeWriter.Close() - if conn.state == tcpWriteClosed { conn.state = tcpClosing } else { @@ -464,8 +466,7 @@ func (conn *tcpConn) release() { freeConnKeyArg(conn.connKeyArg) tcpConns.Delete(conn.connKey) } - conn.sndPipeWriter.Close() - conn.sndPipeReader.Close() + conn.readCloseOnce.Do(conn.closeReadCh) conn.state = tcpClosed } diff --git a/go.mod b/go.mod index f55fccb9..226974a5 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,6 @@ go 1.13 require ( github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b - golang.org/x/net v0.0.0-20191021144547-ec77196f6094 - golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 + golang.org/x/net v0.0.0-20201021035429-f5854403a974 + golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f ) diff --git a/go.sum b/go.sum index 8cc9402a..72f4b57a 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,14 @@ github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b h1:+y4hCMc/WKsDbAPsOQZgBSaSZ26uh2afyaWeVg/3s/c= github.com/songgao/water v0.0.0-20190725173103-fd331bda3f4b/go.mod h1:P5HUIBuIWKbyjl083/loAegFkfbFNx5i2qEP4CNbm7E= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= -golang.org/x/net v0.0.0-20191021144547-ec77196f6094 h1:5O4U9trLjNpuhpynaDsqwCk+Tw6seqJz1EbqbnzHrc8= -golang.org/x/net v0.0.0-20191021144547-ec77196f6094/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20201021035429-f5854403a974 h1:IX6qOQeG5uLjB/hjjwjedwfjND0hgjPMMyO1RoIXQNI= +golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037 h1:YyJpGZS1sBuBCzLAR1VEpK193GlqGZbnPFnPV/5Rsb4= -golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f h1:+Nyd8tzPX9R7BWHguqsrbFdRx3WQ/1ib8I44HXV5yTA= +golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=