Skip to content

Commit fd25f1d

Browse files
authored
services/horizon/cmd: Run metrics server when invoking horizon ingest commands (#5816)
1 parent ff542a0 commit fd25f1d

File tree

3 files changed

+89
-65
lines changed

3 files changed

+89
-65
lines changed

services/horizon/cmd/db.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -211,21 +211,23 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool,
211211
}
212212
defer system.Shutdown()
213213

214-
err = system.ReingestRange(ledgerRanges, reingestForce, true)
215-
if err != nil {
216-
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
217-
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
214+
return runWithMetrics(config.AdminPort, system, func() error {
215+
err = system.ReingestRange(ledgerRanges, reingestForce, true)
216+
if err != nil {
217+
if _, ok := errors.Cause(err).(ingest.ErrReingestRangeConflict); ok {
218+
return fmt.Errorf(`The range you have provided overlaps with Horizon's most recently ingested ledger.
218219
It is not possible to run the reingest command on this range in parallel with
219220
Horizon's ingestion system.
220221
Either reduce the range so that it doesn't overlap with Horizon's ingestion system,
221222
or, use the force flag to ensure that Horizon's ingestion system is blocked until
222223
the reingest command completes.`)
223-
}
224+
}
224225

225-
return err
226-
}
227-
hlog.Info("Range run successfully!")
228-
return nil
226+
return err
227+
}
228+
hlog.Info("Range run successfully!")
229+
return nil
230+
})
229231
}
230232

231233
func runDBDetectGaps(config horizon.Config) ([]history.LedgerRange, error) {

services/horizon/cmd/ingest.go

Lines changed: 71 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,16 @@ import (
88
_ "net/http/pprof"
99
"time"
1010

11+
"github.com/go-chi/chi"
12+
chimiddleware "github.com/go-chi/chi/middleware"
13+
"github.com/prometheus/client_golang/prometheus"
1114
"github.com/spf13/cobra"
1215
"github.com/spf13/viper"
1316

1417
"github.com/stellar/go/historyarchive"
1518
horizon "github.com/stellar/go/services/horizon/internal"
1619
"github.com/stellar/go/services/horizon/internal/db2/history"
20+
"github.com/stellar/go/services/horizon/internal/httpx"
1721
"github.com/stellar/go/services/horizon/internal/ingest"
1822
"github.com/stellar/go/support/config"
1923
support "github.com/stellar/go/support/config"
@@ -23,7 +27,7 @@ import (
2327

2428
var ingestBuildStateSequence uint32
2529
var ingestBuildStateSkipChecks bool
26-
var ingestVerifyFrom, ingestVerifyTo, ingestVerifyDebugServerPort uint32
30+
var ingestVerifyFrom, ingestVerifyTo uint32
2731
var ingestVerifyState bool
2832
var ingestVerifyStorageBackendConfigPath string
2933
var ingestVerifyLedgerBackendType ingest.LedgerBackendType
@@ -73,14 +77,6 @@ var ingestVerifyRangeCmdOpts = support.ConfigOptions{
7377
FlagDefault: false,
7478
Usage: "[optional] verifies state at the last ledger of the range when true",
7579
},
76-
{
77-
Name: "debug-server-port",
78-
ConfigKey: &ingestVerifyDebugServerPort,
79-
OptType: types.Uint32,
80-
Required: false,
81-
FlagDefault: uint32(0),
82-
Usage: "[optional] opens a net/http/pprof server at given port",
83-
},
8480
generateLedgerBackendOpt(&ingestVerifyLedgerBackendType),
8581
generateDatastoreConfigOpt(&ingestVerifyStorageBackendConfigPath),
8682
}
@@ -159,19 +155,6 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
159155
return err
160156
}
161157

162-
if ingestVerifyDebugServerPort != 0 {
163-
go func() {
164-
log.Infof("Starting debug server at: %d", ingestVerifyDebugServerPort)
165-
err := http.ListenAndServe(
166-
fmt.Sprintf("localhost:%d", ingestVerifyDebugServerPort),
167-
nil,
168-
)
169-
if err != nil {
170-
log.Error(err)
171-
}
172-
}()
173-
}
174-
175158
mngr := historyarchive.NewCheckpointManager(horizonConfig.CheckpointFrequency)
176159
if !mngr.IsCheckpoint(ingestVerifyFrom) && ingestVerifyFrom != 1 {
177160
return fmt.Errorf("`--from` must be a checkpoint ledger")
@@ -249,17 +232,12 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
249232
if err != nil {
250233
return err
251234
}
252-
253-
err = system.StressTest(
254-
stressTestNumTransactions,
255-
stressTestChangesPerTransaction,
256-
)
257-
if err != nil {
258-
return err
259-
}
260-
261-
log.Info("Stress test completed successfully!")
262-
return nil
235+
return runWithMetrics(horizonConfig.AdminPort, system, func() error {
236+
return system.StressTest(
237+
stressTestNumTransactions,
238+
stressTestChangesPerTransaction,
239+
)
240+
})
263241
},
264242
}
265243

@@ -365,16 +343,12 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
365343
return err
366344
}
367345

368-
err = system.BuildState(
369-
ingestBuildStateSequence,
370-
ingestBuildStateSkipChecks,
371-
)
372-
if err != nil {
373-
return err
374-
}
375-
376-
log.Info("State built successfully!")
377-
return nil
346+
return runWithMetrics(horizonConfig.AdminPort, system, func() error {
347+
return system.BuildState(
348+
ingestBuildStateSequence,
349+
ingestBuildStateSkipChecks,
350+
)
351+
})
378352
},
379353
}
380354

@@ -444,11 +418,13 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
444418
return err
445419
}
446420

447-
return system.LoadTest(
448-
ingestionLoadTestLedgersPath,
449-
ingestionLoadTestCloseDuration,
450-
ingestionLoadTestFixturesPath,
451-
)
421+
return runWithMetrics(horizonConfig.AdminPort, system, func() error {
422+
return system.LoadTest(
423+
ingestionLoadTestLedgersPath,
424+
ingestionLoadTestCloseDuration,
425+
ingestionLoadTestFixturesPath,
426+
)
427+
})
452428
},
453429
}
454430

@@ -496,6 +472,46 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
496472
)
497473
}
498474

475+
func runWithMetrics(metricsPort uint, system ingest.System, f func() error) error {
476+
if metricsPort != 0 {
477+
log.Infof("Starting metrics server at: %d", metricsPort)
478+
mux := chi.NewMux()
479+
mux.Use(chimiddleware.StripSlashes)
480+
mux.Use(chimiddleware.RequestID)
481+
mux.Use(chimiddleware.RequestLogger(&chimiddleware.DefaultLogFormatter{
482+
Logger: log.DefaultLogger,
483+
NoColor: true,
484+
}))
485+
registry := prometheus.NewRegistry()
486+
system.RegisterMetrics(registry)
487+
httpx.AddMetricRoutes(mux, registry)
488+
metricsServer := &http.Server{
489+
Addr: fmt.Sprintf(":%d", metricsPort),
490+
Handler: mux,
491+
ReadTimeout: 5 * time.Second,
492+
}
493+
go func() {
494+
if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
495+
log.Fatalf("error running metrics server: %v", err)
496+
}
497+
}()
498+
defer func() {
499+
log.Info("Waiting for metrics to be flushed")
500+
// by default, the scrape_interval for prometheus is 1 minute
501+
// so if we sleep for 1.5 minutes we ensure that all remaining metrics
502+
// will be picked up by the prometheus scraper
503+
time.Sleep(time.Minute + time.Second*30)
504+
log.Info("Shutting down metrics server...")
505+
if err := metricsServer.Shutdown(context.Background()); err != nil {
506+
log.Warnf("error shutting down metrics server: %v", err)
507+
}
508+
}()
509+
} else {
510+
log.Info("Metrics server disabled")
511+
}
512+
return f()
513+
}
514+
499515
func init() {
500516
DefineIngestCommands(RootCmd, globalConfig, globalFlags)
501517
}
@@ -525,11 +541,13 @@ func processVerifyRange(horizonConfig *horizon.Config, horizonFlags config.Confi
525541
return err
526542
}
527543

528-
return system.VerifyRange(
529-
ingestVerifyFrom,
530-
ingestVerifyTo,
531-
ingestVerifyState,
532-
)
544+
return runWithMetrics(horizonConfig.AdminPort, system, func() error {
545+
return system.VerifyRange(
546+
ingestVerifyFrom,
547+
ingestVerifyTo,
548+
ingestVerifyState,
549+
)
550+
})
533551
}
534552

535553
// generateDatastoreConfigOpt returns a *support.ConfigOption for the datastore-config flag

services/horizon/internal/httpx/router.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -394,9 +394,7 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate
394394
w.Header().Set("Content-Type", "application/openapi+yaml")
395395
w.Write(p)
396396
})
397-
r.Internal.Get("/metrics", promhttp.HandlerFor(config.PrometheusRegistry, promhttp.HandlerOpts{}).ServeHTTP)
398-
r.Internal.Get("/debug/pprof/heap", pprof.Index)
399-
r.Internal.Get("/debug/pprof/profile", pprof.Profile)
397+
AddMetricRoutes(r.Internal, config.PrometheusRegistry)
400398
r.Internal.Route("/ingestion/filters", func(r chi.Router) {
401399
handler := actions.FilterConfigHandler{}
402400
r.With(historyMiddleware).Put("/asset", handler.UpdateAssetConfig)
@@ -405,3 +403,9 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate
405403
r.With(historyMiddleware).Get("/account", handler.GetAccountConfig)
406404
})
407405
}
406+
407+
func AddMetricRoutes(mux *chi.Mux, metrics *prometheus.Registry) {
408+
mux.Get("/metrics", promhttp.HandlerFor(metrics, promhttp.HandlerOpts{}).ServeHTTP)
409+
mux.Get("/debug/pprof/heap", pprof.Index)
410+
mux.Get("/debug/pprof/profile", pprof.Profile)
411+
}

0 commit comments

Comments
 (0)