77 changes: 77 additions & 0 deletions connstate/conn.go
@@ -0,0 +1,77 @@
package connstate

import (
"errors"
"net"
"sync/atomic"
"syscall"
"unsafe"
)

// ConnState describes the observed lifecycle state of a watched connection.
type ConnState uint32

const (
	// StateOK means the connection is healthy.
	StateOK ConnState = iota
	// StateRemoteClosed means the peer closed the connection (or it errored).
	StateRemoteClosed
	// StateClosed means Close was called locally.
	StateClosed
)

// ConnWithState is a net.Conn whose close state can be queried via State.
type ConnWithState interface {
net.Conn
State() ConnState
}

// ListenConnState registers conn with the package-level poller and returns a
// wrapper whose State method reports whether the peer or the local side has
// closed the connection. conn must implement syscall.Conn.
func ListenConnState(conn net.Conn) (ConnWithState, error) {
pollInitOnce.Do(createPoller)
sysConn, ok := conn.(syscall.Conn)
if !ok {
return nil, errors.New("conn is not syscall.Conn")
}
rawConn, err := sysConn.SyscallConn()
if err != nil {
return nil, err
}
var fd *fdOperator
var opAddErr error
err = rawConn.Control(func(fileDescriptor uintptr) {
fd = pollcache.alloc()
fd.fd = int(fileDescriptor)
fd.conn = unsafe.Pointer(&connWithState{Conn: conn, fd: unsafe.Pointer(fd)})
opAddErr = poll.control(fd, opAdd)
})
	if fd != nil {
		// the Control callback ran; roll back on any failure
		if err != nil && opAddErr == nil {
			// registration succeeded but Control itself failed afterwards
			_ = poll.control(fd, opDel)
}
if err != nil || opAddErr != nil {
pollcache.freeable(fd)
}
}
if err != nil {
return nil, err
}
if opAddErr != nil {
return nil, opAddErr
}
return (*connWithState)(fd.conn), nil
}

type connWithState struct {
net.Conn
fd unsafe.Pointer // *fdOperator
state uint32
}

// Close detaches the connection from the poller exactly once, then closes the
// underlying net.Conn.
func (c *connWithState) Close() error {
fd := (*fdOperator)(atomic.LoadPointer(&c.fd))
if fd != nil && atomic.CompareAndSwapPointer(&c.fd, unsafe.Pointer(fd), nil) {
atomic.StoreUint32(&c.state, uint32(StateClosed))
_ = poll.control(fd, opDel)
pollcache.freeable(fd)
}
return c.Conn.Close()
}

func (c *connWithState) State() ConnState {
return ConnState(atomic.LoadUint32(&c.state))
}
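For reference, here is a minimal usage sketch of the API introduced in this file. The import path and dial target below are placeholder assumptions, not part of this PR:

```go
package main

import (
	"log"
	"net"
	"time"

	"example.com/gopkg/connstate" // placeholder import path; use this repo's module path
)

func main() {
	conn, err := net.Dial("tcp", "example.com:80")
	if err != nil {
		log.Fatal(err)
	}
	cs, err := connstate.ListenConnState(conn)
	if err != nil {
		log.Fatal(err)
	}
	defer cs.Close()

	// State can be polled without ever reading from the connection:
	// StateRemoteClosed is reported once the peer shuts down its side.
	for cs.State() == connstate.StateOK {
		time.Sleep(100 * time.Millisecond)
	}
	log.Printf("connection state: %d", cs.State())
}
```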
32 changes: 32 additions & 0 deletions connstate/poll.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package connstate

import (
"fmt"
"sync"
)

type op int

const (
opAdd op = iota
opDel
)

var (
pollInitOnce sync.Once
poll poller
)

// poller watches registered fds and flips the owning connection's state when
// the peer closes the connection or the fd errors.
type poller interface {
wait() error
control(fd *fdOperator, op op) error
}

func createPoller() {
var err error
poll, err = openpoll()
if err != nil {
panic(fmt.Sprintf("gopkg.connstate openpoll failed, err: %v", err))
}
	// run the poller in the background; wait only returns on a fatal poller
	// error, which is explicitly discarded here
	go func() {
		_ = poll.wait()
	}()
}
78 changes: 78 additions & 0 deletions connstate/poll_bsd.go
@@ -0,0 +1,78 @@
//go:build darwin || netbsd || freebsd || openbsd || dragonfly
// +build darwin netbsd freebsd openbsd dragonfly

package connstate

import (
"sync/atomic"
"syscall"
"unsafe"
)

type kqueue struct {
fd int
}

func (p *kqueue) wait() error {
events := make([]syscall.Kevent_t, 1024)
for {
// TODO: handoff p by entersyscallblock, or make poller run as a thread.
n, err := syscall.Kevent(p.fd, nil, events, nil)
if err != nil && err != syscall.EINTR {
// exit gracefully
if err == syscall.EBADF {
return nil
}
return err
}
for i := 0; i < n; i++ {
ev := &events[i]
			// recover the operator pointer stashed in Udata at registration time
			op := *(**fdOperator)(unsafe.Pointer(&ev.Udata))
if conn := (*connWithState)(atomic.LoadPointer(&op.conn)); conn != nil {
if ev.Flags&(syscall.EV_EOF) != 0 {
atomic.CompareAndSwapUint32(&conn.state, uint32(StateOK), uint32(StateRemoteClosed))
}
}
}
		// every event of this batch has been handled, so no pending event can still
		// reference a freed operator; acknowledge the free list
pollcache.free()
}
}

func (p *kqueue) control(fd *fdOperator, op op) error {
evs := make([]syscall.Kevent_t, 1)
evs[0].Ident = uint64(fd.fd)
	// stash the operator pointer in the event's Udata field so wait() can recover it
	*(**fdOperator)(unsafe.Pointer(&evs[0].Udata)) = fd
if op == opAdd {
evs[0].Filter = syscall.EVFILT_READ
evs[0].Flags = syscall.EV_ADD | syscall.EV_ENABLE | syscall.EV_CLEAR
		// prevent ordinary in-band data from triggering the filter;
		// a remote close is still reported via EV_EOF
evs[0].Flags |= syscall.EV_OOBAND
evs[0].Fflags = syscall.NOTE_LOWAT
evs[0].Data = 0x7FFFFFFF
_, err := syscall.Kevent(p.fd, evs, nil, nil)
return err
} else {
evs[0].Filter = syscall.EVFILT_READ
evs[0].Flags = syscall.EV_DELETE
_, err := syscall.Kevent(p.fd, evs, nil, nil)
return err
}
}

func openpoll() (p poller, err error) {
fd, err := syscall.Kqueue()
if err != nil {
return nil, err
}
_, err = syscall.Kevent(fd, []syscall.Kevent_t{{
Ident: 0,
Filter: syscall.EVFILT_USER,
Flags: syscall.EV_ADD | syscall.EV_CLEAR,
}}, nil, nil)
if err != nil {
syscall.Close(fd)
return nil, err
}
return &kqueue{fd: fd}, nil
}
74 changes: 74 additions & 0 deletions connstate/poll_cache.go
@@ -0,0 +1,74 @@
package connstate

import (
"sync"
"sync/atomic"
"unsafe"
)

const pollBlockSize = 4 * 1024

// fdOperator binds a raw fd to its connWithState; instances live in the pollCache slab.
type fdOperator struct {
link *fdOperator // in pollcache, protected by pollcache.lock
index int32

fd int
conn unsafe.Pointer // *connWithState
}

var pollcache pollCache

// pollCache is a slab allocator for fdOperators with a poller-acknowledged free list.
type pollCache struct {
lock sync.Mutex
first *fdOperator
cache []*fdOperator
	// freelist stores the operators that are waiting to be freed;
	// to reduce GC pressure, only the operator index is kept here
freelist []int32
freeack int32
}

func (c *pollCache) alloc() *fdOperator {
c.lock.Lock()
if c.first == nil {
const pdSize = unsafe.Sizeof(fdOperator{})
n := pollBlockSize / pdSize
if n == 0 {
n = 1
}
index := int32(len(c.cache))
for i := uintptr(0); i < n; i++ {
pd := &fdOperator{index: index}
c.cache = append(c.cache, pd)
pd.link = c.first
c.first = pd
index++
}
}
op := c.first
c.first = op.link
c.lock.Unlock()
return op
}

// freeable marks the operator as reusable once the poller acknowledges it;
// only the poller can trigger the actual free via free()
func (c *pollCache) freeable(op *fdOperator) {
atomic.StorePointer(&op.conn, nil)
c.lock.Lock()
	// the poller has acknowledged: recycle the previously marked operators back
	// into the free list before appending the new one
if atomic.CompareAndSwapInt32(&c.freeack, 1, 0) {
for _, idx := range c.freelist {
op := c.cache[idx]
op.link = c.first
c.first = op
}
c.freelist = c.freelist[:0]
}
c.freelist = append(c.freelist, op.index)
c.lock.Unlock()
}

func (c *pollCache) free() {
atomic.StoreInt32(&c.freeack, 1)
}
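To make the alloc/freeable/free handshake easier to follow, here is a hypothetical in-package helper (a sketch only, not part of this PR, and it assumes createPoller has already run) that spells out the intended order of operations:

```go
package connstate

import "unsafe"

// lifecycleSketch is a hypothetical illustration of the operator lifecycle:
// alloc -> publish fd/conn -> opAdd; and on teardown: opDel -> freeable,
// after which the poller's next free() acknowledgement makes the slot reusable.
func lifecycleSketch(rawFD int, conn *connWithState) error {
	op := pollcache.alloc()        // take an operator from the slab cache
	op.fd = rawFD                  // publish the raw fd
	op.conn = unsafe.Pointer(conn) // and the connection it belongs to
	if err := poll.control(op, opAdd); err != nil {
		pollcache.freeable(op) // registration failed: hand the slot back
		return err
	}
	// ... the poller now watches rawFD and may flip conn.state ...
	_ = poll.control(op, opDel) // always unregister before freeing
	pollcache.freeable(op)      // reused only after the poller acks via free()
	return nil
}
```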
56 changes: 56 additions & 0 deletions connstate/poll_linux.go
@@ -0,0 +1,56 @@
package connstate

import (
"sync/atomic"
"syscall"
"unsafe"
)

const _EPOLLET uint32 = 0x80000000

type epoller struct {
epfd int
}

func (p *epoller) wait() error {
events := make([]syscall.EpollEvent, 128)
for {
// TODO: handoff p by entersyscallblock, or make poller run as a thread.
n, err := syscall.EpollWait(p.epfd, events, -1)
if err != nil && err != syscall.EINTR {
return err
}
for i := 0; i < n; i++ {
ev := &events[i]
			// recover the operator pointer stored across the Fd/Pad fields at registration time
			op := *(**fdOperator)(unsafe.Pointer(&ev.Fd))
if conn := (*connWithState)(atomic.LoadPointer(&op.conn)); conn != nil {
if ev.Events&(syscall.EPOLLHUP|syscall.EPOLLRDHUP|syscall.EPOLLERR) != 0 {
atomic.CompareAndSwapUint32(&conn.state, uint32(StateOK), uint32(StateRemoteClosed))
}
}
}
		// every event of this batch has been handled, so no pending event can still
		// reference a freed operator; acknowledge the free list
pollcache.free()
}
}

func (p *epoller) control(fd *fdOperator, op op) error {
if op == opAdd {
var ev syscall.EpollEvent
		// store the operator pointer across the event's Fd/Pad fields so wait() can recover it
		*(**fdOperator)(unsafe.Pointer(&ev.Fd)) = fd
ev.Events = syscall.EPOLLHUP | syscall.EPOLLRDHUP | syscall.EPOLLERR | _EPOLLET
return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_ADD, fd.fd, &ev)
} else {
var ev syscall.EpollEvent
return syscall.EpollCtl(p.epfd, syscall.EPOLL_CTL_DEL, fd.fd, &ev)
}
}

func openpoll() (p poller, err error) {
var epfd int
epfd, err = syscall.EpollCreate(1)
if err != nil {
return nil, err
}
return &epoller{epfd: epfd}, nil
}
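Finally, a hypothetical end-to-end test sketch (not included in this PR) showing the behaviour both pollers are expected to deliver: the dialing side observes StateRemoteClosed after the accepting side closes, without the dialer ever reading from the socket:

```go
package connstate

import (
	"net"
	"testing"
	"time"
)

// TestRemoteClosed is a hypothetical sketch: the accepting side closes the
// connection and the dialing side should observe StateRemoteClosed.
func TestRemoteClosed(t *testing.T) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		t.Fatal(err)
	}
	defer ln.Close()

	go func() {
		c, err := ln.Accept()
		if err != nil {
			return
		}
		c.Close() // the "remote" side closes immediately
	}()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		t.Fatal(err)
	}
	cs, err := ListenConnState(conn)
	if err != nil {
		t.Fatal(err)
	}
	defer cs.Close()

	deadline := time.Now().Add(3 * time.Second)
	for cs.State() != StateRemoteClosed {
		if time.Now().After(deadline) {
			t.Fatalf("state is still %d after 3s", cs.State())
		}
		time.Sleep(10 * time.Millisecond)
	}
}
```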