Skip to content

Commit

Permalink
add ring buffer package
Browse files Browse the repository at this point in the history
  • Loading branch information
danthegoodman1 committed Jul 12, 2024
1 parent 940f1bb commit 396ed1d
Show file tree
Hide file tree
Showing 6 changed files with 648 additions and 3 deletions.
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/prometheus/client_golang v1.16.0
github.com/rs/zerolog v1.29.1
github.com/segmentio/ksuid v1.0.4
github.com/stretchr/testify v1.8.4
github.com/uber-go/tally/v4 v4.1.7
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0
Expand All @@ -36,6 +37,7 @@ require (
github.com/cockroachdb/pebble v0.0.0-20210331181633-27fc006b8bfb // indirect
github.com/cockroachdb/redact v1.0.6 // indirect
github.com/cockroachdb/sentry-go v0.6.1-cockroachdb.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.3.0 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-playground/locales v0.14.0 // indirect
Expand Down Expand Up @@ -69,6 +71,7 @@ require (
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/miekg/dns v1.1.26 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
Expand All @@ -92,4 +95,5 @@ require (
google.golang.org/genproto/googleapis/api v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230822172742-b8732ec3820d // indirect
google.golang.org/grpc v1.59.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
25 changes: 25 additions & 0 deletions raft/epoch_host.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package raft

import (
"context"
"github.com/lni/dragonboat/v3"
"sync/atomic"
)
Expand All @@ -16,9 +17,27 @@ type (
// lastEpoch was the last epoch that we served to a request,
// used to check whether we need to swap the epoch index
lastEpoch atomic.Uint64

readerAgentStopChan chan struct{}

// timestampRequestChan is a buffered channel for incoming requests
timestampRequestChan chan chan []byte
}
)

// readerAgentLoop should be launched in a goroutine
func (e *EpochHost) readerAgentLoop() {
for {
func() { // wrapper for defer
select {
case <-e.readerAgentStopChan:
logger.Warn().Msg("reader agent loop received on stop chan, stopping")
return
}
}()
}
}

// GetLeader returns the leader node ID of the specified Raft cluster based
// on local node's knowledge. The returned boolean value indicates whether the
// leader information is available.
Expand All @@ -29,3 +48,9 @@ func (e *EpochHost) GetLeader() (uint64, bool, error) {
func (e *EpochHost) Stop() {
e.nodeHost.Stop()
}

// GetUniqueTimestamp gets a unique hybrid timestamp to serve to a client
func (e *EpochHost) GetUniqueTimestamp(ctx context.Context) ([]byte, error) {
// TODO register entry in ringbuffer
// TODO if request not in flight
}
10 changes: 7 additions & 3 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,14 @@ func StartRaft() (*EpochHost, error) {
}()

eh := &EpochHost{
nodeHost: nh,
epochIndex: atomic.Uint64{},
lastEpoch: atomic.Uint64{},
nodeHost: nh,
epochIndex: atomic.Uint64{},
lastEpoch: atomic.Uint64{},
readerAgentStopChan: make(chan struct{}),
timestampRequestChan: make(chan chan []byte, utils.TimestampRequestBuffer),
}

go eh.readerAgentLoop()

return eh, nil
}
206 changes: 206 additions & 0 deletions ring/ring.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
// Generic modification of
// https://github.com/Workiva/go-datastructures/blob/master/queue/ring.go

package ring

import (
"errors"
"runtime"
"sync/atomic"
"time"
)

var (
// ErrDisposed is returned when an operation is performed on a disposed
// queue.
ErrDisposed = errors.New(`queue: disposed`)

// ErrTimeout is returned when an applicable queue operation times out.
ErrTimeout = errors.New(`queue: poll timed out`)

// ErrEmptyQueue is returned when an non-applicable queue operation was called
// due to the queue's empty item state
ErrEmptyQueue = errors.New(`queue: empty queue`)
)

// roundUp takes a uint64 greater than 0 and rounds it up to the next
// power of 2.
func roundUp(v uint64) uint64 {
v--
v |= v >> 1
v |= v >> 2
v |= v >> 4
v |= v >> 8
v |= v >> 16
v |= v >> 32
v++
return v
}

type node struct {
position uint64
data any
}

type nodes []node

// RingBuffer is a MPMC buffer that achieves threadsafety with CAS operations
// only. A put on full or get on empty call will block until an item
// is put or retrieved. Calling Dispose on the RingBuffer will unblock
// any blocked threads with an error. This buffer is similar to the buffer
// described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
// with some minor additions.
type RingBuffer[T any] struct {
_padding0 [8]uint64
queue uint64
_padding1 [8]uint64
dequeue uint64
_padding2 [8]uint64
mask, disposed uint64
_padding3 [8]uint64
nodes nodes
}

func (rb *RingBuffer[T]) init(size uint64) {
size = roundUp(size)
rb.nodes = make(nodes, size)
for i := uint64(0); i < size; i++ {
rb.nodes[i] = node{position: i}
}
rb.mask = size - 1 // so we don't have to do this with every put/get operation
}

// Put adds the provided item to the queue. If the queue is full, this
// call will block until an item is added to the queue or Dispose is called
// on the queue. An error will be returned if the queue is disposed.
func (rb *RingBuffer[T]) Put(item T) error {
_, err := rb.put(item, false)
return err
}

// Offer adds the provided item to the queue if there is space. If the queue
// is full, this call will return false. An error will be returned if the
// queue is disposed.
func (rb *RingBuffer[T]) Offer(item T) (bool, error) {
return rb.put(item, true)
}

func (rb *RingBuffer[T]) put(item T, offer bool) (bool, error) {
var n *node
pos := atomic.LoadUint64(&rb.queue)
L:
for {
if atomic.LoadUint64(&rb.disposed) == 1 {
return false, ErrDisposed
}

n = &rb.nodes[pos&rb.mask]
seq := atomic.LoadUint64(&n.position)
switch dif := seq - pos; {
case dif == 0:
if atomic.CompareAndSwapUint64(&rb.queue, pos, pos+1) {
break L
}
case dif < 0:
panic(`Ring buffer in a compromised state during a put operation.`)
default:
pos = atomic.LoadUint64(&rb.queue)
}

if offer {
return false, nil
}

runtime.Gosched() // free up the cpu before the next iteration
}

n.data = item
atomic.StoreUint64(&n.position, pos+1)
return true, nil
}

// Get will return the next item in the queue. This call will block
// if the queue is empty. This call will unblock when an item is added
// to the queue or Dispose is called on the queue. An error will be returned
// if the queue is disposed.
func (rb *RingBuffer[T]) Get() (T, error) {
return rb.Poll(0)
}

// Poll will return the next item in the queue. This call will block
// if the queue is empty. This call will unblock when an item is added
// to the queue, Dispose is called on the queue, or the timeout is reached. An
// error will be returned if the queue is disposed or a timeout occurs. A
// non-positive timeout will block indefinitely.
func (rb *RingBuffer[T]) Poll(timeout time.Duration) (t T, err error) {
var (
n *node
pos = atomic.LoadUint64(&rb.dequeue)
start time.Time
)
if timeout > 0 {
start = time.Now()
}
L:
for {
if atomic.LoadUint64(&rb.disposed) == 1 {
err = ErrDisposed
return
}

n = &rb.nodes[pos&rb.mask]
seq := atomic.LoadUint64(&n.position)
switch dif := seq - (pos + 1); {
case dif == 0:
if atomic.CompareAndSwapUint64(&rb.dequeue, pos, pos+1) {
break L
}
case dif < 0:
panic(`Ring buffer in compromised state during a get operation.`)
default:
pos = atomic.LoadUint64(&rb.dequeue)
}

if timeout > 0 && time.Since(start) >= timeout {
err = ErrTimeout
return
}

runtime.Gosched() // free up the cpu before the next iteration
}
data := n.data
n.data = nil
atomic.StoreUint64(&n.position, pos+rb.mask+1)
return data.(T), nil
}

// Len returns the number of items in the queue.
func (rb *RingBuffer[T]) Len() uint64 {
return atomic.LoadUint64(&rb.queue) - atomic.LoadUint64(&rb.dequeue)
}

// Cap returns the capacity of this ring buffer.
func (rb *RingBuffer[T]) Cap() uint64 {
return uint64(len(rb.nodes))
}

// Dispose will dispose of this queue and free any blocked threads
// in the Put and/or Get methods. Calling those methods on a disposed
// queue will return an error.
func (rb *RingBuffer[T]) Dispose() {
atomic.CompareAndSwapUint64(&rb.disposed, 0, 1)
}

// IsDisposed will return a bool indicating if this queue has been
// disposed.
func (rb *RingBuffer[T]) IsDisposed() bool {
return atomic.LoadUint64(&rb.disposed) == 1
}

// NewRingBuffer will allocate, initialize, and return a ring buffer
// with the specified size.
func NewRingBuffer[T any](size uint64) *RingBuffer[T] {
rb := &RingBuffer[T]{}
rb.init(size)
return rb
}
Loading

0 comments on commit 396ed1d

Please sign in to comment.