From 61b2d3a92894819bd49f7b5526eef103fadbcdcd Mon Sep 17 00:00:00 2001 From: jayantxie Date: Tue, 29 Jul 2025 13:19:32 +0800 Subject: [PATCH 01/21] feat: go connection state checker --- connstate/conn.go | 84 ++++++++++++++++++++++++++++ connstate/poll.go | 22 ++++++++ connstate/poll_bsd.go | 4 ++ connstate/poll_cache.go | 74 ++++++++++++++++++++++++ connstate/poll_linux.go | 53 ++++++++++++++++++ connstate/sys_epoll_linux.go | 64 +++++++++++++++++++++ connstate/sys_epoll_linux_arm64.go | 63 +++++++++++++++++++++ connstate/sys_epoll_linux_loong64.go | 65 +++++++++++++++++++++ 8 files changed, 429 insertions(+) create mode 100644 connstate/conn.go create mode 100644 connstate/poll.go create mode 100644 connstate/poll_bsd.go create mode 100644 connstate/poll_cache.go create mode 100644 connstate/poll_linux.go create mode 100644 connstate/sys_epoll_linux.go create mode 100644 connstate/sys_epoll_linux_arm64.go create mode 100644 connstate/sys_epoll_linux_loong64.go diff --git a/connstate/conn.go b/connstate/conn.go new file mode 100644 index 0000000..b1c0d4f --- /dev/null +++ b/connstate/conn.go @@ -0,0 +1,84 @@ +package connstate + +import ( + "errors" + "fmt" + "net" + "sync/atomic" + "syscall" + "unsafe" +) + +type ConnState uint32 + +const ( + StateOK ConnState = iota + StateRemoteClosed + StateClosed +) + +type ConnWithState interface { + net.Conn + State() ConnState +} + +func ListenConnState(conn net.Conn) (ConnWithState, error) { + pollInitOnce.Do(func() { + var err error + poll, err = openpoll() + if err != nil { + panic(fmt.Sprintf("gopkg.connstate openpoll failed, err: %v", err)) + } + }) + sysConn, ok := conn.(syscall.Conn) + if !ok { + return nil, errors.New("conn is not syscall.Conn") + } + rawConn, err := sysConn.SyscallConn() + if err != nil { + return nil, err + } + var fd *fdOperator + var opAddErr error + err = rawConn.Control(func(fileDescriptor uintptr) { + fd = pollcache.alloc() + fd.fd = int(fileDescriptor) + fd.conn = unsafe.Pointer(&connWithState{Conn: conn, fd: unsafe.Pointer(fd)}) + opAddErr = poll.control(fd, opAdd) + }) + if fd != nil { + if err != nil && opAddErr == nil { + _ = poll.control(fd, opDel) + } + if err != nil || opAddErr != nil { + pollcache.freeable(fd) + } + } + if err != nil { + return nil, err + } + if opAddErr != nil { + return nil, opAddErr + } + return (*connWithState)(fd.conn), nil +} + +type connWithState struct { + net.Conn + fd unsafe.Pointer // *fdOperator + state uint32 +} + +func (c *connWithState) Close() error { + fd := (*fdOperator)(atomic.LoadPointer(&c.fd)) + if fd != nil && atomic.CompareAndSwapPointer(&c.fd, unsafe.Pointer(fd), nil) { + atomic.StoreUint32(&c.state, uint32(StateClosed)) + _ = poll.control(fd, opDel) + pollcache.freeable(fd) + } + return c.Conn.Close() +} + +func (c *connWithState) State() ConnState { + return ConnState(atomic.LoadUint32(&c.state)) +} diff --git a/connstate/poll.go b/connstate/poll.go new file mode 100644 index 0000000..30a8254 --- /dev/null +++ b/connstate/poll.go @@ -0,0 +1,22 @@ +package connstate + +import ( + "sync" +) + +type op int + +const ( + opAdd op = iota + opDel +) + +var ( + pollInitOnce sync.Once + poll poller +) + +type poller interface { + wait() error + control(fd *fdOperator, op op) error +} diff --git a/connstate/poll_bsd.go b/connstate/poll_bsd.go new file mode 100644 index 0000000..7d0b9b1 --- /dev/null +++ b/connstate/poll_bsd.go @@ -0,0 +1,4 @@ +//go:build darwin || netbsd || freebsd || openbsd || dragonfly +// +build darwin netbsd freebsd openbsd dragonfly + +package connstate diff --git a/connstate/poll_cache.go b/connstate/poll_cache.go new file mode 100644 index 0000000..9161466 --- /dev/null +++ b/connstate/poll_cache.go @@ -0,0 +1,74 @@ +package connstate + +import ( + "sync" + "sync/atomic" + "unsafe" +) + +const pollBlockSize = 4 * 1024 + +type fdOperator struct { + link *fdOperator // in pollcache, protected by pollcache.lock + index int32 + + fd int + conn unsafe.Pointer // *connWithState +} + +var pollcache pollCache + +type pollCache struct { + lock sync.Mutex + first *fdOperator + cache []*fdOperator + // freelist store the freeable operator + // to reduce GC pressure, we only store op index here + freelist []int32 + freeack int32 +} + +func (c *pollCache) alloc() *fdOperator { + c.lock.Lock() + if c.first == nil { + const pdSize = unsafe.Sizeof(fdOperator{}) + n := pollBlockSize / pdSize + if n == 0 { + n = 1 + } + index := int32(len(c.cache)) + for i := uintptr(0); i < n; i++ { + pd := &fdOperator{index: index} + c.cache = append(c.cache, pd) + pd.link = c.first + c.first = pd + index++ + } + } + op := c.first + c.first = op.link + c.lock.Unlock() + return op +} + +// freeable mark the operator that could be freed +// only poller could do the real free action +func (c *pollCache) freeable(op *fdOperator) { + atomic.StorePointer(&op.conn, nil) + c.lock.Lock() + // reset all state + if atomic.CompareAndSwapInt32(&c.freeack, 1, 0) { + for _, idx := range c.freelist { + op := c.cache[idx] + op.link = c.first + c.first = op + } + c.freelist = c.freelist[:0] + } + c.freelist = append(c.freelist, op.index) + c.lock.Unlock() +} + +func (c *pollCache) free() { + atomic.StoreInt32(&c.freeack, 1) +} diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go new file mode 100644 index 0000000..6b560a9 --- /dev/null +++ b/connstate/poll_linux.go @@ -0,0 +1,53 @@ +package connstate + +import ( + "sync/atomic" + "syscall" + "unsafe" +) + +type epoller struct { + epfd int +} + +func (p *epoller) wait() error { + for { + events := make([]epollevent, 32) + n, err := EpollWait(p.epfd, events, -1) + if err != nil && err != syscall.EINTR { + return err + } + for i := 0; i < n; i++ { + ev := &events[i] + op := *(**fdOperator)(unsafe.Pointer(&ev.data)) + if conn := (*connWithState)(atomic.LoadPointer(&op.conn)); conn != nil { + if ev.events&(syscall.EPOLLHUP|syscall.EPOLLRDHUP|syscall.EPOLLERR) != 0 { + atomic.CompareAndSwapUint32(&conn.state, uint32(StateOK), uint32(StateRemoteClosed)) + } + } + } + // we can make sure that there is no op remaining if finished handling all events + pollcache.free() + } +} + +func (p *epoller) control(fd *fdOperator, op op) error { + if op == opAdd { + var ev epollevent + ev.data = *(*[8]byte)(unsafe.Pointer(&fd)) + ev.events = syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLERR | syscall.EPOLLET + return EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd.fd, &ev) + } else { + var ev epollevent + return EpollCtl(p.epfd, syscall.EPOLL_CTL_DEL, fd.fd, &ev) + } +} + +func openpoll() (p poller, err error) { + var epfd int + epfd, err = EpollCreate(0) + if err != nil { + return nil, err + } + return &epoller{epfd: epfd}, nil +} diff --git a/connstate/sys_epoll_linux.go b/connstate/sys_epoll_linux.go new file mode 100644 index 0000000..12a6e70 --- /dev/null +++ b/connstate/sys_epoll_linux.go @@ -0,0 +1,64 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !arm64 && !loong64 +// +build !arm64,!loong64 + +package connstate + +import ( + "syscall" + "unsafe" +) + +const EPOLLET = -syscall.EPOLLET + +type epollevent struct { + events uint32 + data [8]byte // unaligned uintptr +} + +// EpollCreate implements epoll_create1. +func EpollCreate(flag int) (fd int, err error) { + var r0 uintptr + r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} + +// EpollCtl implements epoll_ctl. +func EpollCtl(epfd, op, fd int, event *epollevent) (err error) { + _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return err +} + +// EpollWait implements epoll_wait. +func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { + var r0 uintptr + _p0 := unsafe.Pointer(&events[0]) + if msec == 0 { + r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) + } else { + r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) + } + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} diff --git a/connstate/sys_epoll_linux_arm64.go b/connstate/sys_epoll_linux_arm64.go new file mode 100644 index 0000000..3474580 --- /dev/null +++ b/connstate/sys_epoll_linux_arm64.go @@ -0,0 +1,63 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connstate + +import ( + "syscall" + "unsafe" +) + +const EPOLLET = syscall.EPOLLET + +type epollevent struct { + events uint32 + _ int32 + data [8]byte // unaligned uintptr +} + +// EpollCreate implements epoll_create1. +func EpollCreate(flag int) (fd int, err error) { + var r0 uintptr + r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} + +// EpollCtl implements epoll_ctl. +func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { + _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return err +} + +// EpollWait implements epoll_wait. +func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { + var r0 uintptr + _p0 := unsafe.Pointer(&events[0]) + if msec == 0 { + r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) + } else { + // TODO: release P + r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) + } + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} diff --git a/connstate/sys_epoll_linux_loong64.go b/connstate/sys_epoll_linux_loong64.go new file mode 100644 index 0000000..4c313c1 --- /dev/null +++ b/connstate/sys_epoll_linux_loong64.go @@ -0,0 +1,65 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build linux && loong64 +// +build linux,loong64 + +package connstate + +import ( + "syscall" + "unsafe" +) + +const EPOLLET = syscall.EPOLLET + +type epollevent struct { + events uint32 + _ int32 + data [8]byte // unaligned uintptr +} + +// EpollCreate implements epoll_create1. +func EpollCreate(flag int) (fd int, err error) { + var r0 uintptr + r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} + +// EpollCtl implements epoll_ctl. +func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { + _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) + if err == syscall.Errno(0) { + err = nil + } + return err +} + +// EpollWait implements epoll_wait. +func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { + var r0 uintptr + _p0 := unsafe.Pointer(&events[0]) + if msec == 0 { + r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) + } else { + r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) + } + if err == syscall.Errno(0) { + err = nil + } + return int(r0), err +} From 216ec68b91ffdf5eb6822bd8a319cd1b9907553e Mon Sep 17 00:00:00 2001 From: jayantxie Date: Tue, 29 Jul 2025 13:47:54 +0800 Subject: [PATCH 02/21] fix: bugs --- connstate/conn.go | 9 +-------- connstate/poll.go | 10 ++++++++++ connstate/poll_linux.go | 2 +- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/connstate/conn.go b/connstate/conn.go index b1c0d4f..416d391 100644 --- a/connstate/conn.go +++ b/connstate/conn.go @@ -2,7 +2,6 @@ package connstate import ( "errors" - "fmt" "net" "sync/atomic" "syscall" @@ -23,13 +22,7 @@ type ConnWithState interface { } func ListenConnState(conn net.Conn) (ConnWithState, error) { - pollInitOnce.Do(func() { - var err error - poll, err = openpoll() - if err != nil { - panic(fmt.Sprintf("gopkg.connstate openpoll failed, err: %v", err)) - } - }) + pollInitOnce.Do(createPoller) sysConn, ok := conn.(syscall.Conn) if !ok { return nil, errors.New("conn is not syscall.Conn") diff --git a/connstate/poll.go b/connstate/poll.go index 30a8254..7ff3d70 100644 --- a/connstate/poll.go +++ b/connstate/poll.go @@ -1,6 +1,7 @@ package connstate import ( + "fmt" "sync" ) @@ -20,3 +21,12 @@ type poller interface { wait() error control(fd *fdOperator, op op) error } + +func createPoller() { + var err error + poll, err = openpoll() + if err != nil { + panic(fmt.Sprintf("gopkg.connstate openpoll failed, err: %v", err)) + } + go poll.wait() +} diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go index 6b560a9..4090129 100644 --- a/connstate/poll_linux.go +++ b/connstate/poll_linux.go @@ -35,7 +35,7 @@ func (p *epoller) control(fd *fdOperator, op op) error { if op == opAdd { var ev epollevent ev.data = *(*[8]byte)(unsafe.Pointer(&fd)) - ev.events = syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLERR | syscall.EPOLLET + ev.events = syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLERR | EPOLLET return EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd.fd, &ev) } else { var ev epollevent From c2de4397161a17cb96e7566265c9e22340e0fbe8 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Tue, 29 Jul 2025 14:29:06 +0800 Subject: [PATCH 03/21] fix: bugs --- connstate/poll_linux.go | 24 +++++----- connstate/sys_epoll_linux.go | 64 --------------------------- connstate/sys_epoll_linux_arm64.go | 63 --------------------------- connstate/sys_epoll_linux_loong64.go | 65 ---------------------------- 4 files changed, 13 insertions(+), 203 deletions(-) delete mode 100644 connstate/sys_epoll_linux.go delete mode 100644 connstate/sys_epoll_linux_arm64.go delete mode 100644 connstate/sys_epoll_linux_loong64.go diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go index 4090129..fa78fdc 100644 --- a/connstate/poll_linux.go +++ b/connstate/poll_linux.go @@ -6,22 +6,24 @@ import ( "unsafe" ) +const _EPOLLET uint32 = 0x80000000 + type epoller struct { epfd int } func (p *epoller) wait() error { for { - events := make([]epollevent, 32) - n, err := EpollWait(p.epfd, events, -1) + events := make([]syscall.EpollEvent, 32) + n, err := syscall.EpollWait(p.epfd, events, -1) if err != nil && err != syscall.EINTR { return err } for i := 0; i < n; i++ { ev := &events[i] - op := *(**fdOperator)(unsafe.Pointer(&ev.data)) + op := *(**fdOperator)(unsafe.Pointer(&ev.Fd)) if conn := (*connWithState)(atomic.LoadPointer(&op.conn)); conn != nil { - if ev.events&(syscall.EPOLLHUP|syscall.EPOLLRDHUP|syscall.EPOLLERR) != 0 { + if ev.Events&(syscall.EPOLLHUP|syscall.EPOLLRDHUP|syscall.EPOLLERR) != 0 { atomic.CompareAndSwapUint32(&conn.state, uint32(StateOK), uint32(StateRemoteClosed)) } } @@ -33,19 +35,19 @@ func (p *epoller) wait() error { func (p *epoller) control(fd *fdOperator, op op) error { if op == opAdd { - var ev epollevent - ev.data = *(*[8]byte)(unsafe.Pointer(&fd)) - ev.events = syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLERR | EPOLLET - return EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd.fd, &ev) + var ev syscall.EpollEvent + *(*unsafe.Pointer)(unsafe.Pointer(&ev.Fd)) = unsafe.Pointer(fd) + ev.Events = syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLERR | _EPOLLET + return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd.fd, &ev) } else { - var ev epollevent - return EpollCtl(p.epfd, syscall.EPOLL_CTL_DEL, fd.fd, &ev) + var ev syscall.EpollEvent + return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_DEL, fd.fd, &ev) } } func openpoll() (p poller, err error) { var epfd int - epfd, err = EpollCreate(0) + epfd, err = syscall.EpollCreate(1) if err != nil { return nil, err } diff --git a/connstate/sys_epoll_linux.go b/connstate/sys_epoll_linux.go deleted file mode 100644 index 12a6e70..0000000 --- a/connstate/sys_epoll_linux.go +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright 2025 CloudWeGo Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !arm64 && !loong64 -// +build !arm64,!loong64 - -package connstate - -import ( - "syscall" - "unsafe" -) - -const EPOLLET = -syscall.EPOLLET - -type epollevent struct { - events uint32 - data [8]byte // unaligned uintptr -} - -// EpollCreate implements epoll_create1. -func EpollCreate(flag int) (fd int, err error) { - var r0 uintptr - r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0) - if err == syscall.Errno(0) { - err = nil - } - return int(r0), err -} - -// EpollCtl implements epoll_ctl. -func EpollCtl(epfd, op, fd int, event *epollevent) (err error) { - _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) - if err == syscall.Errno(0) { - err = nil - } - return err -} - -// EpollWait implements epoll_wait. -func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { - var r0 uintptr - _p0 := unsafe.Pointer(&events[0]) - if msec == 0 { - r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) - } else { - r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) - } - if err == syscall.Errno(0) { - err = nil - } - return int(r0), err -} diff --git a/connstate/sys_epoll_linux_arm64.go b/connstate/sys_epoll_linux_arm64.go deleted file mode 100644 index 3474580..0000000 --- a/connstate/sys_epoll_linux_arm64.go +++ /dev/null @@ -1,63 +0,0 @@ -// Copyright 2025 CloudWeGo Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package connstate - -import ( - "syscall" - "unsafe" -) - -const EPOLLET = syscall.EPOLLET - -type epollevent struct { - events uint32 - _ int32 - data [8]byte // unaligned uintptr -} - -// EpollCreate implements epoll_create1. -func EpollCreate(flag int) (fd int, err error) { - var r0 uintptr - r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0) - if err == syscall.Errno(0) { - err = nil - } - return int(r0), err -} - -// EpollCtl implements epoll_ctl. -func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { - _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) - if err == syscall.Errno(0) { - err = nil - } - return err -} - -// EpollWait implements epoll_wait. -func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { - var r0 uintptr - _p0 := unsafe.Pointer(&events[0]) - if msec == 0 { - r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) - } else { - // TODO: release P - r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) - } - if err == syscall.Errno(0) { - err = nil - } - return int(r0), err -} diff --git a/connstate/sys_epoll_linux_loong64.go b/connstate/sys_epoll_linux_loong64.go deleted file mode 100644 index 4c313c1..0000000 --- a/connstate/sys_epoll_linux_loong64.go +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2025 CloudWeGo Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build linux && loong64 -// +build linux,loong64 - -package connstate - -import ( - "syscall" - "unsafe" -) - -const EPOLLET = syscall.EPOLLET - -type epollevent struct { - events uint32 - _ int32 - data [8]byte // unaligned uintptr -} - -// EpollCreate implements epoll_create1. -func EpollCreate(flag int) (fd int, err error) { - var r0 uintptr - r0, _, err = syscall.RawSyscall(syscall.SYS_EPOLL_CREATE1, uintptr(flag), 0, 0) - if err == syscall.Errno(0) { - err = nil - } - return int(r0), err -} - -// EpollCtl implements epoll_ctl. -func EpollCtl(epfd int, op int, fd int, event *epollevent) (err error) { - _, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_CTL, uintptr(epfd), uintptr(op), uintptr(fd), uintptr(unsafe.Pointer(event)), 0, 0) - if err == syscall.Errno(0) { - err = nil - } - return err -} - -// EpollWait implements epoll_wait. -func EpollWait(epfd int, events []epollevent, msec int) (n int, err error) { - var r0 uintptr - _p0 := unsafe.Pointer(&events[0]) - if msec == 0 { - r0, _, err = syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), 0, 0, 0) - } else { - r0, _, err = syscall.Syscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) - } - if err == syscall.Errno(0) { - err = nil - } - return int(r0), err -} From 1a2bb9e05793e8324f09ce03417db7428c6d251a Mon Sep 17 00:00:00 2001 From: jayantxie Date: Tue, 29 Jul 2025 15:27:54 +0800 Subject: [PATCH 04/21] fix: bugs --- connstate/poll_bsd.go | 74 +++++++++++++++++++++++++++++++++++++++++ connstate/poll_linux.go | 5 +-- 2 files changed, 77 insertions(+), 2 deletions(-) diff --git a/connstate/poll_bsd.go b/connstate/poll_bsd.go index 7d0b9b1..30896fe 100644 --- a/connstate/poll_bsd.go +++ b/connstate/poll_bsd.go @@ -2,3 +2,77 @@ // +build darwin netbsd freebsd openbsd dragonfly package connstate + +import ( + "sync/atomic" + "syscall" + "unsafe" +) + +type kqueue struct { + fd int +} + +func (p *kqueue) wait() error { + events := make([]syscall.Kevent_t, 1024) + for { + // TODO: handoff p by entersyscallblock, or make poller run as a thread. + n, err := syscall.Kevent(p.fd, nil, events, nil) + if err != nil && err != syscall.EINTR { + // exit gracefully + if err == syscall.EBADF { + return nil + } + return err + } + for i := 0; i < n; i++ { + ev := &events[i] + op := *(**fdOperator)(unsafe.Pointer(&ev.Udata)) + if conn := (*connWithState)(atomic.LoadPointer(&op.conn)); conn != nil { + if ev.Flags&(syscall.EV_EOF) != 0 { + atomic.CompareAndSwapUint32(&conn.state, uint32(StateOK), uint32(StateRemoteClosed)) + } + } + } + // we can make sure that there is no op remaining if finished handling all events + pollcache.free() + } +} + +func (p *kqueue) control(fd *fdOperator, op op) error { + evs := make([]syscall.Kevent_t, 1) + evs[0].Ident = uint64(fd.fd) + *(**fdOperator)(unsafe.Pointer(&evs[0].Udata)) = fd + if op == opAdd { + evs[0].Filter = syscall.EVFILT_READ + evs[0].Flags = syscall.EV_ADD | syscall.EV_ENABLE | syscall.EV_CLEAR + // prevent ordinary data from triggering + evs[0].Flags |= syscall.EV_OOBAND + evs[0].Fflags = syscall.NOTE_LOWAT + evs[0].Data = 0x7FFFFFFF + _, err := syscall.Kevent(p.fd, evs, nil, nil) + return err + } else { + evs[0].Filter = syscall.EVFILT_READ + evs[0].Flags = syscall.EV_DELETE + _, err := syscall.Kevent(p.fd, evs, nil, nil) + return err + } +} + +func openpoll() (p poller, err error) { + fd, err := syscall.Kqueue() + if err != nil { + return nil, err + } + _, err = syscall.Kevent(fd, []syscall.Kevent_t{{ + Ident: 0, + Filter: syscall.EVFILT_USER, + Flags: syscall.EV_ADD | syscall.EV_CLEAR, + }}, nil, nil) + if err != nil { + syscall.Close(fd) + return nil, err + } + return &kqueue{fd: fd}, nil +} diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go index fa78fdc..d364f43 100644 --- a/connstate/poll_linux.go +++ b/connstate/poll_linux.go @@ -13,8 +13,9 @@ type epoller struct { } func (p *epoller) wait() error { + events := make([]syscall.EpollEvent, 128) for { - events := make([]syscall.EpollEvent, 32) + // TODO: handoff p by entersyscallblock, or make poller run as a thread. n, err := syscall.EpollWait(p.epfd, events, -1) if err != nil && err != syscall.EINTR { return err @@ -36,7 +37,7 @@ func (p *epoller) wait() error { func (p *epoller) control(fd *fdOperator, op op) error { if op == opAdd { var ev syscall.EpollEvent - *(*unsafe.Pointer)(unsafe.Pointer(&ev.Fd)) = unsafe.Pointer(fd) + *(**fdOperator)(unsafe.Pointer(&ev.Fd)) = fd ev.Events = syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLERR | _EPOLLET return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd.fd, &ev) } else { From 9482470104aa6fef257ad3879a42402b38fd5fc0 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Tue, 29 Jul 2025 22:22:00 +0800 Subject: [PATCH 05/21] fix: bugs --- connstate/conn.go | 19 +++++++++---------- connstate/poll_bsd.go | 2 +- connstate/poll_linux.go | 2 +- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/connstate/conn.go b/connstate/conn.go index 416d391..b0861e4 100644 --- a/connstate/conn.go +++ b/connstate/conn.go @@ -16,12 +16,12 @@ const ( StateClosed ) -type ConnWithState interface { - net.Conn +type ConnStater interface { + Close() error State() ConnState } -func ListenConnState(conn net.Conn) (ConnWithState, error) { +func ListenConnState(conn net.Conn) (ConnStater, error) { pollInitOnce.Do(createPoller) sysConn, ok := conn.(syscall.Conn) if !ok { @@ -36,7 +36,7 @@ func ListenConnState(conn net.Conn) (ConnWithState, error) { err = rawConn.Control(func(fileDescriptor uintptr) { fd = pollcache.alloc() fd.fd = int(fileDescriptor) - fd.conn = unsafe.Pointer(&connWithState{Conn: conn, fd: unsafe.Pointer(fd)}) + fd.conn = unsafe.Pointer(&connStater{fd: unsafe.Pointer(fd)}) opAddErr = poll.control(fd, opAdd) }) if fd != nil { @@ -53,25 +53,24 @@ func ListenConnState(conn net.Conn) (ConnWithState, error) { if opAddErr != nil { return nil, opAddErr } - return (*connWithState)(fd.conn), nil + return (*connStater)(fd.conn), nil } -type connWithState struct { - net.Conn +type connStater struct { fd unsafe.Pointer // *fdOperator state uint32 } -func (c *connWithState) Close() error { +func (c *connStater) Close() error { fd := (*fdOperator)(atomic.LoadPointer(&c.fd)) if fd != nil && atomic.CompareAndSwapPointer(&c.fd, unsafe.Pointer(fd), nil) { atomic.StoreUint32(&c.state, uint32(StateClosed)) _ = poll.control(fd, opDel) pollcache.freeable(fd) } - return c.Conn.Close() + return nil } -func (c *connWithState) State() ConnState { +func (c *connStater) State() ConnState { return ConnState(atomic.LoadUint32(&c.state)) } diff --git a/connstate/poll_bsd.go b/connstate/poll_bsd.go index 30896fe..01a0a0f 100644 --- a/connstate/poll_bsd.go +++ b/connstate/poll_bsd.go @@ -28,7 +28,7 @@ func (p *kqueue) wait() error { for i := 0; i < n; i++ { ev := &events[i] op := *(**fdOperator)(unsafe.Pointer(&ev.Udata)) - if conn := (*connWithState)(atomic.LoadPointer(&op.conn)); conn != nil { + if conn := (*connStater)(atomic.LoadPointer(&op.conn)); conn != nil { if ev.Flags&(syscall.EV_EOF) != 0 { atomic.CompareAndSwapUint32(&conn.state, uint32(StateOK), uint32(StateRemoteClosed)) } diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go index d364f43..2948c75 100644 --- a/connstate/poll_linux.go +++ b/connstate/poll_linux.go @@ -23,7 +23,7 @@ func (p *epoller) wait() error { for i := 0; i < n; i++ { ev := &events[i] op := *(**fdOperator)(unsafe.Pointer(&ev.Fd)) - if conn := (*connWithState)(atomic.LoadPointer(&op.conn)); conn != nil { + if conn := (*connStater)(atomic.LoadPointer(&op.conn)); conn != nil { if ev.Events&(syscall.EPOLLHUP|syscall.EPOLLRDHUP|syscall.EPOLLERR) != 0 { atomic.CompareAndSwapUint32(&conn.state, uint32(StateOK), uint32(StateRemoteClosed)) } From 910a54224cc14e9805586c5dae0e7f09ca77f6c3 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Tue, 29 Jul 2025 22:59:44 +0800 Subject: [PATCH 06/21] fix: bugs --- connstate/conn.go | 14 ++++++++ connstate/poll.go | 14 ++++++++ connstate/poll_bsd.go | 14 ++++++++ connstate/poll_cache.go | 14 ++++++++ connstate/poll_linux.go | 19 ++++++++-- internal/epoll/sys_epoll_linux.go | 43 +++++++++++++++++++++++ internal/epoll/sys_epoll_linux_arm64.go | 40 +++++++++++++++++++++ internal/epoll/sys_epoll_linux_loong64.go | 40 +++++++++++++++++++++ internal/epoll/unsafe.go | 23 ++++++++++++ 9 files changed, 219 insertions(+), 2 deletions(-) create mode 100644 internal/epoll/sys_epoll_linux.go create mode 100644 internal/epoll/sys_epoll_linux_arm64.go create mode 100644 internal/epoll/sys_epoll_linux_loong64.go create mode 100644 internal/epoll/unsafe.go diff --git a/connstate/conn.go b/connstate/conn.go index b0861e4..1d80e9a 100644 --- a/connstate/conn.go +++ b/connstate/conn.go @@ -1,3 +1,17 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package connstate import ( diff --git a/connstate/poll.go b/connstate/poll.go index 7ff3d70..1f27ef6 100644 --- a/connstate/poll.go +++ b/connstate/poll.go @@ -1,3 +1,17 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package connstate import ( diff --git a/connstate/poll_bsd.go b/connstate/poll_bsd.go index 01a0a0f..e1e1fb8 100644 --- a/connstate/poll_bsd.go +++ b/connstate/poll_bsd.go @@ -1,3 +1,17 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + //go:build darwin || netbsd || freebsd || openbsd || dragonfly // +build darwin netbsd freebsd openbsd dragonfly diff --git a/connstate/poll_cache.go b/connstate/poll_cache.go index 9161466..f831583 100644 --- a/connstate/poll_cache.go +++ b/connstate/poll_cache.go @@ -1,3 +1,17 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package connstate import ( diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go index 2948c75..6d3688c 100644 --- a/connstate/poll_linux.go +++ b/connstate/poll_linux.go @@ -1,9 +1,25 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package connstate import ( "sync/atomic" "syscall" "unsafe" + + iepoll "github.com/cloudwego/gopkg/internal/epoll" ) const _EPOLLET uint32 = 0x80000000 @@ -15,8 +31,7 @@ type epoller struct { func (p *epoller) wait() error { events := make([]syscall.EpollEvent, 128) for { - // TODO: handoff p by entersyscallblock, or make poller run as a thread. - n, err := syscall.EpollWait(p.epfd, events, -1) + n, err := iepoll.EpollWait(p.epfd, events, -1) if err != nil && err != syscall.EINTR { return err } diff --git a/internal/epoll/sys_epoll_linux.go b/internal/epoll/sys_epoll_linux.go new file mode 100644 index 0000000..190e42f --- /dev/null +++ b/internal/epoll/sys_epoll_linux.go @@ -0,0 +1,43 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !arm64 && !loong64 +// +build !arm64,!loong64 + +package netpoll + +import ( + "syscall" + "unsafe" +) + +var _zero uintptr + +// EpollWait implements epoll_wait. +func EpollWait(epfd int, events []syscall.EpollEvent, msec int) (n int, err error) { + var _p0 unsafe.Pointer + if len(events) > 0 { + _p0 = unsafe.Pointer(&events[0]) + } else { + _p0 = unsafe.Pointer(&_zero) + } + entersyscallblock() + r0, _, e1 := syscall.RawSyscall6(syscall.SYS_EPOLL_WAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) + exitsyscall() + n = int(r0) + if e1 != 0 { + err = e1 + } + return +} diff --git a/internal/epoll/sys_epoll_linux_arm64.go b/internal/epoll/sys_epoll_linux_arm64.go new file mode 100644 index 0000000..9e59723 --- /dev/null +++ b/internal/epoll/sys_epoll_linux_arm64.go @@ -0,0 +1,40 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +import ( + "syscall" + "unsafe" +) + +var _zero uintptr + +// EpollWait implements epoll_wait. +func EpollWait(epfd int, events []syscall.EpollEvent, msec int) (n int, err error) { + var _p0 unsafe.Pointer + if len(events) > 0 { + _p0 = unsafe.Pointer(&events[0]) + } else { + _p0 = unsafe.Pointer(&_zero) + } + entersyscallblock() + r0, _, e1 := syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) + exitsyscall() + n = int(r0) + if e1 != 0 { + err = e1 + } + return +} diff --git a/internal/epoll/sys_epoll_linux_loong64.go b/internal/epoll/sys_epoll_linux_loong64.go new file mode 100644 index 0000000..9e59723 --- /dev/null +++ b/internal/epoll/sys_epoll_linux_loong64.go @@ -0,0 +1,40 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +import ( + "syscall" + "unsafe" +) + +var _zero uintptr + +// EpollWait implements epoll_wait. +func EpollWait(epfd int, events []syscall.EpollEvent, msec int) (n int, err error) { + var _p0 unsafe.Pointer + if len(events) > 0 { + _p0 = unsafe.Pointer(&events[0]) + } else { + _p0 = unsafe.Pointer(&_zero) + } + entersyscallblock() + r0, _, e1 := syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) + exitsyscall() + n = int(r0) + if e1 != 0 { + err = e1 + } + return +} diff --git a/internal/epoll/unsafe.go b/internal/epoll/unsafe.go new file mode 100644 index 0000000..46ba7cc --- /dev/null +++ b/internal/epoll/unsafe.go @@ -0,0 +1,23 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package netpoll + +import _ "unsafe" + +//go:linkname entersyscallblock runtime.entersyscallblock +func entersyscallblock() + +//go:linkname exitsyscall runtime.exitsyscall +func exitsyscall() From 817c1140631d2af50e95b43a4864fb3585ab491d Mon Sep 17 00:00:00 2001 From: jayantxie Date: Tue, 29 Jul 2025 23:18:57 +0800 Subject: [PATCH 07/21] fix: bugs --- internal/epoll/sys_epoll_linux_loong64.go | 40 ------------------- ...linux_arm64.go => sys_epoll_linux_risc.go} | 4 ++ 2 files changed, 4 insertions(+), 40 deletions(-) delete mode 100644 internal/epoll/sys_epoll_linux_loong64.go rename internal/epoll/{sys_epoll_linux_arm64.go => sys_epoll_linux_risc.go} (93%) diff --git a/internal/epoll/sys_epoll_linux_loong64.go b/internal/epoll/sys_epoll_linux_loong64.go deleted file mode 100644 index 9e59723..0000000 --- a/internal/epoll/sys_epoll_linux_loong64.go +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2025 CloudWeGo Authors -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package netpoll - -import ( - "syscall" - "unsafe" -) - -var _zero uintptr - -// EpollWait implements epoll_wait. -func EpollWait(epfd int, events []syscall.EpollEvent, msec int) (n int, err error) { - var _p0 unsafe.Pointer - if len(events) > 0 { - _p0 = unsafe.Pointer(&events[0]) - } else { - _p0 = unsafe.Pointer(&_zero) - } - entersyscallblock() - r0, _, e1 := syscall.RawSyscall6(syscall.SYS_EPOLL_PWAIT, uintptr(epfd), uintptr(_p0), uintptr(len(events)), uintptr(msec), 0, 0) - exitsyscall() - n = int(r0) - if e1 != 0 { - err = e1 - } - return -} diff --git a/internal/epoll/sys_epoll_linux_arm64.go b/internal/epoll/sys_epoll_linux_risc.go similarity index 93% rename from internal/epoll/sys_epoll_linux_arm64.go rename to internal/epoll/sys_epoll_linux_risc.go index 9e59723..fe97e9a 100644 --- a/internal/epoll/sys_epoll_linux_arm64.go +++ b/internal/epoll/sys_epoll_linux_risc.go @@ -12,6 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. +//go:build linux && (arm64 || loong64) +// +build linux +// +build arm64 loong64 + package netpoll import ( From c4afa267f3653bb28378e1823cef7aefa3542a7e Mon Sep 17 00:00:00 2001 From: jayantxie Date: Wed, 30 Jul 2025 11:54:44 +0800 Subject: [PATCH 08/21] fix: bugs --- connstate/poll_linux.go | 4 ++-- internal/{epoll => syscall}/sys_epoll_linux.go | 0 .../sys_epoll_linux_arm64_.go} | 0 internal/{epoll => syscall}/unsafe.go | 0 4 files changed, 2 insertions(+), 2 deletions(-) rename internal/{epoll => syscall}/sys_epoll_linux.go (100%) rename internal/{epoll/sys_epoll_linux_risc.go => syscall/sys_epoll_linux_arm64_.go} (100%) rename internal/{epoll => syscall}/unsafe.go (100%) diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go index 6d3688c..e21dc16 100644 --- a/connstate/poll_linux.go +++ b/connstate/poll_linux.go @@ -19,7 +19,7 @@ import ( "syscall" "unsafe" - iepoll "github.com/cloudwego/gopkg/internal/epoll" + isyscall "github.com/cloudwego/gopkg/internal/syscall" ) const _EPOLLET uint32 = 0x80000000 @@ -31,7 +31,7 @@ type epoller struct { func (p *epoller) wait() error { events := make([]syscall.EpollEvent, 128) for { - n, err := iepoll.EpollWait(p.epfd, events, -1) + n, err := isyscall.EpollWait(p.epfd, events, -1) if err != nil && err != syscall.EINTR { return err } diff --git a/internal/epoll/sys_epoll_linux.go b/internal/syscall/sys_epoll_linux.go similarity index 100% rename from internal/epoll/sys_epoll_linux.go rename to internal/syscall/sys_epoll_linux.go diff --git a/internal/epoll/sys_epoll_linux_risc.go b/internal/syscall/sys_epoll_linux_arm64_.go similarity index 100% rename from internal/epoll/sys_epoll_linux_risc.go rename to internal/syscall/sys_epoll_linux_arm64_.go diff --git a/internal/epoll/unsafe.go b/internal/syscall/unsafe.go similarity index 100% rename from internal/epoll/unsafe.go rename to internal/syscall/unsafe.go From 487a4f780155411b507fcd60db70dc91018f85c9 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Wed, 30 Jul 2025 17:28:20 +0800 Subject: [PATCH 09/21] fix: bugs --- connstate/conn.go | 8 ++++++++ connstate/conn_test.go | 15 +++++++++++++++ connstate/poll.go | 5 ++++- connstate/poll_cache.go | 2 +- 4 files changed, 28 insertions(+), 2 deletions(-) create mode 100644 connstate/conn_test.go diff --git a/connstate/conn.go b/connstate/conn.go index 1d80e9a..3e8650f 100644 --- a/connstate/conn.go +++ b/connstate/conn.go @@ -25,16 +25,24 @@ import ( type ConnState uint32 const ( + // StateOK means the connection is normal. StateOK ConnState = iota + // StateRemoteClosed means the remote side has closed the connection. StateRemoteClosed + // StateClosed means the connection has been closed by local side. StateClosed ) +// ConnStater is the interface to get the ConnState of a connection. +// Must call Close to release it if you're going to close the connection. type ConnStater interface { Close() error State() ConnState } +// ListenConnState returns a ConnStater for the given connection. +// It's generally used for availability checks when obtaining connections from a connection pool. +// Conn must be a syscall.Conn. func ListenConnState(conn net.Conn) (ConnStater, error) { pollInitOnce.Do(createPoller) sysConn, ok := conn.(syscall.Conn) diff --git a/connstate/conn_test.go b/connstate/conn_test.go new file mode 100644 index 0000000..43c25b7 --- /dev/null +++ b/connstate/conn_test.go @@ -0,0 +1,15 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connstate diff --git a/connstate/poll.go b/connstate/poll.go index 1f27ef6..f6c4264 100644 --- a/connstate/poll.go +++ b/connstate/poll.go @@ -42,5 +42,8 @@ func createPoller() { if err != nil { panic(fmt.Sprintf("gopkg.connstate openpoll failed, err: %v", err)) } - go poll.wait() + go func() { + err := poll.wait() + fmt.Printf("gopkg.connstate epoll wait exit, err: %v\n", err) + }() } diff --git a/connstate/poll_cache.go b/connstate/poll_cache.go index f831583..c65fcf1 100644 --- a/connstate/poll_cache.go +++ b/connstate/poll_cache.go @@ -27,7 +27,7 @@ type fdOperator struct { index int32 fd int - conn unsafe.Pointer // *connWithState + conn unsafe.Pointer // *connStater } var pollcache pollCache From 625674bc33e251f48265a82333ea0572db8bdaba Mon Sep 17 00:00:00 2001 From: jayantxie Date: Wed, 30 Jul 2025 18:06:30 +0800 Subject: [PATCH 10/21] feat: add tests --- connstate/conn_test.go | 167 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) diff --git a/connstate/conn_test.go b/connstate/conn_test.go index 43c25b7..49dab3a 100644 --- a/connstate/conn_test.go +++ b/connstate/conn_test.go @@ -13,3 +13,170 @@ // limitations under the License. package connstate + +import ( + "errors" + "net" + "syscall" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestListenConnState(t *testing.T) { + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + panic(err) + } + go func() { + for { + conn, err := ln.Accept() + assert.Nil(t, err) + go func() { + buf := make([]byte, 11) + _, err := conn.Read(buf) + assert.Nil(t, err) + conn.Close() + }() + } + }() + conn, err := net.Dial("tcp", ln.Addr().String()) + assert.Nil(t, err) + stater, err := ListenConnState(conn) + assert.Nil(t, err) + assert.Equal(t, StateOK, stater.State()) + _, err = conn.Write([]byte("hello world")) + assert.Nil(t, err) + time.Sleep(100 * time.Millisecond) + assert.Equal(t, StateRemoteClosed, stater.State()) + assert.Nil(t, stater.Close()) + assert.Nil(t, conn.Close()) + assert.Equal(t, StateClosed, stater.State()) +} + +type mockPoller struct { + controlFunc func(fd *fdOperator, op op) error +} + +func (m *mockPoller) wait() error { + return nil +} + +func (m *mockPoller) control(fd *fdOperator, op op) error { + return m.controlFunc(fd, op) +} + +type mockConn struct { + net.Conn + controlFunc func(f func(fd uintptr)) error +} + +func (c *mockConn) SyscallConn() (syscall.RawConn, error) { + return &mockRawConn{ + controlFunc: c.controlFunc, + }, nil +} + +type mockRawConn struct { + syscall.RawConn + controlFunc func(f func(fd uintptr)) error +} + +func (r *mockRawConn) Control(f func(fd uintptr)) error { + return r.controlFunc(f) +} + +func TestListenConnState_Err(t *testing.T) { + var expectDetach bool + pollInitOnce.Do(func() {}) + cases := []struct { + name string + connControlFunc func(f func(fd uintptr)) error + pollControlFunc func(fd *fdOperator, op op) error + expectErr error + }{ + { + name: "err conn control", + connControlFunc: func(f func(fd uintptr)) error { + return errors.New("err conn control") + }, + expectErr: errors.New("err conn control"), + }, + { + name: "err poll control", + connControlFunc: func(f func(fd uintptr)) error { + f(1) + return nil + }, + pollControlFunc: func(fd *fdOperator, op op) error { + assert.Equal(t, fd.fd, 1) + return errors.New("err poll control") + }, + expectErr: errors.New("err poll control"), + }, + { + name: "err conn control after poll add", + connControlFunc: func(f func(fd uintptr)) error { + f(1) + return errors.New("err conn control after poll add") + }, + pollControlFunc: func(fd *fdOperator, op op) error { + if op == opDel { + expectDetach = true + } + assert.Equal(t, fd.fd, 1) + return nil + }, + expectErr: errors.New("err conn control after poll add"), + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + poll = &mockPoller{ + controlFunc: c.pollControlFunc, + } + conn := &mockConn{ + controlFunc: c.connControlFunc, + } + _, err := ListenConnState(conn) + assert.Equal(t, c.expectErr, err) + }) + } + assert.True(t, expectDetach) +} + +func BenchmarkListenConnState(b *testing.B) { + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + panic(err) + } + go func() { + for { + conn, err := ln.Accept() + assert.Nil(b, err) + go func() { + buf := make([]byte, 11) + _, err := conn.Read(buf) + assert.Nil(b, err) + conn.Close() + }() + } + }() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + conn, err := net.Dial("tcp", ln.Addr().String()) + assert.Nil(b, err) + stater, err := ListenConnState(conn) + assert.Nil(b, err) + assert.Equal(b, StateOK, stater.State()) + _, err = conn.Write([]byte("hello world")) + assert.Nil(b, err) + time.Sleep(20 * time.Millisecond) + assert.Equal(b, StateRemoteClosed, stater.State()) + assert.Nil(b, stater.Close()) + assert.Nil(b, conn.Close()) + assert.Equal(b, StateClosed, stater.State()) + } + }) +} From e86ddd39a67e6afc77297cd5f56acfa308aec628 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Wed, 30 Jul 2025 18:24:39 +0800 Subject: [PATCH 11/21] optimize: epoll max 1024 events --- connstate/poll_bsd.go | 1 - connstate/poll_linux.go | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/connstate/poll_bsd.go b/connstate/poll_bsd.go index e1e1fb8..818e3e8 100644 --- a/connstate/poll_bsd.go +++ b/connstate/poll_bsd.go @@ -30,7 +30,6 @@ type kqueue struct { func (p *kqueue) wait() error { events := make([]syscall.Kevent_t, 1024) for { - // TODO: handoff p by entersyscallblock, or make poller run as a thread. n, err := syscall.Kevent(p.fd, nil, events, nil) if err != nil && err != syscall.EINTR { // exit gracefully diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go index e21dc16..ca61814 100644 --- a/connstate/poll_linux.go +++ b/connstate/poll_linux.go @@ -29,7 +29,7 @@ type epoller struct { } func (p *epoller) wait() error { - events := make([]syscall.EpollEvent, 128) + events := make([]syscall.EpollEvent, 1024) for { n, err := isyscall.EpollWait(p.epfd, events, -1) if err != nil && err != syscall.EINTR { From 922c272d92afae4d6f311a60dbde9af144433391 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Thu, 31 Jul 2025 11:40:55 +0800 Subject: [PATCH 12/21] fix: bugs --- connstate/conn_test.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/connstate/conn_test.go b/connstate/conn_test.go index 49dab3a..ff82227 100644 --- a/connstate/conn_test.go +++ b/connstate/conn_test.go @@ -17,6 +17,7 @@ package connstate import ( "errors" "net" + "sync" "syscall" "testing" "time" @@ -24,7 +25,11 @@ import ( "github.com/stretchr/testify/assert" ) +var testMutex sync.Mutex + func TestListenConnState(t *testing.T) { + testMutex.Lock() + defer testMutex.Unlock() ln, err := net.Listen("tcp", "localhost:0") if err != nil { panic(err) @@ -88,8 +93,11 @@ func (r *mockRawConn) Control(f func(fd uintptr)) error { } func TestListenConnState_Err(t *testing.T) { + testMutex.Lock() + defer testMutex.Unlock() var expectDetach bool pollInitOnce.Do(func() {}) + oldPoll := poll cases := []struct { name string connControlFunc func(f func(fd uintptr)) error @@ -144,9 +152,16 @@ func TestListenConnState_Err(t *testing.T) { }) } assert.True(t, expectDetach) + if oldPoll != nil { + poll = oldPoll + } else { + createPoller() + } } func BenchmarkListenConnState(b *testing.B) { + testMutex.Lock() + defer testMutex.Unlock() ln, err := net.Listen("tcp", "localhost:0") if err != nil { panic(err) From 1bd39815edecbe6b796b83fe0fcf8bbc8d99b582 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Thu, 31 Jul 2025 14:16:37 +0800 Subject: [PATCH 13/21] fix: bugs --- connstate/conn.go | 4 ++-- connstate/poll_linux.go | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/connstate/conn.go b/connstate/conn.go index 3e8650f..4dd5ba3 100644 --- a/connstate/conn.go +++ b/connstate/conn.go @@ -58,7 +58,7 @@ func ListenConnState(conn net.Conn) (ConnStater, error) { err = rawConn.Control(func(fileDescriptor uintptr) { fd = pollcache.alloc() fd.fd = int(fileDescriptor) - fd.conn = unsafe.Pointer(&connStater{fd: unsafe.Pointer(fd)}) + atomic.StorePointer(&fd.conn, unsafe.Pointer(&connStater{fd: unsafe.Pointer(fd)})) opAddErr = poll.control(fd, opAdd) }) if fd != nil { @@ -75,7 +75,7 @@ func ListenConnState(conn net.Conn) (ConnStater, error) { if opAddErr != nil { return nil, opAddErr } - return (*connStater)(fd.conn), nil + return (*connStater)(atomic.LoadPointer(&fd.conn)), nil } type connStater struct { diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go index ca61814..b60c156 100644 --- a/connstate/poll_linux.go +++ b/connstate/poll_linux.go @@ -28,6 +28,7 @@ type epoller struct { epfd int } +//go:nocheckptr func (p *epoller) wait() error { events := make([]syscall.EpollEvent, 1024) for { From d0b7168208e1edc0ada1fb6c07df002c91490797 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Thu, 31 Jul 2025 14:24:51 +0800 Subject: [PATCH 14/21] fix: typo --- _typos.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/_typos.toml b/_typos.toml index dfd475b..567e835 100644 --- a/_typos.toml +++ b/_typos.toml @@ -2,3 +2,6 @@ [files] extend-exclude = ["go.mod", "go.sum"] + +[default.extend-words] +typ = "typ" # type \ No newline at end of file From b780b6f349981557ea3b326c18e7a366610a39d5 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Thu, 31 Jul 2025 14:35:37 +0800 Subject: [PATCH 15/21] chore: add comments --- connstate/conn_test.go | 29 +++++++++++++++++++---------- connstate/poll_linux.go | 2 ++ 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/connstate/conn_test.go b/connstate/conn_test.go index ff82227..ed41092 100644 --- a/connstate/conn_test.go +++ b/connstate/conn_test.go @@ -16,6 +16,7 @@ package connstate import ( "errors" + "io" "net" "sync" "syscall" @@ -53,7 +54,10 @@ func TestListenConnState(t *testing.T) { assert.Equal(t, StateOK, stater.State()) _, err = conn.Write([]byte("hello world")) assert.Nil(t, err) - time.Sleep(100 * time.Millisecond) + buf := make([]byte, 1) + _, err = conn.Read(buf) + assert.Equal(t, io.EOF, err) + time.Sleep(10 * time.Millisecond) assert.Equal(t, StateRemoteClosed, stater.State()) assert.Nil(t, stater.Close()) assert.Nil(t, conn.Close()) @@ -95,9 +99,17 @@ func (r *mockRawConn) Control(f func(fd uintptr)) error { func TestListenConnState_Err(t *testing.T) { testMutex.Lock() defer testMutex.Unlock() - var expectDetach bool - pollInitOnce.Do(func() {}) + // replace poll + pollInitOnce.Do(createPoller) oldPoll := poll + defer func() { + poll = oldPoll + }() + // test detach + var expectDetach bool + defer func() { + assert.True(t, expectDetach) + }() cases := []struct { name string connControlFunc func(f func(fd uintptr)) error @@ -151,12 +163,6 @@ func TestListenConnState_Err(t *testing.T) { assert.Equal(t, c.expectErr, err) }) } - assert.True(t, expectDetach) - if oldPoll != nil { - poll = oldPoll - } else { - createPoller() - } } func BenchmarkListenConnState(b *testing.B) { @@ -187,7 +193,10 @@ func BenchmarkListenConnState(b *testing.B) { assert.Equal(b, StateOK, stater.State()) _, err = conn.Write([]byte("hello world")) assert.Nil(b, err) - time.Sleep(20 * time.Millisecond) + buf := make([]byte, 1) + _, err = conn.Read(buf) + assert.Equal(b, io.EOF, err) + time.Sleep(10 * time.Millisecond) assert.Equal(b, StateRemoteClosed, stater.State()) assert.Nil(b, stater.Close()) assert.Nil(b, conn.Close()) diff --git a/connstate/poll_linux.go b/connstate/poll_linux.go index b60c156..50ede74 100644 --- a/connstate/poll_linux.go +++ b/connstate/poll_linux.go @@ -32,6 +32,8 @@ type epoller struct { func (p *epoller) wait() error { events := make([]syscall.EpollEvent, 1024) for { + // epoll wait is a blocking syscall, so we need to call entersyscallblock to handoff P, + // and let the P run other goroutines. n, err := isyscall.EpollWait(p.epfd, events, -1) if err != nil && err != syscall.EINTR { return err From 25b770f562b315e0e42382f916c166f5a060f4ee Mon Sep 17 00:00:00 2001 From: jayantxie Date: Thu, 31 Jul 2025 21:39:19 +0800 Subject: [PATCH 16/21] test: handoff p --- connstate/conn_test.go | 129 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 117 insertions(+), 12 deletions(-) diff --git a/connstate/conn_test.go b/connstate/conn_test.go index ed41092..325a06d 100644 --- a/connstate/conn_test.go +++ b/connstate/conn_test.go @@ -18,6 +18,7 @@ import ( "errors" "io" "net" + "runtime" "sync" "syscall" "testing" @@ -26,11 +27,7 @@ import ( "github.com/stretchr/testify/assert" ) -var testMutex sync.Mutex - func TestListenConnState(t *testing.T) { - testMutex.Lock() - defer testMutex.Unlock() ln, err := net.Listen("tcp", "localhost:0") if err != nil { panic(err) @@ -39,12 +36,12 @@ func TestListenConnState(t *testing.T) { for { conn, err := ln.Accept() assert.Nil(t, err) - go func() { + go func(conn net.Conn) { buf := make([]byte, 11) _, err := conn.Read(buf) assert.Nil(t, err) conn.Close() - }() + }(conn) } }() conn, err := net.Dial("tcp", ln.Addr().String()) @@ -97,8 +94,6 @@ func (r *mockRawConn) Control(f func(fd uintptr)) error { } func TestListenConnState_Err(t *testing.T) { - testMutex.Lock() - defer testMutex.Unlock() // replace poll pollInitOnce.Do(createPoller) oldPoll := poll @@ -166,8 +161,6 @@ func TestListenConnState_Err(t *testing.T) { } func BenchmarkListenConnState(b *testing.B) { - testMutex.Lock() - defer testMutex.Unlock() ln, err := net.Listen("tcp", "localhost:0") if err != nil { panic(err) @@ -176,14 +169,15 @@ func BenchmarkListenConnState(b *testing.B) { for { conn, err := ln.Accept() assert.Nil(b, err) - go func() { + go func(conn net.Conn) { buf := make([]byte, 11) _, err := conn.Read(buf) assert.Nil(b, err) conn.Close() - }() + }(conn) } }() + b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { conn, err := net.Dial("tcp", ln.Addr().String()) @@ -204,3 +198,114 @@ func BenchmarkListenConnState(b *testing.B) { } }) } + +type statefulConn struct { + net.Conn + stater ConnStater +} + +func (s *statefulConn) Close() error { + s.stater.Close() + return s.Conn.Close() +} + +type connpool struct { + mu sync.Mutex + conns []*statefulConn +} + +func (p *connpool) get(dialFunc func() *statefulConn) *statefulConn { + p.mu.Lock() + if len(p.conns) == 0 { + p.mu.Unlock() + return dialFunc() + } + for i := len(p.conns) - 1; i >= 0; i-- { + conn := p.conns[i] + if conn.stater.State() == StateOK { + p.conns = p.conns[:i] + p.mu.Unlock() + return conn + } else { + conn.Close() + } + } + p.conns = p.conns[:0] + p.mu.Unlock() + return dialFunc() +} + +func (p *connpool) put(conn *statefulConn) { + p.mu.Lock() + defer p.mu.Unlock() + p.conns = append(p.conns, conn) +} + +// BenchmarkHandoffP is used to verify the impact on performance caused by P being occupied by the poller +// when using syscall.EpollWait() in a high-load scenario. +// To compare with syscall.EpollWait(), you could run `go test -bench=BenchmarkHandoffP -benchtime=10s .` +// to test the first time, and replace isyscall.EpollWait() with syscall.EpollWait() to test the second time. +func BenchmarkHandoffP(b *testing.B) { + // set GOMAXPROCS to 1 to make P resources scarce + runtime.GOMAXPROCS(1) + ln, err := net.Listen("tcp", "localhost:0") + if err != nil { + panic(err) + } + go func() { + for { + conn, err := ln.Accept() + assert.Nil(b, err) + go func(conn net.Conn) { + var count uint64 + for { + buf := make([]byte, 11) + _, err := conn.Read(buf) + if err != nil { + conn.Close() + return + } + _, err = conn.Write(buf) + if err != nil { + conn.Close() + return + } + count++ + if count == 1000 { + conn.Close() + return + } + } + }(conn) + } + }() + cp := &connpool{} + dialFunc := func() *statefulConn { + conn, err := net.Dial("tcp", ln.Addr().String()) + assert.Nil(b, err) + stater, err := ListenConnState(conn) + assert.Nil(b, err) + return &statefulConn{ + Conn: conn, + stater: stater, + } + } + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + conn := cp.get(dialFunc) + buf := make([]byte, 11) + _, err := conn.Write(buf) + if err != nil { + conn.Close() + continue + } + _, err = conn.Read(buf) + if err != nil { + conn.Close() + continue + } + cp.put(conn) + } + }) +} From 547d609812b56d9672e87b60253f2d60a7a29fc5 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Thu, 31 Jul 2025 22:11:51 +0800 Subject: [PATCH 17/21] fix: bugs --- connstate/conn.go | 2 ++ connstate/poll_cache.go | 21 ++++++++------------- 2 files changed, 10 insertions(+), 13 deletions(-) diff --git a/connstate/conn.go b/connstate/conn.go index 4dd5ba3..3a7b6cc 100644 --- a/connstate/conn.go +++ b/connstate/conn.go @@ -66,6 +66,7 @@ func ListenConnState(conn net.Conn) (ConnStater, error) { _ = poll.control(fd, opDel) } if err != nil || opAddErr != nil { + atomic.StorePointer(&fd.conn, nil) pollcache.freeable(fd) } } @@ -88,6 +89,7 @@ func (c *connStater) Close() error { if fd != nil && atomic.CompareAndSwapPointer(&c.fd, unsafe.Pointer(fd), nil) { atomic.StoreUint32(&c.state, uint32(StateClosed)) _ = poll.control(fd, opDel) + atomic.StorePointer(&fd.conn, nil) pollcache.freeable(fd) } return nil diff --git a/connstate/poll_cache.go b/connstate/poll_cache.go index c65fcf1..707413a 100644 --- a/connstate/poll_cache.go +++ b/connstate/poll_cache.go @@ -16,7 +16,6 @@ package connstate import ( "sync" - "sync/atomic" "unsafe" ) @@ -39,7 +38,6 @@ type pollCache struct { // freelist store the freeable operator // to reduce GC pressure, we only store op index here freelist []int32 - freeack int32 } func (c *pollCache) alloc() *fdOperator { @@ -68,21 +66,18 @@ func (c *pollCache) alloc() *fdOperator { // freeable mark the operator that could be freed // only poller could do the real free action func (c *pollCache) freeable(op *fdOperator) { - atomic.StorePointer(&op.conn, nil) c.lock.Lock() - // reset all state - if atomic.CompareAndSwapInt32(&c.freeack, 1, 0) { - for _, idx := range c.freelist { - op := c.cache[idx] - op.link = c.first - c.first = op - } - c.freelist = c.freelist[:0] - } c.freelist = append(c.freelist, op.index) c.lock.Unlock() } func (c *pollCache) free() { - atomic.StoreInt32(&c.freeack, 1) + c.lock.Lock() + for _, idx := range c.freelist { + op := c.cache[idx] + op.link = c.first + c.first = op + } + c.freelist = c.freelist[:0] + c.lock.Unlock() } From 0f9dc99d043aa734e0b2631f988537f97ad71790 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Thu, 31 Jul 2025 22:53:41 +0800 Subject: [PATCH 18/21] fix: bugs --- connstate/conn_test.go | 50 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/connstate/conn_test.go b/connstate/conn_test.go index 325a06d..15aae54 100644 --- a/connstate/conn_test.go +++ b/connstate/conn_test.go @@ -209,6 +209,17 @@ func (s *statefulConn) Close() error { return s.Conn.Close() } +type mockStater struct { +} + +func (m *mockStater) State() ConnState { + return StateOK +} + +func (m *mockStater) Close() error { + return nil +} + type connpool struct { mu sync.Mutex conns []*statefulConn @@ -241,11 +252,32 @@ func (p *connpool) put(conn *statefulConn) { p.conns = append(p.conns, conn) } -// BenchmarkHandoffP is used to verify the impact on performance caused by P being occupied by the poller -// when using syscall.EpollWait() in a high-load scenario. -// To compare with syscall.EpollWait(), you could run `go test -bench=BenchmarkHandoffP -benchtime=10s .` +func (p *connpool) Close() error { + p.mu.Lock() + defer p.mu.Unlock() + for _, conn := range p.conns { + conn.Close() + } + p.conns = p.conns[:0] + return nil +} + +var withListenConnState bool + +// BenchmarkWithConnState is used to verify the impact of adding ConnState logic on performance. +// To compare with syscall.EpollWait(), you could run `go test -bench=BenchmarkWith -benchtime=10s .` // to test the first time, and replace isyscall.EpollWait() with syscall.EpollWait() to test the second time. -func BenchmarkHandoffP(b *testing.B) { +func BenchmarkWithConnState(b *testing.B) { + withListenConnState = true + benchmarkConnState(b) +} + +func BenchmarkWithoutConnState(b *testing.B) { + withListenConnState = false + benchmarkConnState(b) +} + +func benchmarkConnState(b *testing.B) { // set GOMAXPROCS to 1 to make P resources scarce runtime.GOMAXPROCS(1) ln, err := net.Listen("tcp", "localhost:0") @@ -283,8 +315,13 @@ func BenchmarkHandoffP(b *testing.B) { dialFunc := func() *statefulConn { conn, err := net.Dial("tcp", ln.Addr().String()) assert.Nil(b, err) - stater, err := ListenConnState(conn) - assert.Nil(b, err) + var stater ConnStater + if withListenConnState { + stater, err = ListenConnState(conn) + assert.Nil(b, err) + } else { + stater = &mockStater{} + } return &statefulConn{ Conn: conn, stater: stater, @@ -308,4 +345,5 @@ func BenchmarkHandoffP(b *testing.B) { cp.put(conn) } }) + _ = cp.Close() } From 62b66ea9d3df3d511e0b9b295d1244c9c9c1390b Mon Sep 17 00:00:00 2001 From: jayantxie Date: Thu, 31 Jul 2025 23:09:17 +0800 Subject: [PATCH 19/21] feat: add tests --- connstate/poll_cache_test.go | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 connstate/poll_cache_test.go diff --git a/connstate/poll_cache_test.go b/connstate/poll_cache_test.go new file mode 100644 index 0000000..86b5707 --- /dev/null +++ b/connstate/poll_cache_test.go @@ -0,0 +1,34 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connstate + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestPollCache(t *testing.T) { + pollcache.free() + fd := pollcache.alloc() + pollcache.freeable(fd) + assert.Equal(t, 1, len(pollcache.freelist)) + fd1 := pollcache.alloc() + assert.NotEqual(t, fd, fd1) + pollcache.free() + assert.Equal(t, 0, len(pollcache.freelist)) + fd2 := pollcache.alloc() + assert.Equal(t, fd, fd2) +} From f8c1059810daa3e98772cfa08a8585fa3eb38f36 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Thu, 31 Jul 2025 23:34:48 +0800 Subject: [PATCH 20/21] feat: windows --- connstate/poll_windows.go | 35 +++++++++++++++++++++++++++++++++++ 1 file changed, 35 insertions(+) create mode 100644 connstate/poll_windows.go diff --git a/connstate/poll_windows.go b/connstate/poll_windows.go new file mode 100644 index 0000000..ad9404d --- /dev/null +++ b/connstate/poll_windows.go @@ -0,0 +1,35 @@ +// Copyright 2025 CloudWeGo Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package connstate + +import "errors" + +var ( + errNotSupportedForWindows = errors.New("connstate not supported for windows") +) + +type mockWindowsPoller struct{} + +func (m *mockWindowsPoller) wait() error { + return nil +} + +func (m *mockWindowsPoller) control(fd *fdOperator, op op) error { + return errNotSupportedForWindows +} + +func openpoll() (p poller, err error) { + return &mockWindowsPoller{}, nil +} From 11edf6bbb4697ef34cf57ddfe0269b1e102e04a1 Mon Sep 17 00:00:00 2001 From: jayantxie Date: Fri, 1 Aug 2025 14:42:04 +0800 Subject: [PATCH 21/21] fix: delete fd --- connstate/conn.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/connstate/conn.go b/connstate/conn.go index 3a7b6cc..b7f52ff 100644 --- a/connstate/conn.go +++ b/connstate/conn.go @@ -63,7 +63,10 @@ func ListenConnState(conn net.Conn) (ConnStater, error) { }) if fd != nil { if err != nil && opAddErr == nil { - _ = poll.control(fd, opDel) + // if rawConn is closed, poller will delete the fd by itself + _ = rawConn.Control(func(_ uintptr) { + _ = poll.control(fd, opDel) + }) } if err != nil || opAddErr != nil { atomic.StorePointer(&fd.conn, nil)