Skip to content
This repository was archived by the owner on Dec 14, 2020. It is now read-only.

Commit ee6eb7e

Browse files
committedApr 11, 2018
Add support for graceful close timeouts via contect.Context
* `Close()` is now `Close(context.Context)` for sessions and links. * `Client.Close` does not take a context as it does not currently block. * Added `-count=1` to integration tests to prevent caching on Go1.10+.
1 parent fba4c61 commit ee6eb7e

File tree

6 files changed

+103
-62
lines changed

6 files changed

+103
-62
lines changed
 

‎Makefile

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ test:
2020
go test -tags gofuzz -v -race ./...
2121

2222
integration:
23-
go test -tags "integration pkgerrors" -v -race ./...
23+
go test -tags "integration pkgerrors" -count=1 -v -race ./...
2424

2525
coverage:
2626
TEST_CORPUS=1 go test -tags "integration gofuzz" -cover -coverprofile=cover.out -v

‎README.md

+7-4
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ import (
3636
"context"
3737
"fmt"
3838
"log"
39+
"time"
3940

4041
"pack.ag/amqp"
4142
)
@@ -76,8 +77,8 @@ func main() {
7677
log.Fatal("Sending message:", err)
7778
}
7879

80+
sender.Close(ctx)
7981
cancel()
80-
sender.Close()
8182
}
8283

8384
// Continuously read messages
@@ -90,9 +91,11 @@ func main() {
9091
if err != nil {
9192
log.Fatal("Creating receiver link:", err)
9293
}
93-
94-
ctx, cancel := context.WithCancel(ctx)
95-
defer cancel()
94+
defer func() {
95+
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
96+
receiver.Close(ctx)
97+
cancel()
98+
}()
9699

97100
for {
98101
// Receive next message

‎client.go

+30-15
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func (c *Client) NewSession() (*Session, error) {
139139

140140
begin, ok := fr.body.(*performBegin)
141141
if !ok {
142-
_ = s.Close() // deallocate session on error
142+
_ = s.Close(context.Background()) // deallocate session on error
143143
return nil, errorErrorf("unexpected begin response: %+v", fr.body)
144144
}
145145

@@ -193,10 +193,17 @@ func newSession(c *conn, channel uint16) *Session {
193193
}
194194
}
195195

196-
// Close closes the session.
197-
func (s *Session) Close() error {
196+
// Close gracefully closes the session.
197+
//
198+
// If ctx expires while waiting for servers response, ctx.Err() will be returned.
199+
// The session will continue to wait for the response until the Client is closed.
200+
func (s *Session) Close(ctx context.Context) error {
198201
s.closeOnce.Do(func() { close(s.close) })
199-
<-s.done
202+
select {
203+
case <-s.done:
204+
case <-ctx.Done():
205+
return ctx.Err()
206+
}
200207
if s.err == ErrSessionClosed {
201208
return nil
202209
}
@@ -351,10 +358,8 @@ func (s *Sender) Address() string {
351358
}
352359

353360
// Close closes the Sender and AMQP link.
354-
func (s *Sender) Close() error {
355-
// TODO: Should this timeout? Close() take a context? Use one of the
356-
// other timeouts?
357-
return s.link.Close()
361+
func (s *Sender) Close(ctx context.Context) error {
362+
return s.link.Close(ctx)
358363
}
359364

360365
// NewSender opens a new sender link on the session.
@@ -1045,9 +1050,17 @@ func (l *link) mux() {
10451050
// close closes and requests deletion of the link.
10461051
//
10471052
// No operations on link are valid after close.
1048-
func (l *link) Close() error {
1053+
//
1054+
// If ctx expires while waiting for servers response, ctx.Err() will be returned.
1055+
// The session will continue to wait for the response until the Session or Client
1056+
// is closed.
1057+
func (l *link) Close(ctx context.Context) error {
10491058
l.closeOnce.Do(func() { close(l.close) })
1050-
<-l.done
1059+
select {
1060+
case <-l.done:
1061+
case <-ctx.Done():
1062+
return ctx.Err()
1063+
}
10511064
if l.err == ErrLinkClosed {
10521065
return nil
10531066
}
@@ -1350,7 +1363,7 @@ func (r *Receiver) Receive(ctx context.Context) (*Message, error) {
13501363
messageSize += len(fr.Payload)
13511364
if messageSize > maxMessageSize {
13521365
// TODO: send error
1353-
_ = r.Close()
1366+
_ = r.Close(ctx)
13541367
return nil, errorErrorf("received message larger than max size of ")
13551368
}
13561369

@@ -1389,10 +1402,12 @@ func (r *Receiver) Address() string {
13891402
}
13901403

13911404
// Close closes the Receiver and AMQP link.
1392-
func (r *Receiver) Close() error {
1393-
// TODO: Should this timeout? Close() take a context? Use one of the
1394-
// other timeouts?
1395-
return r.link.Close()
1405+
//
1406+
// If ctx expires while waiting for servers response, ctx.Err() will be returned.
1407+
// The session will continue to wait for the response until the Session or Client
1408+
// is closed.
1409+
func (r *Receiver) Close(ctx context.Context) error {
1410+
return r.link.Close(ctx)
13961411
}
13971412

13981413
type messageDisposition struct {

‎example_test.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,8 @@ func Example() {
4545
log.Fatal("Sending message:", err)
4646
}
4747

48+
sender.Close(ctx)
4849
cancel()
49-
sender.Close()
5050
}
5151

5252
// Continuously read messages
@@ -59,9 +59,11 @@ func Example() {
5959
if err != nil {
6060
log.Fatal("Creating receiver link:", err)
6161
}
62-
63-
ctx, cancel := context.WithCancel(ctx)
64-
defer cancel()
62+
defer func() {
63+
ctx, cancel := context.WithTimeout(ctx, 1*time.Second)
64+
receiver.Close(ctx)
65+
cancel()
66+
}()
6567

6668
for {
6769
// Receive next message

‎fuzz.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,12 @@ func FuzzConn(data []byte) int {
3737

3838
msg.Accept()
3939

40-
// r.Close() // disabled until link close timeout implemented
40+
ctx, close := context.WithTimeout(context.Background(), 10*time.Millisecond)
41+
defer close()
4142

42-
s.Close()
43+
r.Close(ctx)
44+
45+
s.Close(ctx)
4346

4447
// Send
4548
client, err = New(testconn.New(data),
@@ -66,9 +69,12 @@ func FuzzConn(data []byte) int {
6669
return 0
6770
}
6871

69-
// r.Close() // disabled until link close timeout implemented
72+
ctx, close = context.WithTimeout(context.Background(), 10*time.Millisecond)
73+
defer close()
74+
75+
r.Close(ctx)
7076

71-
s.Close()
77+
s.Close(ctx)
7278

7379
return 1
7480
}

0 commit comments

Comments
 (0)
This repository has been archived.