Skip to content

Commit

Permalink
epoch host wrapper
Browse files Browse the repository at this point in the history
http server knows raft readiness
  • Loading branch information
danthegoodman1 committed Jul 12, 2024
1 parent 500858a commit 940f1bb
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 54 deletions.
38 changes: 32 additions & 6 deletions http_server/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"github.com/danthegoodman1/EpicEpoch/raft"
"net"
"net/http"
"os"
Expand All @@ -21,21 +22,23 @@ import (
var logger = gologger.NewLogger()

type HTTPServer struct {
Echo *echo.Echo
Echo *echo.Echo
EpochHost *raft.EpochHost
}

// CustomValidator wraps a *validator.Validate instance. StartHTTPServer
// installs one as the Echo instance's Validator.
type CustomValidator struct {
// validator is the wrapped validation engine.
validator *validator.Validate
}

func StartHTTPServer() *HTTPServer {
func StartHTTPServer(epochHost *raft.EpochHost) *HTTPServer {
listener, err := net.Listen("tcp", fmt.Sprintf(":%s", utils.GetEnvOrDefault("HTTP_PORT", "8080")))
if err != nil {
logger.Error().Err(err).Msg("error creating tcp listener, exiting")
os.Exit(1)
}
s := &HTTPServer{
Echo: echo.New(),
Echo: echo.New(),
EpochHost: epochHost,
}
s.Echo.HideBanner = true
s.Echo.HidePort = true
Expand All @@ -46,8 +49,8 @@ func StartHTTPServer() *HTTPServer {
s.Echo.Use(middleware.CORS())
s.Echo.Validator = &CustomValidator{validator: validator.New()}

// technical - no auth
s.Echo.GET("/hc", s.HealthCheck)
s.Echo.GET("/up", s.UpCheck)
s.Echo.GET("/ready", s.ReadyCheck)

s.Echo.Listener = listener
go func() {
Expand Down Expand Up @@ -84,10 +87,33 @@ func ValidateRequest(c echo.Context, s interface{}) error {
return nil
}

func (*HTTPServer) HealthCheck(c echo.Context) error {
// UpCheck reports only that the HTTP server is running and able to respond;
// it does not imply that the node is able to serve requests (see ReadyCheck
// for that). It always responds with 200 and the plain-text body "ok".
func (s *HTTPServer) UpCheck(c echo.Context) error {
return c.String(http.StatusOK, "ok")
}

// ReadyCheck reports whether this node is ready to start serving requests.
//
// Readiness is defined as the EpochHost having raft leadership information
// available locally:
//   - if looking up the leader fails, the wrapped error is returned to echo;
//   - if leadership information is not yet available, a 500 response with the
//     body "raft leadership not ready" is written;
//   - otherwise a 200 response describing the leader, this node's ID, and the
//     availability flag is written.
func (s *HTTPServer) ReadyCheck(c echo.Context) error {
	reqLogger := zerolog.Ctx(c.Request().Context())

	// Raft leadership information must be known before we call ourselves ready.
	leaderID, raftAvailable, err := s.EpochHost.GetLeader()
	switch {
	case err != nil:
		return fmt.Errorf("error in NodeHost.GetLeaderID: %w", err)
	case !raftAvailable:
		return c.String(http.StatusInternalServerError, "raft leadership not ready")
	}

	// Purely informational: note when this node is the current leader.
	if leaderID == utils.NodeID {
		reqLogger.Debug().Msgf("Is leader (%d)", leaderID)
	}

	body := fmt.Sprintf("leader=%d nodeID=%d raftAvailable=%t\n", leaderID, utils.NodeID, raftAvailable)
	return c.String(http.StatusOK, body)
}

func (s *HTTPServer) Shutdown(ctx context.Context) error {
err := s.Echo.Shutdown(ctx)
return err
Expand Down
33 changes: 19 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
package main

import (
"context"
"github.com/danthegoodman1/EpicEpoch/gologger"
"github.com/danthegoodman1/EpicEpoch/http_server"
"github.com/danthegoodman1/EpicEpoch/raft"
"os"
"os/signal"
"syscall"
"time"
)

var logger = gologger.NewLogger()

func main() {
logger.Debug().Msg("starting epic epoch api")

// start raft
nodeHost, err := raft.StartRaft()
if err != nil {
logger.Error().Err(err).Msg("raft couldn't start")
os.Exit(1)
}

// prometheusReporter := observability.NewPrometheusReporter()
// go func() {
// err := observability.StartInternalHTTPServer(":8042", prometheusReporter)
Expand All @@ -22,25 +32,20 @@ func main() {
// }
// }()

// httpServer := http_server.StartHTTPServer()
nodeHost, err := raft.StartRaft()
if err != nil {
logger.Error().Err(err).Msg("raft couldn't start")
os.Exit(1)
}
httpServer := http_server.StartHTTPServer(nodeHost)

c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
<-c
logger.Warn().Msg("received shutdown signal!")

nodeHost.Stop()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
defer cancel()
if err := httpServer.Shutdown(ctx); err != nil {
logger.Error().Err(err).Msg("failed to shutdown HTTP server")
} else {
logger.Info().Msg("successfully shutdown HTTP server")
}

// ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
// defer cancel()
// if err := httpServer.Shutdown(ctx); err != nil {
// logger.Error().Err(err).Msg("failed to shutdown HTTP server")
// } else {
// logger.Info().Msg("successfully shutdown HTTP server")
// }
nodeHost.Stop()
}
31 changes: 31 additions & 0 deletions raft/epoch_host.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package raft

import (
"github.com/lni/dragonboat/v3"
"sync/atomic"
)

type (
// EpochHost wraps a dragonboat NodeHost together with the atomic counters
// described on the fields below.
EpochHost struct {
// nodeHost is the underlying dragonboat node host that GetLeader and
// Stop delegate to.
nodeHost *dragonboat.NodeHost

// The monotonic incrementing index of a single epoch.
// Each request must be served a unique (lastEpoch, epochIndex) value
epochIndex atomic.Uint64

// lastEpoch was the last epoch that we served to a request,
// used to check whether we need to swap the epoch index
lastEpoch atomic.Uint64
}
)

// GetLeader reports the leader node ID of the ClusterID raft cluster, based on
// the local node's knowledge. The boolean result indicates whether leader
// information is currently available. All three results are forwarded
// unchanged from the wrapped NodeHost's GetLeaderID.
func (e *EpochHost) GetLeader() (uint64, bool, error) {
	leaderID, available, err := e.nodeHost.GetLeaderID(ClusterID)
	return leaderID, available, err
}

// Stop stops the underlying dragonboat NodeHost.
func (e *EpochHost) Stop() {
e.nodeHost.Stop()
}
63 changes: 29 additions & 34 deletions raft/raft.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,25 @@
package raft

import (
"context"
"fmt"
"github.com/danthegoodman1/EpicEpoch/gologger"
"github.com/danthegoodman1/EpicEpoch/utils"
"github.com/lni/dragonboat/v3"
"github.com/lni/dragonboat/v3/config"
dragonlogger "github.com/lni/dragonboat/v3/logger"
"os"
"path/filepath"
"sync/atomic"
"time"
)

func StartRaft() (*dragonboat.NodeHost, error) {
nodeID := uint64(utils.GetEnvOrDefaultInt("NODE_ID", 0))
// ClusterID is the raft cluster ID this package passes to dragonboat, both in
// the raft config built by StartRaft and when querying the current leader.
const ClusterID = 100

func StartRaft() (*EpochHost, error) {
nodeID := utils.NodeID
rc := config.Config{
NodeID: nodeID,
ClusterID: 111,
ClusterID: ClusterID,
ElectionRTT: 10,
HeartbeatRTT: 1,
CheckQuorum: true,
Expand All @@ -27,7 +30,7 @@ func StartRaft() (*dragonboat.NodeHost, error) {
nhc := config.NodeHostConfig{
WALDir: datadir,
NodeHostDir: datadir,
RTTMillisecond: 10, // TODO: make this configurable, start lower (1ms)
RTTMillisecond: uint64(utils.GetEnvOrDefaultInt("RTT_MS", 10)),
RaftAddress: os.Getenv("RAFT_ADDR"),
}
dragonlogger.SetLoggerFactory(CreateLogger)
Expand All @@ -36,39 +39,31 @@ func StartRaft() (*dragonboat.NodeHost, error) {
panic(err)
}

err = nh.StartOnDiskCluster(map[uint64]dragonboat.Target{
1: "localhost:60001",
2: "localhost:60002",
3: "localhost:60003",
}, false, NewEpochStateMachine, rc)
if err != nil {
return nil, fmt.Errorf("error in StartOnDiskCluster: %w", err)
}

// Debug log loop
go func() {
t := time.NewTicker(time.Second * 3)
logger := gologger.NewLogger()
t := time.NewTicker(time.Second * 5)
for {
<-t.C
fmt.Println("LeaderID:")
fmt.Println(nh.GetLeaderID(111))
fmt.Println("")

leader, _, _ := nh.GetLeaderID(111)
if leader != nodeID && nodeID == 3 {
// Propose something
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
nh.SyncPropose(ctx, nh.GetNoOPSession(111), utils.MustMarshal(PersistenceEpoch{
Epoch: uint64(time.Now().UnixNano()),
}))
cancel()
}
if leader == nodeID {
// Let's read
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
i, err := nh.SyncRead(ctx, 111, nil)
if err != nil {
panic(err)
}
logger.Info().Msgf("GOT READ VALUE: %+v", i)
cancel()
}
leader, available, err := nh.GetLeaderID(ClusterID)
logger.Debug().Err(err).Msgf("Leader=%d available=%+v", leader, available)
}
}()

return nh, nh.StartOnDiskCluster(map[uint64]dragonboat.Target{
1: "localhost:60001",
2: "localhost:60002",
3: "localhost:60003",
}, false, NewEpochStateMachine, rc)
eh := &EpochHost{
nodeHost: nh,
epochIndex: atomic.Uint64{},
lastEpoch: atomic.Uint64{},
}

return eh, nil
}
2 changes: 2 additions & 0 deletions utils/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ var (
Env = os.Getenv("ENV")
Env_TracingServiceName = os.Getenv("TRACING_SERVICE_NAME")
Env_OLTPEndpoint = os.Getenv("OLTP_ENDPOINT")

NodeID = uint64(GetEnvOrDefaultInt("NODE_ID", 0))
)

0 comments on commit 940f1bb

Please sign in to comment.