Skip to content

Commit

Permalink
op-service: Stop(ctx) calls, shutdown testing, op-node rpc server update
Browse files Browse the repository at this point in the history
  • Loading branch information
protolambda committed Oct 9, 2023
1 parent 246d7db commit 7e4821e
Show file tree
Hide file tree
Showing 16 changed files with 246 additions and 85 deletions.
6 changes: 5 additions & 1 deletion endpoint-monitor/endpoint_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ func Main(version string) func(cliCtx *cli.Context) error {
l.Error("error starting metrics server", err)
return err
}
defer srv.Close()
defer func() {
if err := srv.Stop(cliCtx.Context); err != nil {
l.Error("failed to stop metrics server", "err", err)
}
}()
opio.BlockOnInterrupts()

return nil
Expand Down
40 changes: 22 additions & 18 deletions indexer/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@ package api

import (
"context"
"errors"
"fmt"
"net"
"net/http"
"runtime/debug"
"strconv"
"sync"

"github.com/ethereum-optimism/optimism/indexer/api/routes"
"github.com/ethereum-optimism/optimism/indexer/config"
"github.com/ethereum-optimism/optimism/indexer/database"
"github.com/ethereum-optimism/optimism/op-service/httputil"
"github.com/ethereum-optimism/optimism/op-service/metrics"
"github.com/ethereum/go-ethereum/log"
"github.com/go-chi/chi/v5"
Expand Down Expand Up @@ -120,41 +123,42 @@ func (a *API) Port() int {

// startServer ... Starts the API server
func (a *API) startServer(ctx context.Context) error {
a.log.Info("api server listening...", "port", a.serverConfig.Port)
server := http.Server{Addr: fmt.Sprintf(":%d", a.serverConfig.Port), Handler: a.router}
a.log.Debug("api server listening...", "port", a.serverConfig.Port)
addr := net.JoinHostPort(a.serverConfig.Host, strconv.Itoa(a.serverConfig.Port))
srv, err := httputil.StartHTTPServer(addr, a.router)
if err != nil {
return fmt.Errorf("failed to start API server: %w", err)
}

addr := fmt.Sprintf(":%d", a.serverConfig.Port)
listener, err := net.Listen("tcp", addr)
host, portStr, err := net.SplitHostPort(srv.Addr().String())
if err != nil {
a.log.Error("Listen:", err)
return err
return errors.Join(err, srv.Close())
}
tcpAddr, ok := listener.Addr().(*net.TCPAddr)
if !ok {
return fmt.Errorf("failed to get TCP address from network listener")
port, err := strconv.Atoi(portStr)
if err != nil {
return errors.Join(err, srv.Close())
}

// Update the port in the config in case the OS chose a different port
// than the one we requested (e.g. using port 0 to fetch a random open port)
a.serverConfig.Port = tcpAddr.Port
a.serverConfig.Host = host
a.serverConfig.Port = port

err = http.Serve(listener, server.Handler)
if err != nil {
a.log.Error("api server stopped with error", "err", err)
} else {
a.log.Info("api server stopped")
<-ctx.Done()
if err := srv.Stop(context.Background()); err != nil {
return fmt.Errorf("failed to shutdown api server: %w", err)
}
return err
return nil
}

// startMetricsServer ... Starts the metrics server
func (a *API) startMetricsServer(ctx context.Context) error {
a.log.Info("starting metrics server...", "port", a.metricsConfig.Port)
a.log.Debug("starting metrics server...", "port", a.metricsConfig.Port)
srv, err := metrics.StartServer(a.metricsRegistry, a.metricsConfig.Host, a.metricsConfig.Port)
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
<-ctx.Done()
defer a.log.Info("metrics server stopped")
return srv.Close()
return srv.Stop(context.Background())
}
4 changes: 2 additions & 2 deletions indexer/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (i *Indexer) startHttpServer(ctx context.Context) error {
i.log.Info("http server started", "addr", srv.Addr())
<-ctx.Done()
defer i.log.Info("http server stopped")
return srv.Close()
return srv.Stop(context.Background())
}

func (i *Indexer) startMetricsServer(ctx context.Context) error {
Expand All @@ -131,7 +131,7 @@ func (i *Indexer) startMetricsServer(ctx context.Context) error {
i.log.Info("metrics server started", "addr", srv.Addr())
<-ctx.Done()
defer i.log.Info("metrics server stopped")
return srv.Close()
return srv.Stop(context.Background())
}

// Start starts the indexing service on L1 and L2 chains
Expand Down
12 changes: 10 additions & 2 deletions op-batcher/batcher/batch_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,11 @@ func Main(version string, cliCtx *cli.Context) error {
return err
}
l.Info("started pprof server", "addr", pprofSrv.Addr())
defer pprofSrv.Close()
defer func() {
if err := pprofSrv.Stop(context.Background()); err != nil {
l.Error("failed to stop pprof server", "err", err)
}
}()
}

metricsCfg := cfg.MetricsConfig
Expand All @@ -72,7 +76,11 @@ func Main(version string, cliCtx *cli.Context) error {
return fmt.Errorf("failed to start metrics server: %w", err)
}
l.Info("started metrics server", "addr", metricsSrv.Addr())
defer metricsSrv.Close()
defer func() {
if err := metricsSrv.Stop(context.Background()); err != nil {
l.Error("failed to stop pprof server", "err", err)
}
}()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
m.StartBalanceMetrics(ctx, l, batchSubmitter.L1Client, batchSubmitter.TxManager.From())
Expand Down
3 changes: 1 addition & 2 deletions op-batcher/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package metrics
import (
"context"

"github.com/ethereum-optimism/optimism/op-service/httputil"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
Expand All @@ -13,6 +11,7 @@ import (

"github.com/ethereum-optimism/optimism/op-node/rollup/derive"
"github.com/ethereum-optimism/optimism/op-service/eth"
"github.com/ethereum-optimism/optimism/op-service/httputil"
opmetrics "github.com/ethereum-optimism/optimism/op-service/metrics"
txmetrics "github.com/ethereum-optimism/optimism/op-service/txmgr/metrics"
)
Expand Down
6 changes: 5 additions & 1 deletion op-bootnode/bootnode/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@ func Main(cliCtx *cli.Context) error {
if err != nil {
return fmt.Errorf("failed to start metrics server: %w", err)
}
defer metricsSrv.Close()
defer func() {
if err := metricsSrv.Stop(context.Background()); err != nil {
log.Error("failed to stop metrics server", "err", err)
}
}()
log.Info("started metrics server", "addr", metricsSrv.Addr())
m.RecordUp()
}
Expand Down
25 changes: 15 additions & 10 deletions op-challenger/game/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ type Service struct {
metricsSrv *httputil.HTTPServer
}

func (s *Service) Close() error {
func (s *Service) Stop(ctx context.Context) error {
var result error
if s.sched != nil {
result = errors.Join(result, s.sched.Close())
}
if s.pprofSrv != nil {
result = errors.Join(result, s.pprofSrv.Close())
result = errors.Join(result, s.pprofSrv.Stop(ctx))
}
if s.metricsSrv != nil {
result = errors.Join(result, s.metricsSrv.Close())
result = errors.Join(result, s.metricsSrv.Stop(ctx))
}
return result
}
Expand Down Expand Up @@ -66,7 +69,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
logger.Debug("starting pprof", "addr", pprofConfig.ListenAddr, "port", pprofConfig.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofConfig.ListenAddr, pprofConfig.ListenPort)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start pprof server: %w", err), s.Close())
return nil, errors.Join(fmt.Errorf("failed to start pprof server: %w", err), s.Stop(ctx))
}
s.pprofSrv = pprofSrv
logger.Info("started pprof server", "addr", pprofSrv.Addr())
Expand All @@ -77,7 +80,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
logger.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := m.Start(metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start metrics server: %w", err), s.Close())
return nil, errors.Join(fmt.Errorf("failed to start metrics server: %w", err), s.Stop(ctx))
}
logger.Info("started metrics server", "addr", metricsSrv.Addr())
s.metricsSrv = metricsSrv
Expand All @@ -86,7 +89,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se

factory, err := bindings.NewDisputeGameFactory(cfg.GameFactoryAddress, l1Client)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to bind the fault dispute game factory contract: %w", err), s.Close())
return nil, errors.Join(fmt.Errorf("failed to bind the fault dispute game factory contract: %w", err), s.Stop(ctx))
}
loader := NewGameLoader(factory)

Expand All @@ -102,7 +105,7 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se

pollClient, err := opClient.NewRPCWithClient(ctx, logger, cfg.L1EthRpc, opClient.NewBaseRPCClient(l1Client.Client()), cfg.PollInterval)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to create RPC client: %w", err), s.Close())
return nil, errors.Join(fmt.Errorf("failed to create RPC client: %w", err), s.Stop(ctx))
}
s.monitor = newGameMonitor(logger, cl, loader, s.sched, cfg.GameWindow, l1Client.BlockNumber, cfg.GameAllowlist, pollClient)

Expand All @@ -115,7 +118,9 @@ func NewService(ctx context.Context, logger log.Logger, cfg *config.Config) (*Se
// MonitorGame monitors the fault dispute game and attempts to progress it.
func (s *Service) MonitorGame(ctx context.Context) error {
s.sched.Start(ctx)
defer s.sched.Close()
defer s.Close()
return s.monitor.MonitorGames(ctx)
err := s.monitor.MonitorGames(ctx)
// The other ctx is the close-trigger.
// We need to refactor Service more to allow for graceful/force-shutdown granularity.
err = errors.Join(err, s.Stop(context.Background()))
return err
}
21 changes: 11 additions & 10 deletions op-heartbeat/service.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package op_heartbeat

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -41,7 +42,7 @@ func Main(version string) func(ctx *cli.Context) error {
oplog.SetGlobalLogHandler(l.GetHandler())
l.Info("starting heartbeat monitor", "version", version)

srv, err := Start(l, cfg, version)
srv, err := Start(cliCtx.Context, l, cfg, version)
if err != nil {
l.Crit("error starting application", "err", err)
}
Expand All @@ -54,29 +55,29 @@ func Main(version string) func(ctx *cli.Context) error {
syscall.SIGQUIT,
}...)
<-doneCh
return srv.Close()
return srv.Stop(context.Background())
}
}

type HeartbeatService struct {
pprof, metrics, http *httputil.HTTPServer
}

func (hs *HeartbeatService) Close() error {
func (hs *HeartbeatService) Stop(ctx context.Context) error {
var result error
if hs.pprof != nil {
result = errors.Join(result, hs.pprof.Close())
result = errors.Join(result, hs.pprof.Stop(ctx))
}
if hs.metrics != nil {
result = errors.Join(result, hs.metrics.Close())
result = errors.Join(result, hs.metrics.Stop(ctx))
}
if hs.http != nil {
result = errors.Join(result, hs.http.Close())
result = errors.Join(result, hs.http.Stop(ctx))
}
return result
}

func Start(l log.Logger, cfg Config, version string) (*HeartbeatService, error) {
func Start(ctx context.Context, l log.Logger, cfg Config, version string) (*HeartbeatService, error) {
hs := &HeartbeatService{}

registry := opmetrics.NewRegistry()
Expand All @@ -85,7 +86,7 @@ func Start(l log.Logger, cfg Config, version string) (*HeartbeatService, error)
l.Debug("starting metrics server", "addr", metricsCfg.ListenAddr, "port", metricsCfg.ListenPort)
metricsSrv, err := opmetrics.StartServer(registry, metricsCfg.ListenAddr, metricsCfg.ListenPort)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start metrics server: %w", err), hs.Close())
return nil, errors.Join(fmt.Errorf("failed to start metrics server: %w", err), hs.Stop(ctx))
}
hs.metrics = metricsSrv
l.Info("started metrics server", "addr", metricsSrv.Addr())
Expand All @@ -96,7 +97,7 @@ func Start(l log.Logger, cfg Config, version string) (*HeartbeatService, error)
l.Debug("starting pprof", "addr", pprofCfg.ListenAddr, "port", pprofCfg.ListenPort)
pprofSrv, err := oppprof.StartServer(pprofCfg.ListenAddr, pprofCfg.ListenPort)
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start pprof server: %w", err), hs.Close())
return nil, errors.Join(fmt.Errorf("failed to start pprof server: %w", err), hs.Stop(ctx))
}
l.Info("started pprof server", "addr", pprofSrv.Addr())
hs.pprof = pprofSrv
Expand All @@ -121,7 +122,7 @@ func Start(l log.Logger, cfg Config, version string) (*HeartbeatService, error)
}),
httputil.WithMaxHeaderBytes(HTTPMaxHeaderSize))
if err != nil {
return nil, errors.Join(fmt.Errorf("failed to start HTTP server: %w", err), hs.Close())
return nil, errors.Join(fmt.Errorf("failed to start HTTP server: %w", err), hs.Stop(ctx))
}
hs.http = srv

Expand Down
4 changes: 2 additions & 2 deletions op-heartbeat/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,13 @@ func TestService(t *testing.T) {
}

ctx, cancel := context.WithCancel(context.Background())
srv, err := Start(log.New(), cfg, "foobar")
srv, err := Start(ctx, log.New(), cfg, "foobar")
// Make sure that the service properly starts
require.NoError(t, err)

defer cancel()
defer func() {
require.NoError(t, srv.Close(), "close heartbeat server")
require.NoError(t, srv.Stop(ctx), "close heartbeat server")
}()

tests := []struct {
Expand Down
17 changes: 9 additions & 8 deletions op-node/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,9 @@ func (n *OpNode) Stop(ctx context.Context) error {
var result *multierror.Error

if n.server != nil {
n.server.Stop()
if err := n.server.Stop(ctx); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close RPC server: %w", err))
}
}
if n.p2pNode != nil {
if err := n.p2pNode.Close(); err != nil {
Expand Down Expand Up @@ -623,12 +625,12 @@ func (n *OpNode) Stop(ctx context.Context) error {

// Close metrics and pprof only after we are done idling
if n.pprofSrv != nil {
if err := n.pprofSrv.Close(); err != nil {
if err := n.pprofSrv.Stop(ctx); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close pprof server: %w", err))
}
}
if n.metricsSrv != nil {
if err := n.metricsSrv.Close(); err != nil {
if err := n.metricsSrv.Stop(ctx); err != nil {
result = multierror.Append(result, fmt.Errorf("failed to close metrics server: %w", err))
}
}
Expand All @@ -640,10 +642,9 @@ func (n *OpNode) Stopped() bool {
return n.closed.Load()
}

func (n *OpNode) ListenAddr() string {
return n.server.listenAddr.String()
}

func (n *OpNode) HTTPEndpoint() string {
return fmt.Sprintf("http://%s", n.ListenAddr())
if n.server == nil {
return ""
}
return fmt.Sprintf("http://%s", n.server.Addr().String())
}
Loading

0 comments on commit 7e4821e

Please sign in to comment.