Skip to content
This repository was archived by the owner on Jul 21, 2021. It is now read-only.

Commit 773d6b7

Browse files
committed
fix: operations hang if attempted after connection closed
Operations hang forever if attempted after client connection is closed Fixes: #125
1 parent c4fab1a commit 773d6b7

File tree

2 files changed

+34
-5
lines changed

2 files changed

+34
-5
lines changed

zk/conn.go

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -310,10 +310,14 @@ func WithMaxConnBufferSize(maxBufferSize int) connOption {
310310
}
311311

312312
func (c *Conn) Close() {
313+
rc, err := c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil)
314+
if err != nil {
315+
return
316+
}
313317
close(c.shouldQuit)
314318

315319
select {
316-
case <-c.queueRequest(opClose, &closeRequest{}, &closeResponse{}, nil):
320+
case <-rc:
317321
case <-time.After(time.Second):
318322
}
319323
}
@@ -933,7 +937,7 @@ func (c *Conn) addWatcher(path string, watchType watchType) <-chan Event {
933937
return ch
934938
}
935939

936-
func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) <-chan response {
940+
func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (<-chan response, error) {
937941
rq := &request{
938942
xid: c.nextXid(),
939943
opcode: opcode,
@@ -942,12 +946,20 @@ func (c *Conn) queueRequest(opcode int32, req interface{}, res interface{}, recv
942946
recvChan: make(chan response, 1),
943947
recvFunc: recvFunc,
944948
}
945-
c.sendChan <- rq
946-
return rq.recvChan
949+
select {
950+
case c.sendChan <- rq:
951+
return rq.recvChan, nil
952+
case <-c.shouldQuit:
953+
return nil, ErrClosing
954+
}
947955
}
948956

949957
func (c *Conn) request(opcode int32, req interface{}, res interface{}, recvFunc func(*request, *responseHeader, error)) (int64, error) {
950-
r := <-c.queueRequest(opcode, req, res, recvFunc)
958+
rc, err := c.queueRequest(opcode, req, res, recvFunc)
959+
if err != nil {
960+
return 0, err
961+
}
962+
r := <-rc
951963
return r.zxid, r.err
952964
}
953965

zk/zk_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -666,6 +666,23 @@ func TestRequestFail(t *testing.T) {
666666
}
667667
}
668668

669+
func TestRequestFailAfterClosed(t *testing.T) {
670+
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
671+
if err != nil {
672+
t.Fatal(err)
673+
}
674+
defer ts.Stop()
675+
zk, _, err := ts.ConnectAll()
676+
if err != nil {
677+
t.Fatalf("Connect returned error: %+v", err)
678+
}
679+
zk.Close()
680+
_, _, err = zk.Get("/blah")
681+
if err != ErrClosing {
682+
t.Fatalf("unexpected err: %+v", err)
683+
}
684+
}
685+
669686
func TestSlowServer(t *testing.T) {
670687
ts, err := StartTestCluster(1, nil, logWriter{t: t, p: "[ZKERR] "})
671688
if err != nil {

0 commit comments

Comments
 (0)