Skip to content

Commit ac24b94

Browse files
committed
refactor: summary flush handling
1 parent a8d5276 commit ac24b94

File tree

10 files changed

+233
-123
lines changed

10 files changed

+233
-123
lines changed

cmd/nebula/cmd_crawl.go

+41-106
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ import (
1313

1414
kaddht "github.com/libp2p/go-libp2p-kad-dht"
1515
"github.com/libp2p/go-libp2p/core/network"
16-
"github.com/libp2p/go-libp2p/core/peer"
1716
log "github.com/sirupsen/logrus"
1817
"github.com/urfave/cli/v2"
1918

@@ -273,6 +272,11 @@ func CrawlAction(c *cli.Context) error {
273272
MeterProvider: cfg.Root.MeterProvider,
274273
}
275274

275+
var (
276+
summary *core.Summary
277+
runErr error
278+
)
279+
276280
switch cfg.Network {
277281
case string(config.NetworkEthExec):
278282

@@ -328,18 +332,7 @@ func CrawlAction(c *cli.Context) error {
328332
}
329333

330334
// finally, start the crawl
331-
queuedPeers, runErr := eng.Run(ctx)
332-
333-
// a bit ugly but, but the handler will contain crawl statistics, that
334-
// we'll save to the database and print to the screen
335-
handler.QueuedPeers = len(queuedPeers)
336-
if err := persistCrawlInformation(dbc, handler, runErr); err != nil {
337-
return fmt.Errorf("persist crawl information: %w", err)
338-
}
339-
340-
logSummary(handler, time.Since(start))
341-
342-
return nil
335+
summary, runErr = eng.Run(ctx)
343336

344337
case string(config.NetworkBitcoin):
345338
bpEnodes, err := cfg.BootstrapBitcoinEntries()
@@ -377,16 +370,7 @@ func CrawlAction(c *cli.Context) error {
377370
}
378371

379372
// finally, start the crawl
380-
queuedPeers, runErr := eng.Run(ctx)
381-
382-
// a bit ugly but, but the handler will contain crawl statistics, that
383-
// we'll save to the database and print to the screen
384-
handler.QueuedPeers = len(queuedPeers)
385-
if err := persistCrawlInformation(dbc, handler, runErr); err != nil {
386-
return fmt.Errorf("persist crawl information: %w", err)
387-
}
388-
389-
return nil
373+
summary, runErr = eng.Run(ctx)
390374

391375
case string(config.NetworkEthCons),
392376
string(config.NetworkHolesky),
@@ -458,18 +442,8 @@ func CrawlAction(c *cli.Context) error {
458442
}
459443

460444
// finally, start the crawl
461-
queuedPeers, runErr := eng.Run(ctx)
445+
summary, runErr = eng.Run(ctx)
462446

463-
// a bit ugly but, but the handler will contain crawl statistics, that
464-
// we'll save to the database and print to the screen
465-
handler.QueuedPeers = len(queuedPeers)
466-
if err := persistCrawlInformation(dbc, handler, runErr); err != nil {
467-
return fmt.Errorf("persist crawl information: %w", err)
468-
}
469-
470-
logSummary(handler, time.Since(start))
471-
472-
return nil
473447
default:
474448

475449
addrInfos, err := cfg.BootstrapAddrInfos()
@@ -511,22 +485,20 @@ func CrawlAction(c *cli.Context) error {
511485
}
512486

513487
// finally, start the crawl
514-
queuedPeers, runErr := eng.Run(ctx)
515-
516-
// a bit ugly but, but the handler will contain crawl statistics, that
517-
// we'll save to the database and print to the screen
518-
handler.QueuedPeers = len(queuedPeers)
519-
if err := persistCrawlInformation(dbc, handler, runErr); err != nil {
520-
return fmt.Errorf("persist crawl information: %w", err)
521-
}
488+
summary, runErr = eng.Run(ctx)
489+
}
522490

523-
logSummary(handler, time.Since(start))
491+
// the crawl is done, so seal it and store the aggregate information
492+
if err := persistCrawlInformation(dbc, summary, runErr); err != nil {
493+
return fmt.Errorf("persist crawl information: %w", err)
524494
}
525495

496+
logSummary(summary, time.Since(start))
497+
526498
return nil
527499
}
528500

529-
func persistCrawlInformation[I core.PeerInfo[I]](dbc db.Client, handler *core.CrawlHandler[I], runErr error) error {
501+
func persistCrawlInformation(dbc db.Client, summary *core.Summary, runErr error) error {
530502
// construct a new cleanup context to store the crawl results even
531503
// if the user cancelled the process.
532504
sigs := make(chan os.Signal, 1)
@@ -541,36 +513,36 @@ func persistCrawlInformation[I core.PeerInfo[I]](dbc db.Client, handler *core.Cr
541513
}()
542514

543515
// Persist the crawl results
544-
if err := updateCrawl(cleanupCtx, dbc, runErr, handler); err != nil {
516+
if err := updateCrawl(cleanupCtx, dbc, runErr, summary); err != nil {
545517
return fmt.Errorf("persist crawl: %w", err)
546518
}
547519

548520
// Persist associated crawl properties
549-
if err := persistCrawlProperties(cleanupCtx, dbc, handler); err != nil {
521+
if err := persistCrawlProperties(cleanupCtx, dbc, summary); err != nil {
550522
return fmt.Errorf("persist crawl properties: %w", err)
551523
}
552524

553-
// persist all neighbor information
554-
if err := storeNeighbors(cleanupCtx, dbc, handler); err != nil {
555-
return fmt.Errorf("store neighbors: %w", err)
525+
// flush any left-over information to the database.
526+
if err := dbc.Flush(cleanupCtx); err != nil {
527+
log.WithError(err).Warnln("Failed flushing information to database")
556528
}
557529

558530
return nil
559531
}
560532

561533
// updateCrawl writes crawl statistics to the database
562-
func updateCrawl[I core.PeerInfo[I]](ctx context.Context, dbc db.Client, runErr error, handler *core.CrawlHandler[I]) error {
534+
func updateCrawl(ctx context.Context, dbc db.Client, runErr error, summary *core.Summary) error {
563535
if _, ok := dbc.(*db.NoopClient); ok {
564536
return nil
565537
}
566538

567539
log.Infoln("Persisting crawl result...")
568540

569541
args := &db.SealCrawlArgs{
570-
Crawled: handler.CrawledPeers,
571-
Dialable: handler.CrawledPeers - handler.TotalErrors(),
572-
Undialable: handler.TotalErrors(),
573-
Remaining: handler.QueuedPeers,
542+
Crawled: summary.PeersCrawled,
543+
Dialable: summary.PeersDialable,
544+
Undialable: summary.PeersUndialable,
545+
Remaining: summary.PeersRemaining,
574546
}
575547

576548
if runErr == nil {
@@ -585,92 +557,55 @@ func updateCrawl[I core.PeerInfo[I]](ctx context.Context, dbc db.Client, runErr
585557
}
586558

587559
// persistCrawlProperties writes crawl property statistics to the database.
588-
func persistCrawlProperties[I core.PeerInfo[I]](ctx context.Context, dbc db.Client, handler *core.CrawlHandler[I]) error {
560+
func persistCrawlProperties(ctx context.Context, dbc db.Client, summary *core.Summary) error {
589561
if _, ok := dbc.(*db.NoopClient); ok {
590562
return nil
591563
}
592564

593565
log.Infoln("Persisting crawl properties...")
594566
avFull := map[string]int{}
595-
for version, count := range handler.AgentVersion {
567+
for version, count := range summary.AgentVersion {
596568
avFull[version] += count
597569
}
598570
pps := map[string]map[string]int{
599571
"agent_version": avFull,
600-
"protocol": handler.Protocols,
601-
"error": handler.ConnErrs,
572+
"protocol": summary.Protocols,
573+
"error": summary.ConnErrs,
602574
}
603575

604576
return dbc.InsertCrawlProperties(ctx, pps)
605577
}
606578

607-
// storeNeighbors fills the neighbors table with topology information
608-
func storeNeighbors[I core.PeerInfo[I]](ctx context.Context, dbc db.Client, handler *core.CrawlHandler[I]) error {
609-
if _, ok := dbc.(*db.NoopClient); ok {
610-
return nil
611-
}
612-
613-
if len(handler.RoutingTables) == 0 {
614-
return nil
615-
}
616-
617-
log.Infoln("Storing neighbor information...")
618-
619-
start := time.Now()
620-
neighborsCount := 0
621-
i := 0
622-
for p, routingTable := range handler.RoutingTables {
623-
if i%100 == 0 && i > 0 {
624-
log.Infof("Stored %d peers and their neighbors", i)
625-
}
626-
i++
627-
neighborsCount += len(routingTable.Neighbors)
628-
629-
neighbors := make([]peer.ID, len(routingTable.Neighbors))
630-
for j, n := range routingTable.Neighbors {
631-
neighbors[j] = n.ID()
632-
}
633-
634-
if err := dbc.InsertNeighbors(ctx, p, neighbors, routingTable.ErrorBits); err != nil {
635-
return fmt.Errorf("persiting neighbor information: %w", err)
636-
}
637-
}
638-
log.WithFields(log.Fields{
639-
"duration": time.Since(start).String(),
640-
"avg": fmt.Sprintf("%.2fms", time.Since(start).Seconds()/float64(len(handler.RoutingTables))*1000),
641-
"peers": len(handler.RoutingTables),
642-
"totalNeighbors": neighborsCount,
643-
}).Infoln("Finished storing neighbor information")
644-
return nil
645-
}
646-
647579
// logSummary logs the final results of the crawl.
648-
func logSummary[I core.PeerInfo[I]](handler *core.CrawlHandler[I], crawlDuration time.Duration) {
580+
func logSummary(summary *core.Summary, crawlDuration time.Duration) {
581+
log.Infoln("")
582+
log.Infoln("")
649583
log.Infoln("Crawl summary:")
650584

651585
log.Infoln("")
652-
for err, count := range handler.ConnErrs {
586+
for err, count := range summary.ConnErrs {
653587
log.WithField("count", count).WithField("value", err).Infoln("Dial Error")
654588
}
655589

656590
log.Infoln("")
657-
for err, count := range handler.CrawlErrs {
591+
for err, count := range summary.CrawlErrs {
658592
log.WithField("count", count).WithField("value", err).Infoln("Crawl Error")
659593
}
660594

661595
log.Infoln("")
662-
for agent, count := range handler.AgentVersion {
596+
for agent, count := range summary.AgentVersion {
663597
log.WithField("count", count).WithField("value", agent).Infoln("Agent")
664598
}
665599
log.Infoln("")
666-
for protocol, count := range handler.Protocols {
600+
for protocol, count := range summary.Protocols {
667601
log.WithField("count", count).WithField("value", protocol).Infoln("Protocol")
668602
}
669603
log.Infoln("")
670604
log.WithFields(log.Fields{
671-
"crawledPeers": handler.CrawledPeers,
605+
"crawledPeers": summary.PeersCrawled,
672606
"crawlDuration": crawlDuration.String(),
673-
"dialablePeers": handler.CrawledPeers - handler.TotalErrors(),
674-
"undialablePeers": handler.TotalErrors(),
607+
"dialablePeers": summary.PeersDialable,
608+
"undialablePeers": summary.PeersUndialable,
609+
"remainingPeers": summary.PeersRemaining,
675610
}).Infoln("Finished crawl")
676611
}

core/core.go

+43-2
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,7 @@ type Worker[T any, R any] interface {
7171
Work(ctx context.Context, task T) (R, error)
7272
}
7373

74-
// Handler defines the interface that the engine will call every time
75-
// it has received a result from any of its workers.
74+
// Handler defines the interface the engine uses to hand over peer and write results and to obtain the crawl summary.
7675
type Handler[I PeerInfo[I], R WorkResult[I]] interface {
7776
// HandlePeerResult is called when the worker that has processed a peer
7877
// has emitted a new processing result. This can be a [CrawlResult] or
@@ -82,6 +81,48 @@ type Handler[I PeerInfo[I], R WorkResult[I]] interface {
8281
// HandleWriteResult is called when the writer has written a [CrawlResult]
8382
// or [DialResult] to disk.
8483
HandleWriteResult(context.Context, Result[WriteResult])
84+
85+
// Summary returns aggregate information about the crawl. Since the handler
86+
// processes all results, it can track aggregate information during the crawl.
87+
// The Summary method returns that information which is then printed onto
88+
// the screen or stored into a database as well. It takes an argument of
89+
// the engine's current internal state which can be used to enhance the
90+
// summary information.
91+
Summary(*EngineState) *Summary
92+
}
93+
94+
// EngineState represents a subset of the internal state of the [Engine]. This
95+
// is used to compile aggregate summary information in the peer [Handler].
96+
type EngineState struct {
97+
// PeersQueued indicates the count of peers currently queued for processing.
98+
PeersQueued int
99+
}
100+
101+
// Summary represents aggregate information for a crawl operation.
102+
type Summary struct {
103+
// PeersCrawled is the total number of peers crawled, dialable or not.
104+
PeersCrawled int
105+
106+
// PeersDialable is the number of peers marked as dialable.
107+
PeersDialable int
108+
109+
// PeersUndialable is the number of peers marked as undialable.
110+
PeersUndialable int
111+
112+
// PeersRemaining is the number of peers left to process.
113+
PeersRemaining int
114+
115+
// AgentVersion maps agent versions to their respective counts.
116+
AgentVersion map[string]int
117+
118+
// Protocols maps protocol names to their respective counts.
119+
Protocols map[string]int
120+
121+
// ConnErrs maps connection error types to their occurrence counts.
122+
ConnErrs map[string]int
123+
124+
// CrawlErrs maps crawl error types to their occurrence counts.
125+
CrawlErrs map[string]int
85126
}
86127

87128
// RoutingTable captures the routing table information and crawl error of a particular peer

core/engine.go

+11-4
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ func NewEngine[I PeerInfo[I], R WorkResult[I]](driver Driver[I, R], handler Hand
206206
// channel was closed, the engine will process all remaining peers in the queue.
207207
// Each result is passed to a handler that may return additional peers to
208208
// process.
209-
func (e *Engine[I, R]) Run(ctx context.Context) (map[string]I, error) {
209+
func (e *Engine[I, R]) Run(ctx context.Context) (*Summary, error) {
210210
ctx, cancel := context.WithCancel(ctx)
211211
defer cancel()
212212

@@ -338,13 +338,19 @@ func (e *Engine[I, R]) Run(ctx context.Context) (map[string]I, error) {
338338
// stop the telemetry collection
339339
e.telemetry.Stop()
340340

341-
return e.peerQueue.All(), ctx.Err()
341+
summary := e.handler.Summary(&EngineState{
342+
PeersQueued: e.peerQueue.Len(),
343+
})
344+
return summary, ctx.Err()
342345
}
343346

344347
if peerResults == nil && writerResults == nil {
345348
log.Infoln("Closing driver...")
346349
e.driver.Close()
347-
return e.peerQueue.All(), nil // no work to do, natural end
350+
summary := e.handler.Summary(&EngineState{
351+
PeersQueued: e.peerQueue.Len(),
352+
})
353+
return summary, nil // no work to do, natural end
348354
}
349355

350356
// break the for loop after 1) all workers have stopped or 2) we have
@@ -391,10 +397,11 @@ func (e *Engine[I, R]) handlePeerResult(ctx context.Context, result Result[R]) {
391397
e.enqueueTask(task)
392398
}
393399

400+
pct := 100 * float64(len(e.processed)) / float64(len(e.processed)+e.peerQueue.Len()+len(e.inflight))
394401
logEntry.WithFields(map[string]interface{}{
395402
"queued": e.peerQueue.Len(),
396403
"inflight": len(e.inflight),
397-
}).Infoln("Handled worker result")
404+
}).Infof("Handled worker result [%.2f%%]\n", pct)
398405
}
399406

400407
func (e *Engine[I, R]) handleWriteResult(ctx context.Context, result Result[WriteResult]) {

0 commit comments

Comments
 (0)