-
Notifications
You must be signed in to change notification settings - Fork 110
swap: prevent debt cheques #1983
Changes from 50 commits
671a30d
2723804
75cfe23
4b85c4d
01b42ab
8df2245
d69ced8
6bc0fe5
6044ec6
d0e8146
619129f
8ee2e2c
5f2b9f5
0aa47da
b271f2d
ab755d9
4a6c3a3
f02a9f2
03484ad
daa620a
eb7d34a
068b58c
9ba613d
b1a0cfc
ba65ef7
d558c16
1ceb4f2
ba551bc
03d601b
c7ceba3
f044532
1f386ae
e17b503
22b4dd3
7706df9
4d33e9e
de01e2d
5869533
78561f2
b449607
1e815e8
f34200a
585e77d
5e53773
fd99c9c
9102171
1ff452c
f4757e2
5010458
39f3aae
5b6f493
6ca077b
8e8df8a
3faf60a
63229eb
7d1ce9c
1d885fc
77a153b
7fa9e47
f1a46a0
d2402bb
594d900
415e7f6
8888cfc
3117c81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -23,6 +23,7 @@ import ( | |
| "fmt" | ||
| "io/ioutil" | ||
| "math/big" | ||
| "math/rand" | ||
| "os" | ||
| "strings" | ||
| "sync" | ||
|
|
@@ -74,7 +75,7 @@ type swapSimulationParams struct { | |
| // define test message types | ||
| type testMsgBySender struct{} | ||
| type testMsgByReceiver struct{} | ||
| type testMsgBigPrice struct{} | ||
| type testMsgSmallPrice struct{} | ||
|
|
||
| // create a test Spec; every node has its Spec and its accounting Hook | ||
| func newTestSpec() *protocols.Spec { | ||
|
|
@@ -85,7 +86,7 @@ func newTestSpec() *protocols.Spec { | |
| Messages: []interface{}{ | ||
| testMsgBySender{}, | ||
| testMsgByReceiver{}, | ||
| testMsgBigPrice{}, | ||
| testMsgSmallPrice{}, | ||
| }, | ||
| } | ||
| } | ||
|
|
@@ -106,9 +107,9 @@ func (m *testMsgByReceiver) Price() *protocols.Price { | |
| } | ||
| } | ||
|
|
||
| func (m *testMsgBigPrice) Price() *protocols.Price { | ||
| func (m *testMsgSmallPrice) Price() *protocols.Price { | ||
| return &protocols.Price{ | ||
| Value: DefaultPaymentThreshold + 1, | ||
| Value: DefaultPaymentThreshold / 100, // ensures that the message won't put nodes into debt | ||
| PerByte: false, | ||
| Payer: protocols.Sender, | ||
| } | ||
|
|
@@ -250,135 +251,6 @@ func newSharedBackendSwaps(t *testing.T, nodeCount int) (*swapSimulationParams, | |
| return params, nil | ||
| } | ||
|
|
||
| // TestPingPongChequeSimulation just launches two nodes and sends each other | ||
| // messages which immediately crosses the PaymentThreshold and triggers cheques | ||
| // to each other. Checks that accounting and cheque handling works across multiple | ||
| // cheques and also if cheques are mutually sent | ||
| func TestPingPongChequeSimulation(t *testing.T) { | ||
| nodeCount := 2 | ||
| // create the shared backend and params | ||
| params, err := newSharedBackendSwaps(t, nodeCount) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| // cleanup backend | ||
| defer params.backend.Close() | ||
|
|
||
| // setup the wait for mined transaction function for testing | ||
| cleanup := setupContractTest() | ||
| defer cleanup() | ||
|
|
||
| params.backend.cashDone = make(chan struct{}, 1) | ||
| defer close(params.backend.cashDone) | ||
|
|
||
| // initialize the simulation | ||
| sim := simulation.NewBzzInProc(newSimServiceMap(params), false) | ||
| defer sim.Close() | ||
|
|
||
| log.Info("Initializing") | ||
|
|
||
| // we are going to use the metrics system to sync the test | ||
| // we are only going to continue with the next iteration after the message | ||
| // has been received on the other side | ||
| metricsReg := metrics.AccountingRegistry | ||
| // testMsgBigPrice is paid by the sender, so the credit counter will only be | ||
| // increased when receiving the message, which is what we want for this test | ||
| cter := metricsReg.Get("account.msg.credit") | ||
| counter := cter.(metrics.Counter) | ||
| counter.Clear() | ||
| var lastCount int64 | ||
| expectedPayout1, expectedPayout2 := DefaultPaymentThreshold+1, DefaultPaymentThreshold+1 | ||
|
|
||
| _, err = sim.AddNodesAndConnectFull(nodeCount) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| p1 := sim.UpNodeIDs()[0] | ||
| p2 := sim.UpNodeIDs()[1] | ||
| ts1 := sim.Service("swap", p1).(*testService) | ||
| ts2 := sim.Service("swap", p2).(*testService) | ||
|
|
||
| var ts1Len, ts2Len, ts1sLen, ts2sLen int | ||
| timeout := time.After(10 * time.Second) | ||
|
|
||
| for { | ||
| // let's always be nice and allow a time out to be catched | ||
| select { | ||
| case <-timeout: | ||
| t.Fatal("Timed out waiting for all swap peer connections to be established") | ||
| default: | ||
| } | ||
| // the node has all other peers in its peer list | ||
| ts1.swap.peersLock.Lock() | ||
| ts1Len = len(ts1.swap.peers) | ||
| ts1sLen = len(ts1.peers) | ||
| ts1.swap.peersLock.Unlock() | ||
|
|
||
| ts2.swap.peersLock.Lock() | ||
| ts2Len = len(ts2.swap.peers) | ||
| ts2sLen = len(ts2.peers) | ||
| ts2.swap.peersLock.Unlock() | ||
|
|
||
| if ts1Len == 1 && ts2Len == 1 && ts1sLen == 1 && ts2sLen == 1 { | ||
| break | ||
| } | ||
| // don't overheat the CPU... | ||
| time.Sleep(5 * time.Millisecond) | ||
| } | ||
|
|
||
| maxCheques := 42 | ||
|
|
||
| ts1.lock.Lock() | ||
| p2Peer := ts1.peers[p2] | ||
| ts1.lock.Unlock() | ||
|
|
||
| ts2.lock.Lock() | ||
| p1Peer := ts2.peers[p1] | ||
| ts2.lock.Unlock() | ||
|
|
||
| for i := 0; i < maxCheques; i++ { | ||
| if i%2 == 0 { | ||
| if err := p2Peer.Send(context.Background(), &testMsgBigPrice{}); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| if err := waitForChequeProcessed(t, params.backend, counter, lastCount, ts1.swap.peers[p2], expectedPayout1); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| lastCount += 1 | ||
| expectedPayout1 += DefaultPaymentThreshold + 1 | ||
| } else { | ||
| if err := p1Peer.Send(context.Background(), &testMsgBigPrice{}); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| if err := waitForChequeProcessed(t, params.backend, counter, lastCount, ts2.swap.peers[p1], expectedPayout2); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| lastCount += 1 | ||
| expectedPayout2 += DefaultPaymentThreshold + 1 | ||
| } | ||
| } | ||
|
|
||
| ch1, err := ts2.swap.loadLastReceivedCheque(p1) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| ch2, err := ts1.swap.loadLastReceivedCheque(p2) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
|
|
||
| expected := uint64(maxCheques) / 2 * (DefaultPaymentThreshold + 1) | ||
| if !ch1.CumulativePayout.Equals(uint256.FromUint64(expected)) { | ||
| t.Fatalf("expected cumulative payout to be %d, but is %v", expected, ch1.CumulativePayout) | ||
| } | ||
| if !ch2.CumulativePayout.Equals(uint256.FromUint64(expected)) { | ||
| t.Fatalf("expected cumulative payout to be %d, but is %v", expected, ch2.CumulativePayout) | ||
| } | ||
|
|
||
| log.Info("Simulation ended") | ||
| } | ||
|
|
||
| // TestMultiChequeSimulation just launches two nodes, a creditor and a debitor. | ||
| // The debitor is the one owing to the creditor, so the debitor is the one sending cheques | ||
| // It sends multiple cheques in a sequence to the same node. | ||
|
|
@@ -405,18 +277,18 @@ func TestMultiChequeSimulation(t *testing.T) { | |
| defer sim.Close() | ||
|
|
||
| log.Info("Initializing") | ||
|
|
||
| msgPrice := (&testMsgSmallPrice{}).Price().Value | ||
| // we are going to use the metrics system to sync the test | ||
| // we are only going to continue with the next iteration after the message | ||
| // has been received on the other side | ||
| metricsReg := metrics.AccountingRegistry | ||
| // testMsgBigPrice is paid by the sender, so the credit counter will only be | ||
| // testMsgSmallPrice is paid by the sender, so the credit counter will only be | ||
| // increased when receiving the message, which is what we want for this test | ||
| cter := metricsReg.Get("account.msg.credit") | ||
| counter := cter.(metrics.Counter) | ||
| counter.Clear() | ||
| var lastCount int64 | ||
| expectedPayout := DefaultPaymentThreshold + 1 | ||
| var expectedPayout uint64 | ||
|
|
||
| _, err = sim.AddNodesAndConnectFull(nodeCount) | ||
| if err != nil { | ||
|
|
@@ -434,7 +306,7 @@ func TestMultiChequeSimulation(t *testing.T) { | |
| var debLen, credLen, debSwapLen, credSwapLen int | ||
| timeout := time.After(10 * time.Second) | ||
| for { | ||
| // let's always be nice and allow a time out to be catched | ||
| // let's always be nice and allow a time out to be caught | ||
| select { | ||
| case <-timeout: | ||
| t.Fatal("Timed out waiting for all swap peer connections to be established") | ||
|
|
@@ -458,27 +330,74 @@ func TestMultiChequeSimulation(t *testing.T) { | |
| time.Sleep(5 * time.Millisecond) | ||
| } | ||
|
|
||
| // we will send just maxCheques number of cheques | ||
| maxCheques := 10 | ||
| paymentThreshold := debitorSvc.swap.params.PaymentThreshold | ||
|
|
||
| minCheques := 2 | ||
| maxCheques := 5 | ||
|
|
||
| msgsPerCheque := (uint64(paymentThreshold) / msgPrice) + 1 // +1 to round up without casting to float | ||
|
|
||
| minMsgs := msgsPerCheque * uint64(minCheques) | ||
| maxMsgs := msgsPerCheque * uint64(maxCheques) | ||
|
|
||
| msgAmount := rand.Intn(int(maxMsgs-minMsgs)) + int(minMsgs) | ||
| log.Debug("sending %d messages", msgAmount) | ||
|
|
||
| // the peer object used for sending | ||
| debitorSvc.lock.Lock() | ||
| creditorPeer := debitorSvc.peers[creditor] | ||
| debitorSvc.lock.Unlock() | ||
|
|
||
| // send maxCheques number of cheques | ||
| for i := 0; i < maxCheques; i++ { | ||
| // use a price which will trigger a cheque each time | ||
| if err := creditorPeer.Send(context.Background(), &testMsgBigPrice{}); err != nil { | ||
| allMessagesArrived := make(chan struct{}) | ||
|
|
||
| ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) | ||
| defer cancel() | ||
|
|
||
| go func() { | ||
| for { | ||
| select { | ||
| case <-ctx.Done(): | ||
| return | ||
| default: | ||
| } | ||
| // all messages have been received | ||
| if counter.Count() == int64(msgAmount) { | ||
| close(allMessagesArrived) | ||
| return | ||
| } | ||
| time.Sleep(10 * time.Millisecond) | ||
| } | ||
| }() | ||
|
|
||
| for i := 0; i < msgAmount; i++ { | ||
| debitorBalance, err := debitorSvc.swap.loadBalance(creditor) | ||
| if err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| // we need to wait a bit in order to give time for the cheque to be processed | ||
| if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout); err != nil { | ||
|
|
||
| if err := creditorPeer.Send(context.Background(), &testMsgSmallPrice{}); err != nil { | ||
holisticode marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| t.Fatal(err) | ||
| } | ||
| lastCount += 1 | ||
| expectedPayout += DefaultPaymentThreshold + 1 | ||
|
|
||
| // check if cheque should have been sent | ||
| balanceAfterMessage := debitorBalance - int64(msgPrice) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we be sure at this point that the actual balance has been updated from the previous iteration run? I think it does, but give it another thought please.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thank you for raising this point, it's something i hadn't considered. my instinct was that the code (from this side of the exchange) would not be able to send a message in a following iteration until it was done with the current one, and this would include updating the balance. i added some prints to the here are the initial results. you can see that sometimes there are multiple balance updates within a single iteration. however, these seem to be from the creditor side, so it does not matter that they occur one time per iteration—at least for the point you've raised. this is the result obtained by filtering out the other peer's balance updates prints. it seems to confirm that every iteration does 1 balance update before moving on to the next one. curiously, this might give us insight into the cause of #2078. it's very possible that by the time the creditor is allowed to receive the messages and account for them, the debitor is way ahead and the cheque it has sent is perceived as debt from the other side, for being too early. but i think it would have to process the cheque before doing the balance updates for this to explain the situation. |
||
| if balanceAfterMessage <= -paymentThreshold { | ||
| // we need to wait a bit in order to give time for the cheque to be processed | ||
| if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| expectedPayout += uint64(-balanceAfterMessage) | ||
| } | ||
|
|
||
| lastCount++ | ||
| } | ||
| // give enough time for all messages to be processed | ||
| select { | ||
| case <-ctx.Done(): | ||
| t.Fatal("timed out waiting for all messages to arrive, aborting") | ||
| case <-allMessagesArrived: | ||
| } | ||
| log.Debug("all messages arrived") | ||
|
|
||
| // check balances: | ||
| b1, err := debitorSvc.swap.loadBalance(creditor) | ||
|
|
@@ -491,6 +410,7 @@ func TestMultiChequeSimulation(t *testing.T) { | |
| } | ||
|
|
||
| if b1 != -b2 { | ||
| fmt.Printf("balances") | ||
ralph-pichler marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
holisticode marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| t.Fatalf("Expected symmetric balances, but they are not: %d vs %d", b1, b2) | ||
| } | ||
| // check cheques | ||
|
|
@@ -507,9 +427,6 @@ func TestMultiChequeSimulation(t *testing.T) { | |
| t.Fatalf("Expected symmetric cheques payout, but they are not: %v vs %v", cheque1.CumulativePayout, cheque2.CumulativePayout) | ||
| } | ||
|
|
||
| // check also the actual expected amount | ||
| expectedPayout = uint64(maxCheques) * (DefaultPaymentThreshold + 1) | ||
|
|
||
| if !cheque2.CumulativePayout.Equals(uint256.FromUint64(expectedPayout)) { | ||
| t.Fatalf("Expected %d in cumulative payout, got %v", expectedPayout, cheque1.CumulativePayout) | ||
| } | ||
|
|
@@ -722,7 +639,7 @@ func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metr | |
| select { | ||
| case <-ctx.Done(): | ||
| lock.Lock() | ||
| errs = append(errs, "Timed out waiting for cheque to have been cached") | ||
| errs = append(errs, "Timed out waiting for cheque to be cashed") | ||
| lock.Unlock() | ||
| wg.Done() | ||
| return | ||
|
|
@@ -762,7 +679,7 @@ func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metr | |
| select { | ||
| case <-ctx.Done(): | ||
| lock.Lock() | ||
| errs = append(errs, "Timed out waiting for peer to have processed accounted message") | ||
| errs = append(errs, "Timed out waiting for peer to process accounted message") | ||
| lock.Unlock() | ||
| wg.Done() | ||
| return | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.