Skip to content

Commit

Permalink
Add self-equivocation filter (#648)
Browse files Browse the repository at this point in the history
* Add self-equivocation filter

Signed-off-by: Jakub Sztandera <[email protected]>

* Address review

Signed-off-by: Jakub Sztandera <[email protected]>

* Address review

Signed-off-by: Jakub Sztandera <[email protected]>

---------

Signed-off-by: Jakub Sztandera <[email protected]>
  • Loading branch information
Kubuxu authored Sep 23, 2024
1 parent 76ecca1 commit 16919c6
Show file tree
Hide file tree
Showing 8 changed files with 378 additions and 31 deletions.
143 changes: 143 additions & 0 deletions equivocation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package f3

import (
"bytes"
"slices"
"sync"

"github.com/filecoin-project/go-f3/gpbft"
"github.com/libp2p/go-libp2p/core/peer"
)

// zero value is valid
type equivocationFilter struct {
lk sync.Mutex
localPID peer.ID
currentInstance uint64
// seenMessages map unique message slot to its signature
seenMessages map[equivocationKey]equivMessage
activeSenders map[gpbft.ActorID]equivSenders
}

func newEquivocationFilter(localPID peer.ID) equivocationFilter {
return equivocationFilter{
localPID: localPID,
seenMessages: make(map[equivocationKey]equivMessage),
activeSenders: make(map[gpbft.ActorID]equivSenders),
}
}

type equivocationKey struct {
Sender gpbft.ActorID
Round uint64
Phase gpbft.Phase
}

func (ef *equivocationFilter) formKey(m *gpbft.GMessage) equivocationKey {
return equivocationKey{
Sender: m.Sender,
Round: m.Vote.Round,
Phase: m.Vote.Phase,
}
}

type equivSenders struct {
origins []peer.ID
equivocation bool
}

type equivMessage struct {
signature []byte
origin peer.ID
}

func (es *equivSenders) addSender(id peer.ID, equivocation bool) {
if !slices.Contains(es.origins, id) {
es.origins = append(es.origins, id)
if len(es.origins) > 10 {
es.origins = es.origins[:10]
}
slices.Sort(es.origins)
}
es.equivocation = es.equivocation || equivocation
}

func (ef *equivocationFilter) ProcessBroadcast(m *gpbft.GMessage) bool {
ef.lk.Lock()
defer ef.lk.Unlock()

if m.Vote.Instance < ef.currentInstance {
// disallow past instances
log.Warnw("disallowing broadcast for past instance", "sender", m.Sender, "instance",
m.Vote.Instance, "currentInstance", ef.currentInstance)
return false
}
// moved onto new instance
if m.Vote.Instance > ef.currentInstance {
ef.currentInstance = m.Vote.Instance
ef.seenMessages = make(map[equivocationKey]equivMessage)
ef.activeSenders = make(map[gpbft.ActorID]equivSenders)
}

key := ef.formKey(m)
msgInfo, ok := ef.seenMessages[key]
equivocationDetected := false
if ok && !bytes.Equal(msgInfo.signature, m.Signature) {
if msgInfo.origin == ef.localPID {
log.Warnw("local self-equivocation detected", "sender", m.Sender,
"instance", m.Vote.Instance, "round", m.Vote.Round, "phase", m.Vote.Phase)
return false
} else {
log.Warnw("detected equivocation during broadcast", "sender", m.Sender,
"instance", m.Vote.Instance, "round", m.Vote.Round, "phase", m.Vote.Phase)
equivocationDetected = true
}
} else if !ok {
// save the signature
ef.seenMessages[key] = equivMessage{signature: m.Signature, origin: ef.localPID}
}
// save ourselves as one of the senders
senders := ef.activeSenders[m.Sender]
senders.addSender(ef.localPID, equivocationDetected)
ef.activeSenders[m.Sender] = senders

if !senders.equivocation {
// we are alone in this dark forest
return true
}
// if we are not alone, broadcast the message if we have the best (lowest PeerID)

log.Warnw("self-equivocation detected during broadcast", "sender", m.Sender, "instance", m.Vote.Instance,
"round", m.Vote.Round, "phase", m.Vote.Phase, "sourcers", senders, "localPID", ef.localPID)

// if there are multiple senders, only broadcast if we are the smallest one
return senders.origins[0] == ef.localPID
}

func (ef *equivocationFilter) ProcessReceive(peerID peer.ID, m *gpbft.GMessage) {
ef.lk.Lock()
defer ef.lk.Unlock()

if m.Vote.Instance != ef.currentInstance {
// the instance does not match
return
}
senders, ok := ef.activeSenders[m.Sender]
if !ok {
// we do not track the sender because we didn't send any messages from that ID
// otherwise we would have to track all messages
return
}
key := ef.formKey(m)
msgInfo, ok := ef.seenMessages[key]
if ok && !bytes.Equal(msgInfo.signature, m.Signature) {
// equivocation detected
senders.addSender(peerID, true)
ef.activeSenders[m.Sender] = senders
}
if !ok {
// add the message
ef.seenMessages[key] = equivMessage{signature: m.Signature, origin: peerID}
}

}
161 changes: 161 additions & 0 deletions equivocation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
package f3

import (
"testing"

"github.com/filecoin-project/go-f3/gpbft"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/stretchr/testify/assert"
)

var localGoodPID = peer.ID("1local")
var remotePID0 = peer.ID("0remote")
var remotePID5 = peer.ID("5remote")

func TestEquivactionFilter_ProcessBroadcast(t *testing.T) {
localPID := localGoodPID
ef := newEquivocationFilter(localPID)

// Test case 1: First message should be processed
msg1 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature1"),
}
assert.True(t, ef.ProcessBroadcast(msg1), "First message should be processed")

// Test case 2: Duplicate message with same signature should be processed
msg2 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature1"),
}
assert.True(t, ef.ProcessBroadcast(msg2), "Duplicate message with same signature should be processed")

// Test case 3 Message with same key but different signature should not be processed
msg3 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature2"),
}
assert.False(t, ef.ProcessBroadcast(msg3), "Message with same key but different signature should not be processed")

// Test case 4: Message with new instance should be processed
msg4 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 2, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature3"),
}
assert.True(t, ef.ProcessBroadcast(msg4), "Message with new instance should be processed")

// Test case 5: Message with past instance should not be processed
msg5 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature4"),
}
assert.False(t, ef.ProcessBroadcast(msg5), "Message with past instance should not be processed")
}

func TestEquivactionFilter_formKey(t *testing.T) {
ef := newEquivocationFilter(localGoodPID)

msg := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Round: 1, Phase: gpbft.Phase(1)},
}

expectedKey := equivocationKey{
Sender: gpbft.ActorID(1),
Round: 1,
Phase: gpbft.Phase(1),
}

assert.Equal(t, expectedKey, ef.formKey(msg), "Keys should match")
}

func TestEquivactionFilter_remoteEquivocationResolution(t *testing.T) {
localPID := localGoodPID
ef := newEquivocationFilter(localPID)

msg1 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature1"),
}
assert.True(t, ef.ProcessBroadcast(msg1), "First message should be processed")
ef.ProcessReceive(localPID, msg1)

msg2 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature2"),
}
ef.ProcessReceive(remotePID5, msg2)
assert.Contains(t, ef.activeSenders[msg2.Sender].origins, remotePID5)

msg3 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 2, Phase: gpbft.Phase(1)},
Signature: []byte("signature3"),
}

assert.True(t, ef.ProcessBroadcast(msg3), "local sender should still be able to broadcast")
ef.ProcessReceive(localPID, msg1)

// lower PeerID sender comes along
msg4 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 2, Phase: gpbft.Phase(1)},
Signature: []byte("signature4"),
}
ef.ProcessReceive(remotePID0, msg4)
assert.Contains(t, ef.activeSenders[msg2.Sender].origins, remotePID0)

msg5 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 3, Phase: gpbft.Phase(1)},
Signature: []byte("signature5"),
}

assert.False(t, ef.ProcessBroadcast(msg5), "we should have backed off")

assert.False(t, ef.ProcessBroadcast(msg3), "trying to re-broadcast is now not allowed")
}

func TestEquivocationFilter_PeerIDBasedEquivocationHandling(t *testing.T) {
localPID := localGoodPID
ef := newEquivocationFilter(localPID)

// Local broadcast
msg1 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature1"),
}
assert.True(t, ef.ProcessBroadcast(msg1), "Local message should be processed")

// Remote equivocation with higher PeerID
msg2 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature2"),
}
ef.ProcessReceive(remotePID5, msg2)
assert.Contains(t, ef.activeSenders[msg2.Sender].origins, remotePID5, "Higher PeerID should be recorded")

// Local broadcast after higher PeerID equivocation
assert.True(t, ef.ProcessBroadcast(msg1), "Local message should still be processed after higher PeerID equivocation")

// Remote equivocation with lower PeerID
msg3 := &gpbft.GMessage{
Sender: gpbft.ActorID(1),
Vote: gpbft.Payload{Instance: 1, Round: 1, Phase: gpbft.Phase(1)},
Signature: []byte("signature3"),
}
ef.ProcessReceive(remotePID0, msg3)
assert.Contains(t, ef.activeSenders[msg3.Sender].origins, remotePID0, "Lower PeerID should be recorded")

// Local broadcast after lower PeerID equivocation
assert.False(t, ef.ProcessBroadcast(msg1), "Local message should not be processed after lower PeerID equivocation")
}
42 changes: 22 additions & 20 deletions f3.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type F3 struct {

mu sync.Mutex
cs *certstore.Store
wal *writeaheadlog.WriteAheadLog[walEntry, *walEntry]
manifest *manifest.Manifest
runner *gpbftRunner
ps *powerstore.Store
Expand Down Expand Up @@ -92,23 +91,28 @@ func (m *F3) Manifest() *manifest.Manifest {
}

func (m *F3) Broadcast(ctx context.Context, signatureBuilder *gpbft.SignatureBuilder, msgSig []byte, vrf []byte) {
msg := signatureBuilder.Build(msgSig, vrf)

m.mu.Lock()
runner := m.runner
wal := m.wal
manifest := m.manifest
m.mu.Unlock()

if runner == nil {
log.Error("attempted to broadcast message while F3 wasn't running")
return
}
err := wal.Append(walEntry{*msg})
if err != nil {
log.Error("appending to WAL: %+v", err)
if manifest == nil {
log.Error("attempted to broadcast message while manifest is nil")
return
}
if manifest.NetworkName != signatureBuilder.NetworkName {
log.Errorw("attempted to broadcast message for a wrong network",
"manifestNetwork", manifest.NetworkName, "messageNetwork", signatureBuilder.NetworkName)
return
}

err = runner.BroadcastMessage(msg)
msg := signatureBuilder.Build(msgSig, vrf)
err := runner.BroadcastMessage(msg)
if err != nil {
log.Warnf("failed to broadcast message: %+v", err)
}
Expand Down Expand Up @@ -336,12 +340,6 @@ func (m *F3) stopInternal(ctx context.Context) error {
}
m.certserv = nil
}
if m.wal != nil {
if serr := m.wal.Flush(); serr != nil {
err = multierr.Append(err, fmt.Errorf("failed to flush WAL: %w", serr))
}
m.wal = nil
}
return err
}

Expand All @@ -364,12 +362,6 @@ func (m *F3) resumeInternal(ctx context.Context) error {

m.cs = cs
}
walPath := filepath.Join(m.diskPath, "wal", strings.ReplaceAll(string(m.manifest.NetworkName), "/", "-"))
var err error
m.wal, err = writeaheadlog.Open[walEntry](walPath)
if err != nil {
return fmt.Errorf("opening WAL: %w", err)
}

if m.ps == nil {
pds := measurements.NewMeteredDatastore(meter, "f3_ohshitstore_datastore_", m.ds)
Expand Down Expand Up @@ -410,9 +402,19 @@ func (m *F3) resumeInternal(ctx context.Context) error {
return err
}

cleanName := strings.ReplaceAll(string(m.manifest.NetworkName), "/", "-")
cleanName = strings.ReplaceAll(cleanName, ".", "")
cleanName = strings.ReplaceAll(cleanName, "\u0000", "")

walPath := filepath.Join(m.diskPath, "wal", cleanName)
wal, err := writeaheadlog.Open[walEntry](walPath)
if err != nil {
return fmt.Errorf("opening WAL: %w", err)
}

if runner, err := newRunner(
ctx, m.cs, m.ps, m.pubsub, m.verifier,
m.outboundMessages, m.manifest, m.wal,
m.outboundMessages, m.manifest, wal, m.host.ID(),
); err != nil {
return err
} else {
Expand Down
Loading

0 comments on commit 16919c6

Please sign in to comment.