Skip to content

Commit

Permalink
support requesting multiple timestamps
Browse files Browse the repository at this point in the history
  • Loading branch information
danthegoodman1 committed Jul 13, 2024
1 parent aeb8e76 commit 1ac22ec
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 19 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,9 @@ This also ensures that the request and response are each a single TCP frame.

`/timestamp` can be used to fetch a unique monotonic 16 byte hybrid timestamp value (this is the one you want to use).

This will automatically redirect (308) if the server requested to is not the leader. Clients should not lean on this at scale, as this causes a Raft read to get the address of the leader. Clients should use client-aware routing to update their local address cache if they encounter this (see [CLIENT_DESIGN.md](CLIENT_DESIGN.md)).
This will return a 409 if the node is not the leader. Clients should use client-aware routing to update their local address cache if they encounter this (see [CLIENT_DESIGN.md](CLIENT_DESIGN.md)).

Can use the query param `n` to specify a number >= 1, which will return multiple timestamps that are guaranteed to share the same epoch and have a sequential epoch index. These timestamps are appended to each other, so `n=2` will return a 32 byte body.


`/members` returns a JSON in the shape of:
Expand Down
22 changes: 13 additions & 9 deletions http_server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"net"
"net/http"
"os"
"strconv"
"time"

"github.com/danthegoodman1/EpicEpoch/gologger"
Expand Down Expand Up @@ -90,7 +91,7 @@ func StartHTTPServer(epochHost *raft.EpochHost) *HTTPServer {

// Create HTTP/3 server
s.quicServer = &http3.Server{
Addr: ":443",
Addr: listener.Addr().String(),
Handler: s.Echo,
TLSConfig: tlsConfig,
}
Expand Down Expand Up @@ -170,21 +171,24 @@ func (s *HTTPServer) GetTimestamp(c echo.Context) error {
}

if leader != utils.NodeID {
membership, err := s.EpochHost.GetMembership(ctx)
if err != nil {
return fmt.Errorf("error in EpochHost.GetMembership: %w", err)
}
return c.String(http.StatusConflict, fmt.Sprintf("node (%d) is not the leader (%d)", utils.NodeID, leader))
}

return c.Redirect(http.StatusPermanentRedirect, membership.Leader.Addr)
// Get one or more timestamps
count := 1
if n := c.QueryParam("n"); n != "" {
count, err = strconv.Atoi(n)
if err != nil || count < 1 {
return c.String(http.StatusBadRequest, fmt.Sprintf("invalid n param, must be a number >= 1 if provided"))
}
}

// Get a timestamp
timestamp, err := s.EpochHost.GetUniqueTimestamp(ctx)
payload, err := s.EpochHost.GetUniqueTimestamp(ctx, count)
if err != nil {
return fmt.Errorf("error in EpochHost.GetUniqueTimestamp: %w", err)
}

return c.Blob(http.StatusOK, "application/octet-stream", timestamp)
return c.Blob(http.StatusOK, "application/octet-stream", payload)
}

func (s *HTTPServer) GetMembership(c echo.Context) error {
Expand Down
24 changes: 15 additions & 9 deletions raft/epoch_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package raft
import (
"context"
"encoding/binary"
"errors"
"fmt"
"github.com/danthegoodman1/EpicEpoch/utils"
"github.com/lni/dragonboat/v3"
Expand Down Expand Up @@ -38,6 +37,7 @@ type (
pendingRead struct {
// callbackChan is a channel to write back to with the produced timestamp
callbackChan chan []byte
count int
}
)

Expand Down Expand Up @@ -118,11 +118,16 @@ func (e *EpochHost) generateTimestamps() {
// Write to the pending requests
req := <-e.requestChan

reqIndex := e.epochIndex.Add(1)
// Build the timestamp
timestamp := make([]byte, 16) // 8 for epoch, 8 for index
binary.BigEndian.PutUint64(timestamp[:8], currentEpoch.Epoch)
binary.BigEndian.PutUint64(timestamp[8:], reqIndex)
timestamp := make([]byte, 16*req.count) // 8 for epoch, 8 for index, multiply for each
logger.Debug().Msgf("writing for %d", req.count)
for i := 0; i < req.count; i++ {
reqIndex := e.epochIndex.Add(1)
offset := i * 16
fmt.Println("writing to bounds", offset, offset+16)
binary.BigEndian.PutUint64(timestamp[offset:offset+8], currentEpoch.Epoch)
binary.BigEndian.PutUint64(timestamp[offset+8:offset+16], reqIndex)
}

select {
case req.callbackChan <- timestamp:
Expand Down Expand Up @@ -152,11 +157,12 @@ func (e *EpochHost) Stop() {
e.nodeHost.Stop()
}

var ErrNoDeadline = errors.New("missing deadline in context")

// GetUniqueTimestamp gets a unique hybrid timestamp to serve to a client
func (e *EpochHost) GetUniqueTimestamp(ctx context.Context) ([]byte, error) {
pr := &pendingRead{callbackChan: make(chan []byte, 1)}
func (e *EpochHost) GetUniqueTimestamp(ctx context.Context, count int) ([]byte, error) {
if count < 1 {
return nil, fmt.Errorf("count must be >= 1")
}
pr := &pendingRead{callbackChan: make(chan []byte, 1), count: count}

// Register request
err := utils.WriteWithContext(ctx, e.requestChan, pr)
Expand Down

0 comments on commit 1ac22ec

Please sign in to comment.