Skip to content

Commit

Permalink
update conn, req
Browse files Browse the repository at this point in the history
  • Loading branch information
travisjeffery committed Aug 6, 2018
1 parent edb1a73 commit afb5aaf
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 1 deletion.
42 changes: 42 additions & 0 deletions jocko/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,34 @@ func (c *Conn) StopReplica(req *protocol.StopReplicaRequest) (*protocol.StopRepl
return &resp, nil
}

// UpdateMetadata sends an update metadata request and returns the response.
func (c *Conn) UpdateMetadata(req *protocol.UpdateMetadataRequest) (*protocol.UpdateMetadataResponse, error) {
var resp protocol.UpdateMetadataResponse
err := c.readOperation(func(deadline time.Time, id int32) error {
return c.writeRequest(req)
}, func(deadline time.Time, size int) error {
return c.readResponse(&resp, size, req.Version())
})
if err != nil {
return nil, err
}
return &resp, nil
}

// ControlledShutdown sends a controlled shutdown request and returns the response.
func (c *Conn) ControlledShutdown(req *protocol.ControlledShutdownRequest) (*protocol.ControlledShutdownResponse, error) {
var resp protocol.ControlledShutdownResponse
err := c.readOperation(func(deadline time.Time, id int32) error {
return c.writeRequest(req)
}, func(deadline time.Time, size int) error {
return c.readResponse(&resp, size, req.Version())
})
if err != nil {
return nil, err
}
return &resp, nil
}

// OffsetCommit sends an offset commit and returns the response.
func (c *Conn) OffsetCommit(req *protocol.OffsetCommitRequest) (*protocol.OffsetCommitResponse, error) {
var resp protocol.OffsetCommitResponse
Expand All @@ -163,6 +191,20 @@ func (c *Conn) OffsetCommit(req *protocol.OffsetCommitRequest) (*protocol.Offset
return &resp, nil
}

// SaslHandshake sends a sasl handshake request and returns the response.
func (c *Conn) SaslHandshake(req *protocol.SaslHandshakeRequest) (*protocol.SaslHandshakeResponse, error) {
var resp protocol.SaslHandshakeResponse
err := c.readOperation(func(deadline time.Time, id int32) error {
return c.writeRequest(req)
}, func(deadline time.Time, size int) error {
return c.readResponse(&resp, size, req.Version())
})
if err != nil {
return nil, err
}
return &resp, nil
}

// OffsetFetch sends an offset fetch and returns the response.
func (c *Conn) OffsetFetch(req *protocol.OffsetFetchRequest) (*protocol.OffsetFetchResponse, error) {
var resp protocol.OffsetFetchResponse
Expand Down
2 changes: 1 addition & 1 deletion protocol/controlled_shutdown_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ func (r *ControlledShutdownRequest) Key() int16 {
}

func (r ControlledShutdownRequest) Version() int16 {
return r.Versoin
return r.APIVersion
}

0 comments on commit afb5aaf

Please sign in to comment.