Switch to channels from ring buffer, performance testing (#6)
* convert to channels

* logger

* rename comments

* debug log

* perf test

add info log for serving channel

* perf test

add info log for serving channel

* perf test

add info log for serving channel
danthegoodman1 authored Jul 13, 2024
1 parent 154fe5e commit f9ba33f
Showing 4 changed files with 157 additions and 28 deletions.
112 changes: 111 additions & 1 deletion README.md
@@ -16,6 +16,9 @@ Used for distributed systems and clients, like distributed transactions. Self-su
* [Latency and concurrency](#latency-and-concurrency)
* [Latency optimizations](#latency-optimizations)
* [Concurrency optimizations](#concurrency-optimizations)
* [Performance testing](#performance-testing)
* [Simple test (buffer 10k)](#simple-test-buffer-10k)
* [Performance test (buffer 10k)](#performance-test-buffer-10k)
<!-- TOC -->

## Getting started
@@ -86,4 +89,111 @@ The nature of the hybrid timestamp allows concurrency limited only by the epoch

As a result, the latency ceiling for a request is roughly the latency of a linearizable read through raft plus the time to respond to all pending requests. The raft read is amortized across all pending requests.

To ensure that this node is the leader, a linearizable read across the cluster must take place, but many requests can share this via request collapsing.
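
The hybrid timestamp handed back to clients is 16 bytes: the first 8 are the big-endian epoch and the last 8 are the per-epoch index, so comparing the raw bytes orders timestamps the same way as comparing (epoch, index). A minimal sketch of encoding and decoding it (the `HybridTimestamp` type and the `encode`/`decode` helpers are illustrative names, not part of this package):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// HybridTimestamp is an illustrative decoded form of the 16-byte timestamp:
// 8 bytes of epoch followed by 8 bytes of per-epoch index, both big-endian.
type HybridTimestamp struct {
	Epoch uint64
	Index uint64
}

// encode packs the epoch and index into the 16-byte wire format.
func encode(ts HybridTimestamp) []byte {
	b := make([]byte, 16)
	binary.BigEndian.PutUint64(b[:8], ts.Epoch)
	binary.BigEndian.PutUint64(b[8:], ts.Index)
	return b
}

// decode unpacks a 16-byte timestamp back into its parts.
func decode(b []byte) HybridTimestamp {
	return HybridTimestamp{
		Epoch: binary.BigEndian.Uint64(b[:8]),
		Index: binary.BigEndian.Uint64(b[8:]),
	}
}

func main() {
	b := encode(HybridTimestamp{Epoch: 42, Index: 7})
	fmt.Printf("%x -> %+v\n", b, decode(b))
}
```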

## Performance testing

Using k6 on a 200-core C3D from GCP, running all 3 instances and the test on the same machine, the following was observed:

### Simple test (buffer 10k)

* 5-7 cores used at peak
* Followers using 50-80% CPU

```
scenarios: (100.00%) 1 scenario, 100 max VUs, 2m30s max duration (incl. graceful stop):
* default: Up to 100 looping VUs for 2m0s over 3 stages (gracefulRampDown: 30s, gracefulStop: 30s)
running (0m22.0s), 073/100 VUs, 1385955 complete and 0 interrupted iterations
✗ is status 200
↳ 99% — ✓ 10270369 / ✗ 443
checks.........................: 99.99% ✓ 10270369 ✗ 443
data_received..................: 1.5 GB 13 MB/s
data_sent......................: 914 MB 7.6 MB/s
http_req_blocked...............: avg=1.42µs min=320ns med=1.1µs max=51.93ms p(90)=1.88µs p(95)=2.23µs
http_req_connecting............: avg=1ns min=0s med=0s max=1.07ms p(90)=0s p(95)=0s
✓ http_req_duration..............: avg=822.67µs min=132.05µs med=576.33µs max=1s p(90)=1.21ms p(95)=1.78ms
{ expected_response:true }...: avg=779.52µs min=132.05µs med=576.33µs max=1s p(90)=1.21ms p(95)=1.78ms
✓ { staticAsset:yes }..........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
✓ http_req_failed................: 0.00% ✓ 443 ✗ 10270369
http_req_receiving.............: avg=21.84µs min=5.02µs med=18.35µs max=51.44ms p(90)=25.42µs p(95)=30.34µs
http_req_sending...............: avg=7.32µs min=1.71µs med=5.85µs max=31.67ms p(90)=7.98µs p(95)=9.27µs
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=793.5µs min=122.14µs med=548.77µs max=1s p(90)=1.17ms p(95)=1.73ms
http_reqs......................: 10270812 85589.648371/s
iteration_duration.............: avg=871.13µs min=151.51µs med=621.92µs max=1s p(90)=1.26ms p(95)=1.86ms
iterations.....................: 10270812 85589.648371/s
vus............................: 1 min=1 max=100
vus_max........................: 100 min=100 max=100
running (2m00.0s), 000/100 VUs, 10270812 complete and 0 interrupted iterations
```

### Performance test (buffer 10k)

_Also tested with a 1M buffer; it didn't make a difference in this test._

* 63-70 cores used at peak during the test
* 55-65 cores used by the test runner
* <7 GB of RAM used
* Followers using 50-80% CPU

```
scenarios: (100.00%) 1 scenario, 10000 max VUs, 4m30s max duration (incl. graceful stop):
* default: Up to 10000 looping VUs for 4m0s over 7 stages (gracefulRampDown: 30s, gracefulStop: 30s)
✗ is status 200
↳ 99% — ✓ 34288973 / ✗ 61
checks.........................: 99.99% ✓ 34288973 ✗ 61
data_received..................: 5.0 GB 21 MB/s
data_sent......................: 3.1 GB 13 MB/s
http_req_blocked...............: avg=7.89µs min=320ns med=1.9µs max=221.57ms p(90)=2.71µs p(95)=3.24µs
http_req_connecting............: avg=294ns min=0s med=0s max=123.89ms p(90)=0s p(95)=0s
✓ http_req_duration..............: avg=16.99ms min=172.36µs med=6.31ms max=1s p(90)=67.13ms p(95)=83.12ms
{ expected_response:true }...: avg=16.99ms min=172.36µs med=6.31ms max=1s p(90)=67.13ms p(95)=83.12ms
✓ { staticAsset:yes }..........: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
✓ http_req_failed................: 0.00% ✓ 61 ✗ 34288973
http_req_receiving.............: avg=147.78µs min=5.3µs med=16.97µs max=215.76ms p(90)=27.1µs p(95)=41.97µs
http_req_sending...............: avg=92.2µs min=1.76µs med=6.3µs max=222.21ms p(90)=9.36µs p(95)=20.79µs
http_req_tls_handshaking.......: avg=0s min=0s med=0s max=0s p(90)=0s p(95)=0s
http_req_waiting...............: avg=16.75ms min=149.66µs med=6.22ms max=1s p(90)=66.73ms p(95)=82.74ms
http_reqs......................: 34289034 142866.904307/s
iteration_duration.............: avg=30.51ms min=205.04µs med=19.54ms max=1s p(90)=87.72ms p(95)=108.99ms
iterations.....................: 34289034 142866.904307/s
vus............................: 6 min=6 max=10000
vus_max........................: 10000 min=10000 max=10000
running (4m00.0s), 00000/10000 VUs, 34289034 complete and 0 interrupted iterations
```

It seems like the request completion rate did not grow much past 300 VUs, so considering that we ran up to 10k VUs, I believe this is limited by either the HTTP framework or the testing framework.

Some log output showing the duration between reading from raft and writing the incremented hybrid timestamps to the pending request channels:
```
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 27 requests in 14.72µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 1 requests in 5.75µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 6 requests in 5.9µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 2 requests in 6.52µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 1 requests in 1.78µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 67 requests in 63.51µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 87 requests in 83.8µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 212 requests in 2.81964ms
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 7139 requests in 6.427489ms
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 26 requests in 497.96µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 2034 requests in 1.50941ms
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 1 requests in 4.66µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 1 requests in 3.84µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 1 requests in 1.55µs
1:42PM INF raft/epoch_host.go:133 raft.(*EpochHost).generateTimestamps() > Served 1 requests in 1.35µs
```

It looks like GC could be causing some issues, but without some Go perf investigation I'm inclined to blame the HTTP testing framework for not loading evenly.
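
For context on where these log lines come from, here is a condensed, illustrative sketch of the serving loop, mirroring the `generateTimestamps` changes in `raft/epoch_host.go` below; the `serve` function, `readEpoch` callback, and plain `uint64` index are simplified stand-ins for the real method, linearizable raft read, and atomic counter:

```go
package main

import (
	"encoding/binary"
	"log"
	"time"
)

// pendingRead carries the channel a waiting request is blocked on.
type pendingRead struct {
	callbackChan chan []byte
}

// serve drains the requests that are already queued, performs a single epoch
// read, then answers each request with an incremented (epoch, index) pair.
func serve(requestChan chan *pendingRead, readEpoch func() uint64, index *uint64) {
	pending := len(requestChan) // snapshot so new arrivals can't starve this pass
	if pending == 0 {
		return
	}

	epoch := readEpoch() // one raft read amortized across all pending requests
	start := time.Now()

	for i := 0; i < pending; i++ {
		req := <-requestChan

		*index = *index + 1
		ts := make([]byte, 16) // 8 bytes epoch, 8 bytes index
		binary.BigEndian.PutUint64(ts[:8], epoch)
		binary.BigEndian.PutUint64(ts[8:], *index)

		select {
		case req.callbackChan <- ts:
		default:
			// the caller gave up (e.g. its context expired); drop the timestamp
		}
	}
	log.Printf("Served %d requests in %v", pending, time.Since(start))

	if len(requestChan) > 0 {
		// More requests arrived while serving; go again without waiting for a poke.
		serve(requestChan, readEpoch, index)
	}
}

func main() {
	requestChan := make(chan *pendingRead, 10000)
	var index uint64

	// Queue two fake requests, then serve them in one batch.
	a := &pendingRead{callbackChan: make(chan []byte, 1)}
	b := &pendingRead{callbackChan: make(chan []byte, 1)}
	requestChan <- a
	requestChan <- b

	serve(requestChan, func() uint64 { return 42 }, &index)
	log.Printf("a=%x b=%x", <-a.callbackChan, <-b.callbackChan)
}
```
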
51 changes: 26 additions & 25 deletions raft/epoch_host.go
@@ -5,7 +5,6 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/danthegoodman1/EpicEpoch/ring"
"github.com/danthegoodman1/EpicEpoch/utils"
"github.com/lni/dragonboat/v3"
"sync/atomic"
@@ -26,7 +25,7 @@ type (

readerAgentStopChan chan struct{}

requestBuffer *ring.RingBuffer[*pendingRead]
requestChan chan *pendingRead

readerAgentReading atomic.Bool

@@ -37,8 +36,8 @@
}

pendingRead struct {
// callbackRing is a ring buffer to write back to with the produced timestamp
callbackRing *ring.RingBuffer[[]byte]
// callbackChan is a channel to write back to with the produced timestamp
callbackChan chan []byte
}
)

@@ -60,9 +59,11 @@ func (e *EpochHost) readerAgentLoop() {
// generateTimestamps generates timestamps for pending requests, and handles looping if more requests come in
func (e *EpochHost) generateTimestamps() {
// Capture the current pending requests so there's no case we get locked
pendingRequests := e.requestBuffer.Len()
pendingRequests := len(e.requestChan)
logger.Debug().Msgf("Serving %d pending requests", pendingRequests)

// Read the epoch
s := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*time.Duration(raftRttMs)*100)
defer cancel()
currentEpochI, err := e.nodeHost.SyncRead(ctx, ClusterID, nil)
@@ -71,6 +72,7 @@
logger.Fatal().Err(err).Msg("error in nodeHost.SyncRead")
return
}
logger.Debug().Msgf("Read from raft in %+v", time.Since(s))

currentEpoch, ok := currentEpochI.(PersistenceEpoch)
if !ok {
@@ -111,27 +113,28 @@
e.epochIndex.Store(0)
}

s = time.Now()
for range pendingRequests {
// Write to the pending requests
req, err := e.requestBuffer.Poll(time.Millisecond * 100) // some timeout (this is quite long for a ring buffer)
if err != nil {
// This means that we lost items in the buffer somehow, very bad, must crash
logger.Fatal().Err(err).Msg("timed out polling the ring buffer, did items get lost in the ring buffer?")
return
}
req := <-e.requestChan

reqIndex := e.epochIndex.Add(1)
// Build the timestamp
timestamp := make([]byte, 16) // 8 for epoch, 8 for index
binary.BigEndian.PutUint64(timestamp[:8], currentEpoch.Epoch)
binary.BigEndian.PutUint64(timestamp[8:], reqIndex)

req.callbackRing.Put(timestamp)
select {
case req.callbackChan <- timestamp:
default:
logger.Warn().Msg("did not have listener on callback chan when generating timestamp")
}
}
logger.Debug().Msgf("Served %d requests in %+v", pendingRequests, time.Since(s))

if e.requestBuffer.Len() > 0 {
if len(e.requestChan) > 0 {
// There are more requests, generating more timestamps
logger.Debug().Msg("found more requests in ring buffer, generating more timestamps")
logger.Debug().Msg("found more requests in request channel, generating more timestamps")
e.generateTimestamps()
}
}
@@ -153,10 +156,13 @@ var ErrNoDeadline = errors.New("missing deadline in context")

// GetUniqueTimestamp gets a unique hybrid timestamp to serve to a client
func (e *EpochHost) GetUniqueTimestamp(ctx context.Context) ([]byte, error) {
pr := &pendingRead{callbackRing: ring.NewRingBuffer[[]byte](1)}
pr := &pendingRead{callbackChan: make(chan []byte, 1)}

// Register entry in ring buffer
e.requestBuffer.Put(pr)
// Register request
err := utils.WriteWithContext(ctx, e.requestChan, pr)
if err != nil {
return nil, fmt.Errorf("error writing nil, pending request to request buffer: %w", err)
}

// Try to poke the reader goroutine
select {
@@ -166,15 +172,10 @@ func (e *EpochHost) GetUniqueTimestamp(ctx context.Context) ([]byte, error) {
logger.Debug().Msg("poke chan had nothing waiting")
}

deadline, ok := ctx.Deadline()
if !ok {
return nil, ErrNoDeadline
}

// Wait for the response...
timestamp, err := pr.callbackRing.Poll(deadline.Sub(time.Now()))
// Wait for the response
timestamp, err := utils.ReadWithContext(ctx, pr.callbackChan)
if err != nil {
return nil, fmt.Errorf("error in callbackRing.Get(): %w", err)
return nil, fmt.Errorf("error reading from callback channel with context: %w", err)
}

return timestamp, nil
3 changes: 1 addition & 2 deletions raft/raft.go
@@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"github.com/danthegoodman1/EpicEpoch/ring"
"github.com/danthegoodman1/EpicEpoch/utils"
"github.com/lni/dragonboat/v3"
"github.com/lni/dragonboat/v3/config"
@@ -57,7 +56,7 @@ func StartRaft() (*EpochHost, error) {
epochIndex: atomic.Uint64{},
lastEpoch: atomic.Uint64{},
readerAgentStopChan: make(chan struct{}),
requestBuffer: ring.NewRingBuffer[*pendingRead](utils.TimestampRequestBuffer),
requestChan: make(chan *pendingRead, utils.TimestampRequestBuffer),
readerAgentReading: atomic.Bool{},
pokeChan: make(chan struct{}),
updateTicker: time.NewTicker(time.Millisecond * time.Duration(utils.EpochIntervalMS)),
19 changes: 19 additions & 0 deletions utils/utils.go
@@ -323,3 +323,22 @@ func MustMarshal(v any) []byte {
}
return b
}

func ReadWithContext[T any](ctx context.Context, ch chan T) (res T, err error) {
select {
case val := <-ch:
return val, nil
case <-ctx.Done():
err = ctx.Err()
return
}
}

func WriteWithContext[T any](ctx context.Context, ch chan<- T, val T) error {
select {
case ch <- val:
return nil
case <-ctx.Done():
return ctx.Err()
}
}
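
A minimal usage sketch of these two helpers, assuming they are imported from the `utils` package shown above; the channel, value, and 50ms deadline here are arbitrary examples:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/danthegoodman1/EpicEpoch/utils"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	ch := make(chan string, 1)

	// The write succeeds immediately because the channel has spare capacity.
	if err := utils.WriteWithContext(ctx, ch, "hello"); err != nil {
		panic(err)
	}

	// The read returns the queued value; on an empty channel it would return
	// ctx.Err() once the deadline passed instead of blocking forever.
	val, err := utils.ReadWithContext(ctx, ch)
	fmt.Println(val, err) // hello <nil>
}
```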
