package gpbft
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"math"
"slices"
"sort"
"time"
"github.com/filecoin-project/go-bitfield"
rlepluslazy "github.com/filecoin-project/go-bitfield/rle"
"github.com/ipfs/go-cid"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
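// Phase represents a phase of the GossiPBFT protocol.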
type Phase uint8
const (
INITIAL_PHASE Phase = iota
QUALITY_PHASE
CONVERGE_PHASE
PREPARE_PHASE
COMMIT_PHASE
DECIDE_PHASE
TERMINATED_PHASE
)
func (p Phase) String() string {
switch p {
case INITIAL_PHASE:
return "INITIAL"
case QUALITY_PHASE:
return "QUALITY"
case CONVERGE_PHASE:
return "CONVERGE"
case PREPARE_PHASE:
return "PREPARE"
case COMMIT_PHASE:
return "COMMIT"
case DECIDE_PHASE:
return "DECIDE"
case TERMINATED_PHASE:
return "TERMINATED"
default:
return "UNKNOWN"
}
}
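// DomainSeparationTag is prefixed to all payloads marshalled for signing in order to
// distinguish GPBFT signatures from signatures over other data.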
const DomainSeparationTag = "GPBFT"
// A message in the Granite protocol.
// The same message structure is used for all rounds and phases.
// Note that the message is self-attesting so no separate envelope or signature is needed.
// - The signature field fixes the included sender ID via the implied public key;
// - The signature payload includes all fields a sender can freely choose;
// - The ticket field is a signature by the same public key, so it is also self-attesting.
type GMessage struct {
// ID of the sender/signer of this message (a miner actor ID).
Sender ActorID
// Vote is the payload that is signed by the signature
Vote Payload
// Signature by the sender's public key over Instance || Round || Phase || Value.
Signature []byte `cborgen:"maxlen=96"`
// VRF ticket for CONVERGE messages (otherwise empty byte array).
Ticket Ticket `cborgen:"maxlen=96"`
// Justification for this message (some messages must be justified by a strong quorum of messages from some previous phase).
Justification *Justification
}
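// Justification aggregates the signatures of a quorum of participants over a common
// payload, justifying a message that requires it.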
type Justification struct {
// Vote is the payload that is signed by the signature
Vote Payload
// Indexes in the base power table of the signers (bitset)
Signers bitfield.BitField
// BLS aggregate signature of signers
Signature []byte `cborgen:"maxlen=96"`
}
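// SupplementalData carries instance-specific data, beyond the chain itself, that all
// participants must agree on ahead of time.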
type SupplementalData struct {
// Merkle-tree of instance-specific commitments. Currently empty but this will eventually
// include things like snark-friendly power-table commitments.
Commitments [32]byte `cborgen:"maxlen=32"`
// The DagCBOR-blake2b256 CID of the power table used to validate the next instance, taking
// lookback into account.
PowerTable cid.Cid // []PowerEntry
}
func (d *SupplementalData) Eq(other *SupplementalData) bool {
return d.Commitments == other.Commitments && d.PowerTable == other.PowerTable
}
// Custom JSON marshalling for SupplementalData to achieve a commitment field
// that is a base64-encoded string.
type supplementalDataSub SupplementalData
type supplementalDataJson struct {
Commitments []byte
*supplementalDataSub
}
func (sd SupplementalData) MarshalJSON() ([]byte, error) {
return json.Marshal(&supplementalDataJson{
Commitments: sd.Commitments[:],
supplementalDataSub: (*supplementalDataSub)(&sd),
})
}
func (sd *SupplementalData) UnmarshalJSON(b []byte) error {
aux := &supplementalDataJson{supplementalDataSub: (*supplementalDataSub)(sd)}
var err error
if err = json.Unmarshal(b, &aux); err != nil {
return err
}
if len(aux.Commitments) != 32 {
return errors.New("commitments must be 32 bytes")
}
copy(sd.Commitments[:], aux.Commitments)
return nil
}
// Fields of the message that make up the signature payload.
type Payload struct {
// GossiPBFT instance (epoch) number.
Instance uint64
// GossiPBFT round number.
Round uint64
// GossiPBFT phase name.
Phase Phase
// The common data.
SupplementalData SupplementalData
// The value agreed-upon in a single instance.
Value *ECChain
}
func (p *Payload) Eq(other *Payload) bool {
if p == other {
return true
}
if other == nil {
return false
}
return p.Instance == other.Instance &&
p.Round == other.Round &&
p.Phase == other.Phase &&
p.SupplementalData.Eq(&other.SupplementalData) &&
p.Value.Eq(other.Value)
}
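// MarshalForSigning serializes the payload for signing, prefixed with the domain
// separation tag and network name.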
func (p *Payload) MarshalForSigning(nn NetworkName) []byte {
var buf bytes.Buffer
buf.WriteString(DomainSeparationTag)
buf.WriteString(":")
buf.WriteString(string(nn))
buf.WriteString(":")
_ = binary.Write(&buf, binary.BigEndian, p.Phase)
_ = binary.Write(&buf, binary.BigEndian, p.Round)
_ = binary.Write(&buf, binary.BigEndian, p.Instance)
_, _ = buf.Write(p.SupplementalData.Commitments[:])
key := p.Value.Key()
_, _ = buf.Write(key[:])
_, _ = buf.Write(p.SupplementalData.PowerTable.Bytes())
return buf.Bytes()
}
func (m GMessage) String() string {
return fmt.Sprintf("%s{%d}(%d %s)", m.Vote.Phase, m.Vote.Instance, m.Vote.Round, m.Vote.Value)
}
// A single Granite consensus instance.
type instance struct {
participant *Participant
// The EC chain input to this instance.
input *ECChain
// The power table for the base chain, used for power in this instance.
powerTable *PowerTable
// The aggregate signature verifier/aggregator.
aggregateVerifier Aggregate
// The beacon value from the base chain, used for tickets in this instance.
beacon []byte
// current stores information about the current GPBFT instant in terms of
// instance ID, round and phase.
current InstanceProgress
// Time at which the current phase can or must end.
// For QUALITY, PREPARE, and COMMIT, this is the latest time (the phase can end sooner).
// For CONVERGE, this is the exact time (the timeout solely defines the phase end).
phaseTimeout time.Time
// rebroadcastTimeout is the time at which the current phase should attempt to
// rebroadcast messages in order to further its progress.
//
// See tryRebroadcast.
rebroadcastTimeout time.Time
// rebroadcastAttempts counts the number of times messages at a round have been
// rebroadcast, in order to determine the backoff duration until the next rebroadcast.
//
// See tryRebroadcast.
rebroadcastAttempts int
// Supplemental data that all participants must agree on ahead of time. Messages that
// propose supplemental data that differs from ours will be discarded.
supplementalData *SupplementalData
// This instance's proposal for the current round. Never bottom.
// This is set after the QUALITY phase, and changes only at the end of a full round.
proposal *ECChain
// The value to be transmitted at the next phase, which may be bottom.
// This value may change away from the proposal between phases.
value *ECChain
// candidates contains the set of values that are acceptable candidates to this
// instance. This includes the base chain, all prefixes of the proposal that found
// a strong quorum of support in the QUALITY phase (or via late-arriving QUALITY
// messages), and any chains that could possibly have been decided by another
// participant.
candidates map[ECChainKey]struct{}
// The final termination value of the instance, for communication to the participant.
// This field is an alternative to plumbing an optional decision value out through
// all the method calls, or holding a callback handle to receive it here.
terminationValue *Justification
// Quality phase state (only for round 0)
quality *quorumState
// State for each round of phases.
// State from prior rounds must be maintained to provide justification for values in subsequent rounds.
rounds map[uint64]*roundState
// Decision state. Collects DECIDE messages until a decision can be made,
// independently of protocol phases/rounds.
decision *quorumState
// tracer traces logic logs for debugging and simulation purposes.
tracer Tracer
}
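// newInstance constructs a new Granite instance from the given input chain,
// supplemental data, power table, aggregate verifier and beacon. It returns an
// error if the input chain is empty.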
func newInstance(
participant *Participant,
instanceID uint64,
input *ECChain,
data *SupplementalData,
powerTable *PowerTable,
aggregateVerifier Aggregate,
beacon []byte) (*instance, error) {
if input.IsZero() {
return nil, fmt.Errorf("input is empty")
}
metrics.phaseCounter.Add(context.TODO(), 1, metric.WithAttributes(attrInitialPhase))
metrics.currentInstance.Record(context.TODO(), int64(instanceID))
metrics.currentPhase.Record(context.TODO(), int64(INITIAL_PHASE))
metrics.currentRound.Record(context.TODO(), int64(0))
{
totalPowerFloat, _ := powerTable.Total.Float64()
metrics.totalPower.Record(context.TODO(), totalPowerFloat)
}
return &instance{
participant: participant,
input: input,
powerTable: powerTable,
aggregateVerifier: aggregateVerifier,
beacon: beacon,
current: InstanceProgress{
Instant: Instant{
ID: instanceID,
Round: 0,
Phase: INITIAL_PHASE,
},
Input: input,
},
supplementalData: data,
proposal: input,
value: &ECChain{},
candidates: map[ECChainKey]struct{}{
input.BaseChain().Key(): {},
},
quality: newQuorumState(powerTable, attrQualityPhase, attrKeyRound.Int(0)),
rounds: map[uint64]*roundState{
0: newRoundState(0, powerTable),
},
decision: newQuorumState(powerTable, attrDecidePhase, attrKeyRound.Int(0)),
tracer: participant.tracer,
}, nil
}
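// roundState holds the per-round aggregation state for the CONVERGE, PREPARE and
// COMMIT phases.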
type roundState struct {
converged *convergeState
prepared *quorumState
committed *quorumState
}
func newRoundState(roundNumber uint64, powerTable *PowerTable) *roundState {
roundAttr := attrKeyRound.Int(int(roundNumber))
return &roundState{
converged: newConvergeState(roundAttr),
prepared: newQuorumState(powerTable, attrPreparePhase, roundAttr),
committed: newQuorumState(powerTable, attrCommitPhase, roundAttr),
}
}
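// Start commences the instance by beginning the QUALITY phase.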
func (i *instance) Start() error {
return i.beginQuality()
}
// Receives and processes a message.
// Returns an error indicating either message invalidation or a programming error.
func (i *instance) Receive(msg *GMessage) error {
if i.terminated() {
return ErrReceivedAfterTermination
}
stateChanged, err := i.receiveOne(msg)
if err != nil {
return err
}
if stateChanged {
// Further process the message's round only if it may have had an effect.
// This avoids loading state for dropped messages (including spam).
i.postReceive(msg.Vote.Round)
}
return nil
}
// Receives and processes a batch of queued messages.
// Messages should be ordered by round for most effective processing.
func (i *instance) ReceiveMany(msgs []*GMessage) error {
if i.terminated() {
return ErrReceivedAfterTermination
}
// Receive each message and remember which rounds were received.
roundsReceived := map[uint64]struct{}{}
for _, msg := range msgs {
stateChanged, err := i.receiveOne(msg)
if err != nil {
if errors.As(err, &ValidationError{}) {
// Drop late-binding validation errors.
i.log("dropping invalid message: %s", err)
} else {
return err
}
}
if stateChanged {
roundsReceived[msg.Vote.Round] = struct{}{}
}
}
// Build unique, ordered list of rounds received.
rounds := make([]uint64, 0, len(roundsReceived))
for r := range roundsReceived {
rounds = append(rounds, r)
}
sort.Slice(rounds, func(i, j int) bool { return rounds[i] < rounds[j] })
i.postReceive(rounds...)
return nil
}
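// Receives an alarm and attempts to complete the current phase and round.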
func (i *instance) ReceiveAlarm() error {
if err := i.tryCurrentPhase(); err != nil {
return fmt.Errorf("failed completing protocol phase: %w", err)
}
return nil
}
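// Returns a human-readable description of the current instance, round and phase.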
func (i *instance) Describe() string {
return fmt.Sprintf("{%d}, round %d, phase %s", i.current.ID, i.current.Round, i.current.Phase)
}
// Processes a single message.
// Returns true if the message might have caused a change in state.
func (i *instance) receiveOne(msg *GMessage) (bool, error) {
// Check the message is for this instance, to guard against programming error.
if msg.Vote.Instance != i.current.ID {
return false, fmt.Errorf("%w: message for instance %d, expected %d",
ErrReceivedWrongInstance, msg.Vote.Instance, i.current.ID)
}
// Perform validation that could not be done until the instance started.
// Check supplemental data matches this instance's expectation.
if !msg.Vote.SupplementalData.Eq(i.supplementalData) {
return false, fmt.Errorf("%w: message supplement %s, expected %s",
ErrValidationWrongSupplement, msg.Vote.SupplementalData, i.supplementalData)
}
// Check proposal has the expected base chain.
if !(msg.Vote.Value.IsZero() || msg.Vote.Value.HasBase(i.input.Base())) {
return false, fmt.Errorf("%w: message base %s, expected %s",
ErrValidationWrongBase, msg.Vote.Value, i.input.Base())
}
if i.current.Phase == TERMINATED_PHASE {
return false, nil // No-op
}
// Ignore CONVERGE and PREPARE messages for prior rounds.
forPriorRound := msg.Vote.Round < i.current.Round
if (forPriorRound && msg.Vote.Phase == CONVERGE_PHASE) ||
(forPriorRound && msg.Vote.Phase == PREPARE_PHASE) {
return false, nil
}
// Drop messages that:
// * belong to future rounds, beyond the configured max lookahead threshold, and
// * carry no justification, i.e. are spammable.
beyondMaxLookaheadRounds := msg.Vote.Round > i.current.Round+i.participant.maxLookaheadRounds
if beyondMaxLookaheadRounds && isSpammable(msg) {
return false, nil
}
// Load the round state and process further only valid, non-spammable messages.
// Equivocations are handled by the quorum state.
msgRound := i.getRound(msg.Vote.Round)
switch msg.Vote.Phase {
case QUALITY_PHASE:
// Receive each prefix of the proposal independently; prefixes are accepted at
// any round/phase.
i.quality.ReceiveEachPrefix(msg.Sender, msg.Vote.Value)
// If the instance has surpassed QUALITY phase, update the candidates based
// on possible quorum of input prefixes.
if i.current.Phase != QUALITY_PHASE {
return true, i.updateCandidatesFromQuality()
}
case CONVERGE_PHASE:
if err := msgRound.converged.Receive(msg.Sender, i.powerTable, msg.Vote.Value, msg.Ticket, msg.Justification); err != nil {
return false, fmt.Errorf("failed processing CONVERGE message: %w", err)
}
case PREPARE_PHASE:
msgRound.prepared.Receive(msg.Sender, msg.Vote.Value, msg.Signature)
case COMMIT_PHASE:
msgRound.committed.Receive(msg.Sender, msg.Vote.Value, msg.Signature)
// The only justifications that need to be stored for future propagation are for COMMITs
// to non-bottom values.
// This evidence can be brought forward to justify a CONVERGE message in the next round.
if !msg.Vote.Value.IsZero() {
msgRound.committed.ReceiveJustification(msg.Vote.Value, msg.Justification)
}
// Every COMMIT phase stays open to new messages even after the protocol moves on
// to a new round. Late-arriving COMMITs can still (must) cause a local decision,
// *in that round*. Try to complete the COMMIT phase for the round specified by
// the message.
if i.current.Phase != DECIDE_PHASE {
return true, i.tryCommit(msg.Vote.Round)
}
case DECIDE_PHASE:
i.decision.Receive(msg.Sender, msg.Vote.Value, msg.Signature)
if i.current.Phase != DECIDE_PHASE {
i.skipToDecide(msg.Vote.Value, msg.Justification)
}
default:
return false, fmt.Errorf("unexpected message phase %s", msg.Vote.Phase)
}
// Try to complete the current phase in the current round.
return true, i.tryCurrentPhase()
}
func (i *instance) postReceive(roundsReceived ...uint64) {
// Check whether the instance should skip ahead to a future round, trying rounds in descending order.
slices.Reverse(roundsReceived)
for _, r := range roundsReceived {
round := i.getRound(r)
if chain, justification, skip := i.shouldSkipToRound(r, round); skip {
i.skipToRound(r, chain, justification)
return
}
}
}
// shouldSkipToRound determines whether to skip to the given round, and if so returns
// the chain and justification to adopt: either a justification for a value to sway
// to, or a justification of COMMIT for bottom that justifies our own proposal.
// Otherwise, it returns a nil chain, a nil justification and false.
//
// See: skipToRound.
func (i *instance) shouldSkipToRound(round uint64, state *roundState) (*ECChain, *Justification, bool) {
// Check if the given round is ahead of current round and this instance is not in
// DECIDE phase.
if round <= i.current.Round || i.current.Phase == DECIDE_PHASE {
return nil, nil, false
}
if !state.prepared.ReceivedFromWeakQuorum() {
return nil, nil, false
}
proposal := state.converged.FindBestTicketProposal(nil)
if !proposal.IsValid() {
// FindBestTicketProposal returns a zero-valued ConvergeValue if no suitable
// proposal is found, hence the validity check. A valid ConvergeValue always
// carries a non-nil justification.
return nil, nil, false
}
return proposal.Chain, proposal.Justification, true
}
// Attempts to complete the current phase and round.
func (i *instance) tryCurrentPhase() error {
i.log("try phase %s", i.current.Phase)
switch i.current.Phase {
case QUALITY_PHASE:
return i.tryQuality()
case CONVERGE_PHASE:
return i.tryConverge()
case PREPARE_PHASE:
return i.tryPrepare()
case COMMIT_PHASE:
return i.tryCommit(i.current.Round)
case DECIDE_PHASE:
return i.tryDecide()
case TERMINATED_PHASE:
return nil // No-op
default:
return fmt.Errorf("unexpected phase %s", i.current.Phase)
}
}
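// Records phase metrics for the current phase, including the phase counter and
// proposal length.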
func (i *instance) reportPhaseMetrics() {
attr := metric.WithAttributes(attrPhase[i.current.Phase])
metrics.phaseCounter.Add(context.TODO(), 1, attr)
metrics.currentPhase.Record(context.TODO(), int64(i.current.Phase))
metrics.proposalLength.Record(context.TODO(), int64(i.proposal.Len()-1), attr)
}
// Sends this node's QUALITY message and begins the QUALITY phase.
func (i *instance) beginQuality() error {
if i.current.Phase != INITIAL_PHASE {
return fmt.Errorf("cannot transition from %s to %s", i.current.Phase, QUALITY_PHASE)
}
// Broadcast input value and wait to receive from others.
i.current.Phase = QUALITY_PHASE
i.participant.progression.NotifyProgress(i.current)
i.phaseTimeout = i.alarmAfterSynchronyWithMulti(i.participant.qualityDeltaMulti)
i.resetRebroadcastParams()
i.broadcast(i.current.Round, QUALITY_PHASE, i.proposal, false, nil)
i.reportPhaseMetrics()
return nil
}
// Attempts to end the QUALITY phase and begin PREPARE based on current state.
func (i *instance) tryQuality() error {
if i.current.Phase != QUALITY_PHASE {
return fmt.Errorf("unexpected phase %s, expected %s", i.current.Phase, QUALITY_PHASE)
}
// Wait either for a strong quorum that agrees on our proposal, or for the timeout
// to expire.
foundQuorum := i.quality.HasStrongQuorumFor(i.proposal.Key())
timeoutExpired := atOrAfter(i.participant.host.Time(), i.phaseTimeout)
if foundQuorum || timeoutExpired {
// If a strong quorum for the input is found, the proposal remains unchanged.
// Otherwise, change the proposal to the longest prefix of the input that has a
// strong quorum.
i.proposal = i.quality.FindStrongQuorumValueForLongestPrefixOf(i.input)
// Add prefixes with quorum to candidates.
i.addCandidatePrefixes(i.proposal)
i.value = i.proposal
i.log("adopting proposal/value %s", i.proposal)
i.beginPrepare(nil)
}
return nil
}
// updateCandidatesFromQuality updates candidates as a result of late-arriving
// QUALITY messages based on the longest input prefix with strong quorum.
func (i *instance) updateCandidatesFromQuality() error {
// Find the longest input prefix that has reached strong quorum as a result of
// late-arriving QUALITY messages and update candidates with each of its
// prefixes.
longestPrefix := i.quality.FindStrongQuorumValueForLongestPrefixOf(i.input)
if i.addCandidatePrefixes(longestPrefix) {
i.log("expanded candidates for proposal %s from QUALITY quorum of %s", i.proposal, longestPrefix)
}
return nil
}
// beginConverge initiates CONVERGE_PHASE justified by the given justification.
func (i *instance) beginConverge(justification *Justification) {
if justification.Vote.Round != i.current.Round-1 {
// For safety assert that the justification given belongs to the right round.
panic("justification for which to begin converge does not belong to expected round")
}
i.current.Phase = CONVERGE_PHASE
i.participant.progression.NotifyProgress(i.current)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()
// Notify the round's convergeState that this participant has begun the
// CONVERGE phase, because we cannot guarantee that the CONVERGE message
// broadcast is delivered back to self synchronously.
i.getRound(i.current.Round).converged.SetSelfValue(i.proposal, justification)
i.broadcast(i.current.Round, CONVERGE_PHASE, i.proposal, true, justification)
i.reportPhaseMetrics()
}
// Attempts to end the CONVERGE phase and begin PREPARE based on current state.
func (i *instance) tryConverge() error {
if i.current.Phase != CONVERGE_PHASE {
return fmt.Errorf("unexpected phase %s, expected %s", i.current.Phase, CONVERGE_PHASE)
}
// The CONVERGE phase timeout doesn't wait to hear from >⅔ of power.
timeoutExpired := atOrAfter(i.participant.host.Time(), i.phaseTimeout)
if !timeoutExpired {
return nil
}
commitRoundState := i.getRound(i.current.Round - 1).committed
isValidConvergeValue := func(cv ConvergeValue) bool {
// If it is in candidate set
if i.isCandidate(cv.Chain) {
return true
}
// If it is not a candidate but it could possibly have been decided by another participant
// in the last round, consider it a candidate.
if cv.Justification.Vote.Phase != PREPARE_PHASE {
return false
}
possibleDecision := commitRoundState.CouldReachStrongQuorumFor(cv.Chain.Key(), true)
return possibleDecision
}
winner := i.getRound(i.current.Round).converged.FindBestTicketProposal(isValidConvergeValue)
if !winner.IsValid() {
return fmt.Errorf("no values at CONVERGE")
}
if !i.isCandidate(winner.Chain) {
// If winner.Chain is not in the candidate set, it means we got swayed.
i.log("⚠️ swaying from %s to %s by CONVERGE", i.proposal, winner.Chain)
i.addCandidate(winner.Chain)
} else {
i.log("adopting proposal %s after converge (old proposal %s)", winner.Chain, i.proposal)
}
i.proposal = winner.Chain
i.value = winner.Chain
i.beginPrepare(winner.Justification)
return nil
}
// Sends this node's PREPARE message and begins the PREPARE phase.
func (i *instance) beginPrepare(justification *Justification) {
// Broadcast preparation of value and wait for everyone to respond.
i.current.Phase = PREPARE_PHASE
i.participant.progression.NotifyProgress(i.current)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()
i.broadcast(i.current.Round, PREPARE_PHASE, i.value, false, justification)
i.reportPhaseMetrics()
}
// Attempts to end the PREPARE phase and begin COMMIT based on current state.
func (i *instance) tryPrepare() error {
if i.current.Phase != PREPARE_PHASE {
return fmt.Errorf("unexpected phase %s, expected %s", i.current.Phase, PREPARE_PHASE)
}
prepared := i.getRound(i.current.Round).prepared
proposalKey := i.proposal.Key()
foundQuorum := prepared.HasStrongQuorumFor(proposalKey)
timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout)
quorumNotPossible := !prepared.CouldReachStrongQuorumFor(proposalKey, false)
phaseComplete := timedOut && prepared.ReceivedFromStrongQuorum()
if foundQuorum {
i.value = i.proposal
} else if quorumNotPossible || phaseComplete {
i.value = &ECChain{}
}
if foundQuorum || quorumNotPossible || phaseComplete {
i.beginCommit()
} else if timedOut {
i.tryRebroadcast()
}
return nil
}
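// Sends this node's COMMIT message and begins the COMMIT phase.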
func (i *instance) beginCommit() {
i.current.Phase = COMMIT_PHASE
i.participant.progression.NotifyProgress(i.current)
i.phaseTimeout = i.alarmAfterSynchrony()
i.resetRebroadcastParams()
// The PREPARE phase exited either with i.value == i.proposal having a strong quorum agreement,
// or with i.value == bottom otherwise.
// No justification is required for committing bottom.
var justification *Justification
if !i.value.IsZero() {
if quorum, ok := i.getRound(i.current.Round).prepared.FindStrongQuorumFor(i.value.Key()); ok {
// Found a strong quorum of PREPARE, build the justification for it.
justification = i.buildJustification(quorum, i.current.Round, PREPARE_PHASE, i.value)
} else {
panic("beginCommit with no strong quorum for non-bottom value")
}
}
i.broadcast(i.current.Round, COMMIT_PHASE, i.value, false, justification)
i.reportPhaseMetrics()
}
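// Attempts to end the COMMIT phase for the given round and either decide, begin
// the next round, or re-broadcast, based on current state.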
func (i *instance) tryCommit(round uint64) error {
// Unlike all other phases, the COMMIT phase stays open to new messages even
// after an initial quorum is reached, and the algorithm moves on to the next
// round. A subsequent COMMIT message can cause the node to decide, so there is
// no check on the current phase.
committed := i.getRound(round).committed
quorumValue, foundStrongQuorum := committed.FindStrongQuorumValue()
timedOut := atOrAfter(i.participant.host.Time(), i.phaseTimeout)
phaseComplete := timedOut && committed.ReceivedFromStrongQuorum()
switch {
case foundStrongQuorum && !quorumValue.IsZero():
// There is a strong quorum for a non-zero value; accept it. A participant may be
// forced to decide a value that's not its preferred chain. The participant isn't
// influencing that decision against their interest, just accepting it.
i.value = quorumValue
i.beginDecide(round)
case i.current.Round != round, i.current.Phase != COMMIT_PHASE:
// We are at a phase other than COMMIT or round does not match the current one;
// nothing else to do.
case foundStrongQuorum:
// There is a strong quorum for bottom, carry forward the existing proposal.
i.beginNextRound()
case phaseComplete:
// There is no strong quorum for bottom, which implies there must be a COMMIT for
// some other value. There can only be one such value since it must be justified
// by a strong quorum of PREPAREs. Some other participant could possibly have
// observed a strong quorum for that value, since they might observe votes from ⅓
// of honest power plus a ⅓ equivocating adversary. Sway to consider that value
// as a candidate, even if it wasn't the local proposal.
for _, v := range committed.ListAllValues() {
if !v.IsZero() {
if !i.isCandidate(v) {
i.log("⚠️ swaying from %s to %s by COMMIT", i.input, v)
i.addCandidate(v)
}
if !v.Eq(i.proposal) {
i.proposal = v
i.log("adopting proposal %s after commit", i.proposal)
}
break
}
}
i.beginNextRound()
case timedOut:
// The phase has timed out. Attempt to re-broadcast messages.
i.tryRebroadcast()
}
return nil
}
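// Sends this node's DECIDE message and begins the DECIDE phase, justified by a
// strong quorum of COMMITs for the value in the given round.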
func (i *instance) beginDecide(round uint64) {
i.current.Phase = DECIDE_PHASE
i.participant.progression.NotifyProgress(i.current)
i.resetRebroadcastParams()
var justification *Justification
// Value cannot be empty here.
if quorum, ok := i.getRound(round).committed.FindStrongQuorumFor(i.value.Key()); ok {
// Build justification for strong quorum of COMMITs for the value.
justification = i.buildJustification(quorum, round, COMMIT_PHASE, i.value)
} else {
panic("beginDecide with no strong quorum for value")
}
// DECIDE messages always specify round = 0.
// Extreme out-of-order message delivery could result in different nodes deciding
// in different rounds (but for the same value).
// Since each node sends only one DECIDE message, they must share the same vote
// in order to be aggregated.
i.broadcast(0, DECIDE_PHASE, i.value, false, justification)
i.reportPhaseMetrics()
}
// Skips immediately to the DECIDE phase and sends a DECIDE message
// without waiting for a strong quorum of COMMITs in any round.
// The provided justification must justify the value being decided.
func (i *instance) skipToDecide(value *ECChain, justification *Justification) {
i.current.Phase = DECIDE_PHASE
i.participant.progression.NotifyProgress(i.current)
i.proposal = value
i.value = i.proposal
i.resetRebroadcastParams()
i.broadcast(0, DECIDE_PHASE, i.value, false, justification)
metrics.skipCounter.Add(context.TODO(), 1, metric.WithAttributes(attrSkipToDecide))
i.reportPhaseMetrics()
}
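// Attempts to end the DECIDE phase and terminate the instance based on current state.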
func (i *instance) tryDecide() error {
quorumValue, ok := i.decision.FindStrongQuorumValue()
if ok {
if quorum, ok := i.decision.FindStrongQuorumFor(quorumValue.Key()); ok {
decision := i.buildJustification(quorum, 0, DECIDE_PHASE, quorumValue)
i.terminate(decision)
} else {
panic("tryDecide with no strong quorum for value")
}
} else {
i.tryRebroadcast()
}
return nil
}
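// Returns the state for the given round, creating and storing it if it does not
// already exist.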
func (i *instance) getRound(r uint64) *roundState {
round, ok := i.rounds[r]
if !ok {
round = newRoundState(r, i.powerTable)
i.rounds[r] = round
}
return round
}
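// bottomECChain is the zero-valued (bottom) chain, used to look up COMMITs for bottom.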
var bottomECChain = &ECChain{}
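// Begins the next round by initiating CONVERGE with a justification drawn from the
// previous round's COMMITs.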
func (i *instance) beginNextRound() {
i.log("moving to round %d with %s", i.current.Round+1, i.proposal.String())
i.current.Round += 1
metrics.currentRound.Record(context.TODO(), int64(i.current.Round))
prevRound := i.getRound(i.current.Round - 1)
// Proposal was updated at the end of COMMIT phase to be some value for which
// this node received a COMMIT message (bearing justification), if there were any.
// If there were none, there must have been a strong quorum for bottom instead.
var justification *Justification
if quorum, ok := prevRound.committed.FindStrongQuorumFor(bottomECChain.Key()); ok {
// Build justification for strong quorum of COMMITs for bottom in the previous round.
justification = i.buildJustification(quorum, i.current.Round-1, COMMIT_PHASE, nil)
} else {
// Extract the justification received from some participant (possibly this node itself).
justification, ok = prevRound.committed.receivedJustification[i.proposal.Key()]
if !ok {
panic("beginConverge called but no justification for proposal")
}
}
i.beginConverge(justification)
}
// skipToRound jumps ahead to the given round by initiating CONVERGE with the given justification.
//
// See shouldSkipToRound.
func (i *instance) skipToRound(round uint64, chain *ECChain, justification *Justification) {
i.log("skipping from round %d to round %d with %s", i.current.Round, round, i.proposal.String())
i.current.Round = round
metrics.currentRound.Record(context.TODO(), int64(i.current.Round))
metrics.skipCounter.Add(context.TODO(), 1, metric.WithAttributes(attrSkipToRound))
if justification.Vote.Phase == PREPARE_PHASE {
i.log("⚠️ swaying from %s to %s by skip to round %d", i.proposal, chain, i.current.Round)
i.addCandidate(chain)
i.proposal = chain
}
i.beginConverge(justification)
}
// Returns whether a chain is acceptable as a proposal for this instance to vote for.
// This is "EC Compatible" in the pseudocode.
func (i *instance) isCandidate(c *ECChain) bool {
_, exists := i.candidates[c.Key()]
return exists
}
func (i *instance) addCandidatePrefixes(c *ECChain) bool {
var addedAny bool
for l := c.Len() - 1; l > 0 && !addedAny; l-- {
addedAny = i.addCandidate(c.Prefix(l))
}
return addedAny
}
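// Adds the given chain to the candidate set, returning true if it was not already
// present.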
func (i *instance) addCandidate(c *ECChain) bool {
key := c.Key()
if _, exists := i.candidates[key]; !exists {
i.candidates[key] = struct{}{}
return true
}
return false
}
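// Terminates the instance, recording the final decision and moving to the
// TERMINATED phase.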
func (i *instance) terminate(decision *Justification) {
i.log("✅ terminated %s during round %d", i.value, i.current.Round)
i.current.Phase = TERMINATED_PHASE
i.participant.progression.NotifyProgress(i.current)
i.value = decision.Vote.Value
i.terminationValue = decision
i.resetRebroadcastParams()
metrics.roundHistogram.Record(context.TODO(), int64(i.current.Round))
i.reportPhaseMetrics()
}
func (i *instance) terminated() bool {
return i.current.Phase == TERMINATED_PHASE
}
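// Builds a message for the given round, phase and value (optionally with a VRF
// ticket and justification) and requests its broadcast via the host.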
func (i *instance) broadcast(round uint64, phase Phase, value *ECChain, createTicket bool, justification *Justification) {
p := Payload{
Instance: i.current.ID,
Round: round,
Phase: phase,
SupplementalData: *i.supplementalData,
Value: value,
}
mb := &MessageBuilder{
NetworkName: i.participant.host.NetworkName(),
PowerTable: i.powerTable,
Payload: p,
Justification: justification,
}
if createTicket {
mb.BeaconForTicket = i.beacon
}
metrics.broadcastCounter.Add(context.TODO(), 1, metric.WithAttributes(attrPhase[p.Phase]))
if err := i.participant.host.RequestBroadcast(mb); err != nil {
i.log("failed to request broadcast: %v", err)
}
}
// tryRebroadcast checks whether the rebroadcast timeout has elapsed, and if so
// rebroadcasts messages from the current and previous rounds. If not, it sets an
// alarm for rebroadcast with a backoff relative to the number of attempts.
func (i *instance) tryRebroadcast() {
switch {
case i.rebroadcastAttempts == 0 && i.rebroadcastTimeout.IsZero():
// It is the first time that rebroadcast has become necessary; set initial
// rebroadcast timeout relative to the phase timeout, and schedule a rebroadcast.
//
// Determine the offset for the first rebroadcast alarm depending on current
// instance phase and schedule the first alarm:
// * If in DECIDE phase, use the current time as the offset, because the DECIDE
// phase has no phase timeout and any prior phase timeout may be too far in the past.
// * Otherwise, use the phase timeout.
var rebroadcastTimeoutOffset time.Time
if i.current.Phase == DECIDE_PHASE {
rebroadcastTimeoutOffset = i.participant.host.Time()
} else {
rebroadcastTimeoutOffset = i.phaseTimeout
}
i.rebroadcastTimeout = rebroadcastTimeoutOffset.Add(i.participant.rebroadcastAfter(0))
i.participant.host.SetAlarm(i.rebroadcastTimeout)
i.log("scheduled initial rebroadcast at %v", i.rebroadcastTimeout)
case i.rebroadcastTimeoutElapsed():
// Rebroadcast now that the corresponding timeout has elapsed, and schedule the
// successive rebroadcast.
i.rebroadcast()
i.rebroadcastAttempts++
// Use the current host time as the offset for the next alarm to ensure that the
// rebroadcast interval grows relative to the actual time at which the alarm is
// triggered, not the absolute alarm time. This avoids a "runaway rebroadcast"
// scenario where the rebroadcast timeout consistently remains behind the current
// time due to the discrepancy between the scheduled alarm time and the actual
// time at which the alarm fires.
i.rebroadcastTimeout = i.participant.host.Time().Add(i.participant.rebroadcastAfter(i.rebroadcastAttempts))
i.participant.host.SetAlarm(i.rebroadcastTimeout)
i.log("scheduled next rebroadcast at %v", i.rebroadcastTimeout)
default:
// Rebroadcast timeout is set but has not elapsed yet; nothing to do.
}
}
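// Resets the rebroadcast timeout and attempt counter, e.g. on transition to a new
// phase.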
func (i *instance) resetRebroadcastParams() {
i.rebroadcastAttempts = 0
i.rebroadcastTimeout = time.Time{}
}
func (i *instance) rebroadcastTimeoutElapsed() bool {
now := i.participant.host.Time()
return atOrAfter(now, i.rebroadcastTimeout)
}
func (i *instance) rebroadcast() {
// Rebroadcast the QUALITY message and all messages from the current and previous
// rounds, unless the instance has progressed to the DECIDE phase, in which case
// only the DECIDE message is rebroadcast.
//
// Note that the implementation here rebroadcasts more messages than FIP-0086
// strictly requires, because the cost of rebroadcasting additional messages is
// small compared to the reduced need for further rebroadcasts.
switch i.current.Phase {
case QUALITY_PHASE, CONVERGE_PHASE, PREPARE_PHASE, COMMIT_PHASE:
// Rebroadcast requests for missing messages are silently ignored, hence the
// simple bulk rebroadcast when we are not in the DECIDE phase.
i.rebroadcastQuietly(0, QUALITY_PHASE)
i.rebroadcastQuietly(i.current.Round, COMMIT_PHASE)
i.rebroadcastQuietly(i.current.Round, PREPARE_PHASE)
i.rebroadcastQuietly(i.current.Round, CONVERGE_PHASE)
if i.current.Round > 0 {
i.rebroadcastQuietly(i.current.Round-1, COMMIT_PHASE)
i.rebroadcastQuietly(i.current.Round-1, PREPARE_PHASE)
i.rebroadcastQuietly(i.current.Round-1, CONVERGE_PHASE)
}
case DECIDE_PHASE:
i.rebroadcastQuietly(0, DECIDE_PHASE)
default:
log.Errorw("rebroadcast attempted for unexpected phase", "round", i.current.Round, "phase", i.current.Phase)
}
}
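// Requests rebroadcast of this participant's message for the given round and phase.
// Requests for messages that were never broadcast are silently ignored.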
func (i *instance) rebroadcastQuietly(round uint64, phase Phase) {
instant := Instant{i.current.ID, round, phase}