Skip to content

CLIENT LIST cmd #1360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions internal/eval/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package eval

import (
"sync"

"github.com/dicedb/dice/internal/global"
)

var (
clients = make([]global.IOThread, 0)
mu = sync.Mutex{}
)

func GetClients() []global.IOThread {
return clients
}

func AddClient(client global.IOThread) {
mu.Lock()
defer mu.Unlock()
clients = append(clients, client)
}

// if id is monotonic binary search can be used
func RemoveClientByID(id string) {
mu.Lock()
defer mu.Unlock()
for i, client := range clients {
if client.ID() == id {
clients = append(clients[:i], clients[i+1:]...)
return
}
}
}
18 changes: 16 additions & 2 deletions internal/eval/store_eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -6261,8 +6261,22 @@ func evalKEYS(args []string, store *dstore.Store) *EvalResponse {
}

// TODO: Placeholder to support monitoring
func evalCLIENT(args []string, store *dstore.Store) *EvalResponse {
return makeEvalResult(clientio.OK)
func evalCLIENT(args []string, _ *dstore.Store) *EvalResponse {
if len(args) == 0 {
return makeEvalError(diceerrors.ErrWrongArgumentCount("CLIENT"))
}

subcommand := strings.ToUpper(args[0])
switch subcommand {
case List:
o := make([]string, 0, len(GetClients()))
for _, client := range GetClients() {
o = append(o, client.String())
}
return makeEvalResult(clientio.Encode(strings.Join(o, "\n"), false))
default:
return makeEvalError(diceerrors.ErrWrongArgumentCount("CLIENT"))
}
}

// TODO: Placeholder to support monitoring
Expand Down
10 changes: 10 additions & 0 deletions internal/global/iothread.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package global

import "context"

type IOThread interface {
ID() string
Start(context.Context) error
Stop() error
String() string
}
109 changes: 102 additions & 7 deletions internal/iothread/iothread.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
"log/slog"
"net"
"strconv"
"strings"
"sync/atomic"
"syscall"
"time"
Expand All @@ -15,9 +16,12 @@
"github.com/dicedb/dice/internal/auth"
"github.com/dicedb/dice/internal/clientio"
"github.com/dicedb/dice/internal/clientio/iohandler"
"github.com/dicedb/dice/internal/clientio/iohandler/netconn"
"github.com/dicedb/dice/internal/clientio/requestparser"
"github.com/dicedb/dice/internal/cmd"
diceerrors "github.com/dicedb/dice/internal/errors"
"github.com/dicedb/dice/internal/eval"
"github.com/dicedb/dice/internal/global"
"github.com/dicedb/dice/internal/ops"
"github.com/dicedb/dice/internal/querymanager"
"github.com/dicedb/dice/internal/shard"
Expand All @@ -31,15 +35,11 @@
var requestCounter uint32

// IOThread interface
type IOThread interface {
ID() string
Start(context.Context) error
Stop() error
}

type BaseIOThread struct {
IOThread
global.IOThread
id string
Name string
ioHandler iohandler.IOHandler
parser requestparser.Parser
shardManager *shard.ShardManager
Expand All @@ -50,13 +50,14 @@
preprocessingChan chan *ops.StoreResponse
cmdWatchSubscriptionChan chan watchmanager.WatchSubscription
wl wal.AbstractWAL
lastCmd *cmd.DiceDBCmd
}

func NewIOThread(wid string, responseChan, preprocessingChan chan *ops.StoreResponse,
cmdWatchSubscriptionChan chan watchmanager.WatchSubscription,
ioHandler iohandler.IOHandler, parser requestparser.Parser,
shardManager *shard.ShardManager, gec chan error, wl wal.AbstractWAL) *BaseIOThread {
return &BaseIOThread{
thread := &BaseIOThread{
id: wid,
ioHandler: ioHandler,
parser: parser,
Expand All @@ -69,12 +70,105 @@
cmdWatchSubscriptionChan: cmdWatchSubscriptionChan,
wl: wl,
}
eval.AddClient(thread)
return thread
}

func (t *BaseIOThread) ID() string {
return t.id
}

func addr(fd int) (addr, laddr string, err error) {
// addr
sa, err := syscall.Getpeername(fd)
if err != nil {
return "", "", err
}
switch v := sa.(type) {
case *syscall.SockaddrInet4:
addr = net.IP(v.Addr[:]).String() + ":" + strconv.Itoa(v.Port)
case *syscall.SockaddrInet6:
addr = net.IP(v.Addr[:]).String() + ":" + strconv.Itoa(v.Port)
}

// laddr
sa, err = syscall.Getsockname(fd)
if err != nil {
return "", "", err
}
switch v := sa.(type) {
case *syscall.SockaddrInet4:
laddr = net.IP(v.Addr[:]).String() + ":" + strconv.Itoa(v.Port)
case *syscall.SockaddrInet6:
laddr = net.IP(v.Addr[:]).String() + ":" + strconv.Itoa(v.Port)
}

return addr, laddr, nil
}

func (t *BaseIOThread) String() string {
var s strings.Builder

// id
s.WriteString("id=")
s.WriteString(t.id)
s.WriteString(" ")

// addr and laddr
switch hndlr := t.ioHandler.(type) {

Check failure on line 118 in internal/iothread/iothread.go

View workflow job for this annotation

GitHub Actions / lint

singleCaseSwitch: should rewrite switch statement to if statement (gocritic)
case *netconn.IOHandler:
addr, laddr, _ := addr(hndlr.FileDescriptor())
s.WriteString("addr=")
s.WriteString(addr)
s.WriteString(" ")

s.WriteString("laddr=")
s.WriteString(laddr)
s.WriteString(" ")

s.WriteString("fd=")
s.WriteString(strconv.FormatInt(int64(hndlr.FileDescriptor()), 10))
s.WriteString(" ")
}

// name
s.WriteString("name=")
s.WriteString(t.Name)
s.WriteString(" ")

// age
s.WriteString("age=")
s.WriteString(strconv.FormatFloat(time.Since(t.Session.CreatedAt).Seconds(), 'f', 0, 64))
s.WriteString(" ")

// idle
s.WriteString("idle=")
s.WriteString(strconv.FormatFloat(time.Since(t.Session.LastAccessedAt).Seconds(), 'f', 0, 64))
s.WriteString(" ")

// // flags

Check failure on line 149 in internal/iothread/iothread.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
// s.WriteString("flags=")
// s.WriteString("")
// s.WriteString(" ")

// argv-mem

Check failure on line 154 in internal/iothread/iothread.go

View workflow job for this annotation

GitHub Actions / lint

commentedOutCode: may want to remove commented-out code (gocritic)
// s.WriteString("argv-mem=")
// s.WriteString(strconv.FormatInt(int64(c.ArgLenSum), 10))
// s.WriteString(" ")

// cmd
s.WriteString("cmd=")
// todo: handle `CLIENT ID` as "client|id" and `SET k 1` as "set"
if t.lastCmd == nil {
s.WriteString("NULL")
} else {
s.WriteString(strings.ToLower(t.lastCmd.Cmd))
}
s.WriteString(" ")

return s.String()
}

func (t *BaseIOThread) Start(ctx context.Context) error {
errChan := make(chan error, 1)
incomingDataChan := make(chan []byte)
Expand Down Expand Up @@ -603,6 +697,7 @@
func (t *BaseIOThread) Stop() error {
slog.Info("Stopping io-thread", slog.String("id", t.id))
t.Session.Expire()
eval.RemoveClientByID(t.id)
return nil
}

Expand Down
9 changes: 5 additions & 4 deletions internal/iothread/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"sync/atomic"

"github.com/dicedb/dice/internal/global"
"github.com/dicedb/dice/internal/shard"
)

Expand All @@ -28,7 +29,7 @@ func NewManager(maxClients int32, sm *shard.ShardManager) *Manager {
}
}

func (m *Manager) RegisterIOThread(ioThread IOThread) error {
func (m *Manager) RegisterIOThread(ioThread global.IOThread) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -54,17 +55,17 @@ func (m *Manager) IOThreadCount() int32 {
return m.numIOThreads.Load()
}

func (m *Manager) GetIOThread(id string) (IOThread, bool) {
func (m *Manager) GetIOThread(id string) (global.IOThread, bool) {
client, ok := m.connectedClients.Load(id)
if !ok {
return nil, false
}
return client.(IOThread), true
return client.(global.IOThread), true
}

func (m *Manager) UnregisterIOThread(id string) error {
if client, loaded := m.connectedClients.LoadAndDelete(id); loaded {
w := client.(IOThread)
w := client.(global.IOThread)
if err := w.Stop(); err != nil {
return err
}
Expand Down
Loading