Skip to content
Open
136 changes: 129 additions & 7 deletions ssdb/ssdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,41 @@ import (
)

type Client struct {
sock *net.TCPConn
sock chan *net.TCPConn
recv_buf bytes.Buffer
_sock *net.TCPConn
}

type ConnectionPoolWrapper struct {
size int
conn chan *Client
}

func InitPool(ip string, port int, size int) (*ConnectionPoolWrapper, error) {

cpm := new(ConnectionPoolWrapper)

cpm.conn = make(chan *Client, size)
for x := 0; x < size; x++ {
conn, err := Connect(ip, port)
if err != nil {
return cpm, err
}

// If the init function succeeded, add the connection to the channel
cpm.conn <- conn
}
cpm.size = size
return cpm, nil

}

func (p *ConnectionPoolWrapper) GetConnection() *Client {
return <-p.conn
}

func (p *ConnectionPoolWrapper) ReleaseConnection(conn *Client) {
p.conn <- conn
}

func Connect(ip string, port int) (*Client, error) {
Expand All @@ -22,11 +55,24 @@ func Connect(ip string, port int) (*Client, error) {
return nil, err
}
var c Client
c.sock = sock

c.sock = make(chan *net.TCPConn, 1)
c.sock <- sock

return &c, nil
}

func (c *Client) Do(args ...interface{}) ([]string, error) {

c._sock = <- c.sock
defer func () {
c.sock <- c._sock
}()

return c.do(args...)
}

func (c *Client) do(args ...interface{}) ([]string, error) {
err := c.send(args)
if err != nil {
return nil, err
Expand All @@ -40,19 +86,29 @@ func (c *Client) Set(key string, val string) (interface{}, error) {
if err != nil {
return nil, err
}
if len(resp) == 2 && resp[0] == "ok" {
if len(resp) > 0 && resp[0] == "ok" {
return true, nil
}
return nil, fmt.Errorf("bad response")
}


func (c *ConnectionPoolWrapper) Set(key string, val string) (interface{}, error) {

db := c.GetConnection()
defer c.ReleaseConnection(db)

return db.Set(key, val)
}

// TODO: Will somebody write addition semantic methods?
func (c *Client) Get(key string) (interface{}, error) {
resp, err := c.Do("get", key)
if err != nil {
return nil, err
}
if len(resp) == 2 && resp[0] == "ok" {
if len(resp) > 0 && resp[0] == "ok" {
// return resp[1], nil
return resp[1], nil
}
if resp[0] == "not_found" {
Expand All @@ -61,6 +117,28 @@ func (c *Client) Get(key string) (interface{}, error) {
return nil, fmt.Errorf("bad response")
}

func (c *Client) Info() (interface{}, error) {
resp, err := c.Do("info")
if err != nil {
return nil, err
}
if len(resp) > 0 && resp[0] == "ok" {
return resp, nil
}
if resp[0] == "not_found" {
return nil, nil
}
return nil, fmt.Errorf("bad response")
}

func (c *ConnectionPoolWrapper) Get(key string) (interface{}, error) {

db := c.GetConnection()
defer c.ReleaseConnection(db)

return db.Get(key)
}

func (c *Client) Del(key string) (interface{}, error) {
resp, err := c.Do("del", key)
if err != nil {
Expand All @@ -74,11 +152,21 @@ func (c *Client) Del(key string) (interface{}, error) {
return nil, fmt.Errorf("bad response:resp:%v:", resp)
}

func (c *ConnectionPoolWrapper) Del(key string) (interface{}, error) {

db := c.GetConnection()
defer c.ReleaseConnection(db)

return db.Del(key)
}

func (c *Client) Send(args ...interface{}) error {
return c.send(args);
}

func (c *Client) send(args []interface{}) error {

var sock = c._sock
var buf bytes.Buffer
for _, arg := range args {
var s string
Expand Down Expand Up @@ -118,7 +206,9 @@ func (c *Client) send(args []interface{}) error {
buf.WriteByte('\n')
}
buf.WriteByte('\n')
_, err := c.sock.Write(buf.Bytes())

_, err := sock.Write(buf.Bytes())

return err
}

Expand All @@ -127,14 +217,19 @@ func (c *Client) Recv() ([]string, error) {
}

func (c *Client) recv() ([]string, error) {

var sock = c._sock

var tmp [1]byte
for {
resp := c.parse()
if resp == nil || len(resp) > 0 {
return resp, nil
}
n, err := c.sock.Read(tmp[0:])
n, err := sock.Read(tmp[0:])

if err != nil {

return nil, err
}
c.recv_buf.Write(tmp[0:n])
Expand Down Expand Up @@ -184,5 +279,32 @@ func (c *Client) parse() []string {

// Close The Client Connection
func (c *Client) Close() error {
return c.sock.Close()

sock := <- c.sock

defer func () {

c.sock <- sock

}()

return sock.Close()

}


func (cpm *ConnectionPoolWrapper) Close() error {

for {

select {
case db := <- cpm.conn:
db.Close()
default:
return nil
}

}

return nil
}
Loading