Skip to content

Commit 1b78c37

Browse files
committed
gee-rpc: implement day5-timeout
1 parent dc840eb commit 1b78c37

25 files changed

+1117
-106
lines changed

gee-rpc/day1-codec/main/main.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,7 @@ func main() {
2929
defer func() { _ = conn.Close() }()
3030

3131
// send options
32-
_ = json.NewEncoder(conn).Encode(&geerpc.Options{
33-
MagicNumber: geerpc.MagicNumber,
34-
CodecType: codec.GobType,
35-
})
36-
32+
_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)
3733
cc := codec.NewGobCodec(conn)
3834
// send request & receive response
3935
for i := 0; i < 5; i++ {

gee-rpc/day1-codec/server.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ import (
1717

1818
const MagicNumber = 0x3bef5c
1919

20-
type Options struct {
20+
type Option struct {
2121
MagicNumber int // MagicNumber marks this's a geerpc request
2222
CodecType codec.Type // client may choose different Codec to encode body
2323
}
2424

25-
var defaultOptions = &Options{
25+
var DefaultOption = &Option{
2626
MagicNumber: MagicNumber,
2727
CodecType: codec.GobType,
2828
}
@@ -42,7 +42,7 @@ var DefaultServer = NewServer()
4242
// ServeConn blocks, serving the connection until the client hangs up.
4343
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
4444
defer func() { _ = conn.Close() }()
45-
var opt Options
45+
var opt Option
4646
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
4747
log.Println("rpc server: options error: ", err)
4848
return

gee-rpc/day2-client/client.go

+23-16
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
// Call represents an active RPC.
1919
type Call struct {
20+
Seq uint64
2021
ServiceMethod string // format "<service>.<method>"
2122
Args interface{} // arguments to the function
2223
Reply interface{} // reply from the function
@@ -34,7 +35,7 @@ func (call *Call) done() {
3435
// multiple goroutines simultaneously.
3536
type Client struct {
3637
cc codec.Codec
37-
opt *Options
38+
opt *Option
3839
sending sync.Mutex // protect following
3940
header codec.Header
4041
mu sync.Mutex // protect following
@@ -96,6 +97,7 @@ func (client *Client) send(call *Call) {
9697

9798
// register this call.
9899
seq, err := client.registerCall(call)
100+
call.Seq = seq
99101
if err != nil {
100102
call.Error = err
101103
call.done()
@@ -173,44 +175,41 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error
173175
return call.Error
174176
}
175177

176-
func parseOptions(opts ...*Options) (*Options, error) {
178+
func parseOptions(opts ...*Option) (*Option, error) {
177179
// if opts is nil or pass nil as parameter
178180
if len(opts) == 0 || opts[0] == nil {
179-
return defaultOptions, nil
181+
return DefaultOption, nil
180182
}
181183
if len(opts) != 1 {
182184
return nil, errors.New("number of options is more than 1")
183185
}
184186
opt := opts[0]
185-
opt.MagicNumber = defaultOptions.MagicNumber
187+
opt.MagicNumber = DefaultOption.MagicNumber
186188
if opt.CodecType == "" {
187-
opt.CodecType = defaultOptions.CodecType
189+
opt.CodecType = DefaultOption.CodecType
188190
}
189191
return opt, nil
190192
}
191193

192-
func NewClient(conn io.ReadWriteCloser, opts ...*Options) (*Client, error) {
193-
opt, err := parseOptions(opts...)
194-
if err != nil {
195-
return nil, err
196-
}
194+
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
197195
f := codec.NewCodecFuncMap[opt.CodecType]
198196
if f == nil {
199-
err = fmt.Errorf("invalid codec type %s", opt.CodecType)
197+
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
200198
log.Println("rpc client: codec error:", err)
201199
return nil, err
202200
}
203201
// send options with server
204-
if err = json.NewEncoder(conn).Encode(opt); err != nil {
202+
if err := json.NewEncoder(conn).Encode(opt); err != nil {
205203
log.Println("rpc client: options error: ", err)
206204
_ = conn.Close()
207205
return nil, err
208206
}
209207
return newClientCodec(f(conn), opt), nil
210208
}
211209

212-
func newClientCodec(cc codec.Codec, opt *Options) *Client {
210+
func newClientCodec(cc codec.Codec, opt *Option) *Client {
213211
client := &Client{
212+
seq: 1, // seq starts with 1, 0 means invalid call
214213
cc: cc,
215214
opt: opt,
216215
pending: make(map[uint64]*Call),
@@ -219,11 +218,19 @@ func newClientCodec(cc codec.Codec, opt *Options) *Client {
219218
return client
220219
}
221220

222-
// Dial connects to an RPC server at the specified network address
223-
func Dial(network, address string, opts ...*Options) (*Client, error) {
221+
func dial(network, address string, opt *Option) (*Client, error) {
224222
conn, err := net.Dial(network, address)
225223
if err != nil {
226224
return nil, err
227225
}
228-
return NewClient(conn, opts...)
226+
return NewClient(conn, opt)
227+
}
228+
229+
// Dial connects to an RPC server at the specified network address
230+
func Dial(network, address string, opts ...*Option) (*Client, error) {
231+
opt, err := parseOptions(opts...)
232+
if err != nil {
233+
return nil, err
234+
}
235+
return dial(network, address, opt)
229236
}

gee-rpc/day2-client/server.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@ import (
1717

1818
const MagicNumber = 0x3bef5c
1919

20-
type Options struct {
20+
type Option struct {
2121
MagicNumber int // MagicNumber marks this's a geerpc request
2222
CodecType codec.Type // client may choose different Codec to encode body
2323
}
2424

25-
var defaultOptions = &Options{
25+
var DefaultOption = &Option{
2626
MagicNumber: MagicNumber,
2727
CodecType: codec.GobType,
2828
}
@@ -42,7 +42,7 @@ var DefaultServer = NewServer()
4242
// ServeConn blocks, serving the connection until the client hangs up.
4343
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
4444
defer func() { _ = conn.Close() }()
45-
var opt Options
45+
var opt Option
4646
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
4747
log.Println("rpc server: options error: ", err)
4848
return

gee-rpc/day3-service/client.go

+23-16
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717

1818
// Call represents an active RPC.
1919
type Call struct {
20+
Seq uint64
2021
ServiceMethod string // format "<service>.<method>"
2122
Args interface{} // arguments to the function
2223
Reply interface{} // reply from the function
@@ -34,7 +35,7 @@ func (call *Call) done() {
3435
// multiple goroutines simultaneously.
3536
type Client struct {
3637
cc codec.Codec
37-
opt *Options
38+
opt *Option
3839
sending sync.Mutex // protect following
3940
header codec.Header
4041
mu sync.Mutex // protect following
@@ -96,6 +97,7 @@ func (client *Client) send(call *Call) {
9697

9798
// register this call.
9899
seq, err := client.registerCall(call)
100+
call.Seq = seq
99101
if err != nil {
100102
call.Error = err
101103
call.done()
@@ -173,44 +175,41 @@ func (client *Client) Call(serviceMethod string, args, reply interface{}) error
173175
return call.Error
174176
}
175177

176-
func parseOptions(opts ...*Options) (*Options, error) {
178+
func parseOptions(opts ...*Option) (*Option, error) {
177179
// if opts is nil or pass nil as parameter
178180
if len(opts) == 0 || opts[0] == nil {
179-
return defaultOptions, nil
181+
return DefaultOption, nil
180182
}
181183
if len(opts) != 1 {
182184
return nil, errors.New("number of options is more than 1")
183185
}
184186
opt := opts[0]
185-
opt.MagicNumber = defaultOptions.MagicNumber
187+
opt.MagicNumber = DefaultOption.MagicNumber
186188
if opt.CodecType == "" {
187-
opt.CodecType = defaultOptions.CodecType
189+
opt.CodecType = DefaultOption.CodecType
188190
}
189191
return opt, nil
190192
}
191193

192-
func NewClient(conn io.ReadWriteCloser, opts ...*Options) (*Client, error) {
193-
opt, err := parseOptions(opts...)
194-
if err != nil {
195-
return nil, err
196-
}
194+
func NewClient(conn net.Conn, opt *Option) (*Client, error) {
197195
f := codec.NewCodecFuncMap[opt.CodecType]
198196
if f == nil {
199-
err = fmt.Errorf("invalid codec type %s", opt.CodecType)
197+
err := fmt.Errorf("invalid codec type %s", opt.CodecType)
200198
log.Println("rpc client: codec error:", err)
201199
return nil, err
202200
}
203201
// send options with server
204-
if err = json.NewEncoder(conn).Encode(opt); err != nil {
202+
if err := json.NewEncoder(conn).Encode(opt); err != nil {
205203
log.Println("rpc client: options error: ", err)
206204
_ = conn.Close()
207205
return nil, err
208206
}
209207
return newClientCodec(f(conn), opt), nil
210208
}
211209

212-
func newClientCodec(cc codec.Codec, opt *Options) *Client {
210+
func newClientCodec(cc codec.Codec, opt *Option) *Client {
213211
client := &Client{
212+
seq: 1, // seq starts with 1, 0 means invalid call
214213
cc: cc,
215214
opt: opt,
216215
pending: make(map[uint64]*Call),
@@ -219,11 +218,19 @@ func newClientCodec(cc codec.Codec, opt *Options) *Client {
219218
return client
220219
}
221220

222-
// Dial connects to an RPC server at the specified network address
223-
func Dial(network, address string, opts ...*Options) (*Client, error) {
221+
func dial(network, address string, opt *Option) (*Client, error) {
224222
conn, err := net.Dial(network, address)
225223
if err != nil {
226224
return nil, err
227225
}
228-
return NewClient(conn, opts...)
226+
return NewClient(conn, opt)
227+
}
228+
229+
// Dial connects to an RPC server at the specified network address
230+
func Dial(network, address string, opts ...*Option) (*Client, error) {
231+
opt, err := parseOptions(opts...)
232+
if err != nil {
233+
return nil, err
234+
}
235+
return dial(network, address, opt)
229236
}

gee-rpc/day3-service/server.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,12 @@ import (
1818

1919
const MagicNumber = 0x3bef5c
2020

21-
type Options struct {
21+
type Option struct {
2222
MagicNumber int // MagicNumber marks this's a geerpc request
2323
CodecType codec.Type // client may choose different Codec to encode body
2424
}
2525

26-
var defaultOptions = &Options{
26+
var DefaultOption = &Option{
2727
MagicNumber: MagicNumber,
2828
CodecType: codec.GobType,
2929
}
@@ -45,7 +45,7 @@ var DefaultServer = NewServer()
4545
// ServeConn blocks, serving the connection until the client hangs up.
4646
func (server *Server) ServeConn(conn io.ReadWriteCloser) {
4747
defer func() { _ = conn.Close() }()
48-
var opt Options
48+
var opt Option
4949
if err := json.NewDecoder(conn).Decode(&opt); err != nil {
5050
log.Println("rpc server: options error: ", err)
5151
return
@@ -162,6 +162,8 @@ func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.
162162
err := req.svc.call(req.mtype, req.argv, req.replyv)
163163
if err != nil {
164164
req.h.Error = err.Error()
165+
server.sendResponse(cc, req.h, invalidRequest, sending)
166+
return
165167
}
166168
server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
167169
}

0 commit comments

Comments
 (0)