diff --git a/swap/cashout.go b/swap/cashout.go index 9d62250c64..08379d40b3 100644 --- a/swap/cashout.go +++ b/swap/cashout.go @@ -21,6 +21,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/metrics" contract "github.com/ethersphere/swarm/contracts/swap" "github.com/ethersphere/swarm/swap/chain" @@ -30,10 +31,9 @@ import ( // CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction const CashChequeBeneficiaryTransactionCost = 50000 -// CashoutProcessor holds all relevant fields needed for processing cashouts -type CashoutProcessor struct { - backend chain.Backend // ethereum backend to use - privateKey *ecdsa.PrivateKey // private key to use +var CashoutRequestTypeID = chain.TxRequestTypeID{ + Handler: "cashout", + RequestType: "CashoutRequest", } // CashoutRequest represents a request for a cashout operation @@ -42,42 +42,94 @@ type CashoutRequest struct { Destination common.Address // destination for the payout } -// ActiveCashout stores the necessary information for a cashout in progess -type ActiveCashout struct { - Request CashoutRequest // the request that caused this cashout - TransactionHash common.Hash // the hash of the current transaction for this request +// CashoutProcessor holds all relevant fields needed for processing cashouts +type CashoutProcessor struct { + backend chain.Backend // ethereum backend to use + txScheduler chain.TxScheduler // transaction queue to use + cashoutResultHandler CashoutResultHandler + cashoutDone chan *CashoutRequest +} + +type CashoutResultHandler interface { + HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error } // newCashoutProcessor creates a new instance of CashoutProcessor -func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor { - return &CashoutProcessor{ - backend: backend, - privateKey: privateKey, +func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey, cashoutResultHandler CashoutResultHandler) *CashoutProcessor { + c := &CashoutProcessor{ + backend: backend, + txScheduler: txScheduler, + cashoutResultHandler: cashoutResultHandler, } -} -// cashCheque tries to cash the cheque specified in the request -// after the transaction is sent it waits on its success -func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error { - cheque := request.Cheque - opts := bind.NewKeyedTransactor(c.privateKey) - opts.Context = ctx + txScheduler.SetHandlers(CashoutRequestTypeID, &chain.TxRequestHandlers{ + Send: func(id uint64, backend chain.Backend, opts *bind.TransactOpts) (common.Hash, error) { + var request CashoutRequest + if err := c.txScheduler.GetRequest(id, &request); err != nil { + return common.Hash{}, err + } + + cheque := request.Cheque + + otherSwap, err := contract.InstanceAt(cheque.Contract, backend) + if err != nil { + return common.Hash{}, err + } + + tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature) + if err != nil { + return common.Hash{}, err + } + return tx.Hash(), nil + }, + NotifyReceipt: func(ctx context.Context, id uint64, notification *chain.TxReceiptNotification) error { + var request *CashoutRequest + err := c.txScheduler.GetRequest(id, &request) + if err != nil { + return err + } + + otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend) + if err != nil { + return err + } + + receipt := ¬ification.Receipt + if receipt.Status == 0 { + swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash) + return nil + } + + result := otherSwap.CashChequeBeneficiaryResult(receipt) + return c.cashoutResultHandler.HandleCashoutResult(request, result, receipt) + }, + }) + return c +} - otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend) +func (c *CashoutProcessor) submitCheque(ctx context.Context, request *CashoutRequest) { + expectedPayout, transactionCosts, err := c.estimatePayout(ctx, &request.Cheque) if err != nil { - return err + swapLog.Error("could not estimate payout", "error", err) + return } - tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature) + costsMultiplier := uint256.FromUint64(2) + costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier) if err != nil { - return err + swapLog.Error("overflow in transaction fee", "error", err) + return } - // this blocks until the cashout has been successfully processed - return c.waitForAndProcessActiveCashout(&ActiveCashout{ - Request: *request, - TransactionHash: tx.Hash(), - }) + // do a payout transaction if we get 2 times the gas costs + if expectedPayout.Cmp(costThreshold) == 1 { + swapLog.Info("queueing cashout", "cheque", &request.Cheque) + _, err := c.txScheduler.ScheduleRequest(CashoutRequestTypeID, request) + if err != nil { + metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) + swapLog.Error("cashing cheque:", "error", err) + } + } } // estimatePayout estimates the payout for a given cheque as well as the transaction cost @@ -123,31 +175,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) ( return expectedPayout, transactionCosts, nil } - -// waitForAndProcessActiveCashout waits for activeCashout to complete -func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error { - ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout) - defer cancel() - - receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash) - if err != nil { - return err - } - - otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend) - if err != nil { - return err - } - - result := otherSwap.CashChequeBeneficiaryResult(receipt) - - metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) - - if result.Bounced { - metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1) - swapLog.Warn("cheque bounced", "tx", receipt.TxHash) - } - - swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey) - return nil -} diff --git a/swap/cashout_test.go b/swap/cashout_test.go index 18d56da4ce..085f984a12 100644 --- a/swap/cashout_test.go +++ b/swap/cashout_test.go @@ -19,9 +19,11 @@ package swap import ( "context" "testing" + "time" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/log" + "github.com/ethersphere/swarm/state" "github.com/ethersphere/swarm/swap/chain" "github.com/ethersphere/swarm/uint256" ) @@ -33,8 +35,7 @@ import ( // afterwards it attempts to cash-in a bouncing cheque func TestContractIntegration(t *testing.T) { backend := newTestBackend(t) - reset := setupContractTest() - defer reset() + defer backend.Close() payout := uint256.FromUint64(42) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) @@ -116,11 +117,18 @@ func TestContractIntegration(t *testing.T) { // TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.cashCheque func TestCashCheque(t *testing.T) { backend := newTestBackend(t) - reset := setupContractTest() - defer reset() + defer backend.Close() - cashoutProcessor := newCashoutProcessor(backend, ownerKey) - payout := uint256.FromUint64(42) + store := state.NewInmemoryStore() + defer store.Close() + + transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey) + transactionQueue.Start() + defer transactionQueue.Stop() + + cashoutHandler := newTestCashoutResultHandler(nil) + cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, cashoutHandler) + payout := uint256.FromUint64(CashChequeBeneficiaryTransactionCost*2 + 1) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) if err != nil { @@ -132,12 +140,14 @@ func TestCashCheque(t *testing.T) { t.Fatal(err) } - err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{ + cashoutProcessor.submitCheque(context.Background(), &CashoutRequest{ Cheque: *testCheque, Destination: ownerAddress, }) - if err != nil { - t.Fatal(err) + + select { + case <-cashoutHandler.cashChequeDone: + case <-time.After(5 * time.Second): } paidOut, err := chequebook.PaidOut(nil, ownerAddress) @@ -154,12 +164,18 @@ func TestCashCheque(t *testing.T) { // TestEstimatePayout creates a valid cheque and feeds it to cashoutProcessor.estimatePayout func TestEstimatePayout(t *testing.T) { backend := newTestBackend(t) - reset := setupContractTest() - defer reset() + defer backend.Close() - cashoutProcessor := newCashoutProcessor(backend, ownerKey) - payout := uint256.FromUint64(42) + store := state.NewInmemoryStore() + defer store.Close() + + transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey) + transactionQueue.Start() + defer transactionQueue.Stop() + cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, &testCashoutResultHandler{}) + + payout := uint256.FromUint64(42) chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout) if err != nil { t.Fatal(err) diff --git a/swap/chain/backend.go b/swap/chain/backend.go index 54ad6b55b1..07cfad398e 100644 --- a/swap/chain/backend.go +++ b/swap/chain/backend.go @@ -2,7 +2,6 @@ package chain import ( "context" - "errors" "time" "github.com/ethereum/go-ethereum/log" @@ -12,11 +11,6 @@ import ( "github.com/ethereum/go-ethereum/core/types" ) -var ( - // ErrTransactionReverted is given when the transaction that cashes a cheque is reverted - ErrTransactionReverted = errors.New("Transaction reverted") -) - // Backend is the minimum amount of functionality required by the underlying ethereum backend type Backend interface { bind.ContractBackend @@ -30,12 +24,9 @@ func WaitMined(ctx context.Context, b Backend, hash common.Hash) (*types.Receipt for { receipt, err := b.TransactionReceipt(ctx, hash) if err != nil { - log.Error("receipt retrieval failed", "err", err) + log.Trace("receipt retrieval failed", "err", err) } if receipt != nil { - if receipt.Status != types.ReceiptStatusSuccessful { - return nil, ErrTransactionReverted - } return receipt, nil } diff --git a/swap/chain/common_test.go b/swap/chain/common_test.go new file mode 100644 index 0000000000..38d9109af3 --- /dev/null +++ b/swap/chain/common_test.go @@ -0,0 +1,7 @@ +package chain + +import "github.com/ethersphere/swarm/testutil" + +func init() { + testutil.Init() +} diff --git a/swap/chain/mock/testbackend.go b/swap/chain/mock/testbackend.go index 40b64c4b46..57def1dd10 100644 --- a/swap/chain/mock/testbackend.go +++ b/swap/chain/mock/testbackend.go @@ -21,6 +21,11 @@ func (b *TestBackend) SendTransaction(ctx context.Context, tx *types.Transaction return err } +// SendTransactionNoCommit provides access to the underlying SendTransaction function without the auto commit +func (b *TestBackend) SendTransactionNoCommit(ctx context.Context, tx *types.Transaction) (err error) { + return b.SimulatedBackend.SendTransaction(ctx, tx) +} + // Close overrides the Close function of the underlying SimulatedBackend so that it does nothing // This allows the same SimulatedBackend backend to be reused across tests // This is necessary due to some memory leakage issues with the used version of the SimulatedBackend diff --git a/swap/chain/persistentqueue.go b/swap/chain/persistentqueue.go new file mode 100644 index 0000000000..e6695f2f19 --- /dev/null +++ b/swap/chain/persistentqueue.go @@ -0,0 +1,121 @@ +package chain + +import ( + "context" + "encoding" + "encoding/json" + "fmt" + "strings" + "sync" + "time" + + "github.com/ethersphere/swarm/state" +) + +/* + PersistentQueue represents a queue stored in a state store + Items are enqueued by writing them to the state store with the timestamp as prefix and a nonce in case two items get queued at the same time + It provides a (blocking) Next function to wait for a new item to be available. Only a single call to Next may be active at any time + To allow atomic operations with other state store operations all functions only write to batches instead of writing to the store directly + The user must ensure that all functions (except Next) are called with the same lock held which is provided externally so multiple queues can use the same + The queue provides no dequeue function. Instead an item must be deleted by its key +*/ + +// PersistentQueue represents a queue stored in a state store +type PersistentQueue struct { + store state.Store // the store backing this queue + prefix string // the prefix for the keys for this queue + trigger chan struct{} // channel to notify the queue that a new item is available + nonce uint64 // increasing nonce. starts with 0 on every startup +} + +// NewPersistentQueue creates a structure to interact with a queue with the given prefix +func NewPersistentQueue(store state.Store, prefix string) *PersistentQueue { + return &PersistentQueue{ + store: store, + prefix: prefix, + trigger: make(chan struct{}, 1), + nonce: 0, + } +} + +// Queue puts the necessary database operations for enqueueing a new item into the supplied batch +// It returns the generated key and a trigger function which must be called once the batch was successfully written +// This only returns an error if the encoding fails which is an unrecoverable error +// A lock must be held and kept until after the trigger function was called or the batch write failed +func (pq *PersistentQueue) Queue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) { + // the nonce guarantees keys don't collide if multiple transactions are queued in the same second + pq.nonce++ + key = fmt.Sprintf("%d_%08d", time.Now().Unix(), pq.nonce) + if err = b.Put(pq.prefix+key, v); err != nil { + return "", nil, err + } + + return key, func() { + select { + case pq.trigger <- struct{}{}: + default: + } + }, nil +} + +// Peek looks at the next item in the queue +// The error returned is either a decode or an io error +// A lock must be held when this is called and should be held afterwards to prevent the item from being removed while processing +func (pq *PersistentQueue) Peek(i interface{}) (key string, exists bool, err error) { + err = pq.store.Iterate(pq.prefix, func(k, data []byte) (bool, error) { + key = string(k) + unmarshaler, ok := i.(encoding.BinaryUnmarshaler) + if !ok { + return true, json.Unmarshal(data, i) + } + return true, unmarshaler.UnmarshalBinary(data) + }) + if err != nil { + return "", false, err + } + if key == "" { + return "", false, nil + } + return strings.TrimPrefix(key, pq.prefix), true, nil +} + +// Next looks at the next item in the queue and blocks until an item is available if there is none +// The error returned is either an decode error, an io error or a cancelled context +// No lock should not be held when this is called. Only a single call to next may be active at any time +// If the the key is not "", the value exists, the supplied lock was acquired and must be released by the caller after processing the item +// The supplied lock should be the same that is used for the other functions +func (pq *PersistentQueue) Next(ctx context.Context, i interface{}, lock *sync.Mutex) (key string, err error) { + lock.Lock() + key, exists, err := pq.Peek(i) + if exists { + return key, nil + } + lock.Unlock() + if err != nil { + return "", err + } + + for { + select { + case <-pq.trigger: + lock.Lock() + key, exists, err = pq.Peek(i) + if exists { + return key, nil + } + lock.Unlock() + if err != nil { + return "", err + } + case <-ctx.Done(): + return "", ctx.Err() + } + } +} + +// Delete adds the batch operation to delete the queue element with the given key +// A lock must be held when the batch is written +func (pq *PersistentQueue) Delete(b *state.StoreBatch, key string) { + b.Delete(pq.prefix + key) +} diff --git a/swap/chain/persistentqueue_test.go b/swap/chain/persistentqueue_test.go new file mode 100644 index 0000000000..b866175a1c --- /dev/null +++ b/swap/chain/persistentqueue_test.go @@ -0,0 +1,93 @@ +package chain + +import ( + "context" + "errors" + "fmt" + "sync" + "testing" + "time" + + "github.com/ethersphere/swarm/state" +) + +// TestNewPersistentQueue adds 200 elements in one routine and waits for them and then deletes them in another +func TestNewPersistentQueue(t *testing.T) { + store := state.NewInmemoryStore() + defer store.Close() + + queue := NewPersistentQueue(store, "testq") + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + var lock sync.Mutex // lock for the queue + var wg sync.WaitGroup // wait group to wait for both routines to terminate + wg.Add(2) + + count := 200 + + var errout error // stores the last error that occurred in one of the routines + go func() { + defer wg.Done() + for i := 0; i < count; i++ { + func() { // this is a function so we can use defer with the right scope + var value uint64 + key, err := queue.Next(ctx, &value, &lock) + if err != nil { + errout = fmt.Errorf("failed to get next item: %v", err) + return + } + defer lock.Unlock() + + if key == "" { + errout = errors.New("key is empty") + return + } + + if value != uint64(i) { + errout = fmt.Errorf("values don't match: got %v, expected %v", value, i) + return + } + + batch := new(state.StoreBatch) + queue.Delete(batch, key) + err = store.WriteBatch(batch) + if err != nil { + errout = fmt.Errorf("could not write batch: %v", err) + return + } + }() + } + }() + + go func() { + defer wg.Done() + for i := 0; i < count; i++ { + func() { // this is a function so we can use defer with the right scope + lock.Lock() + defer lock.Unlock() + + var value = uint64(i) + batch := new(state.StoreBatch) + _, trigger, err := queue.Queue(batch, value) + if err != nil { + errout = fmt.Errorf("failed to queue item: %v", err) + return + } + err = store.WriteBatch(batch) + if err != nil { + errout = fmt.Errorf("failed to write batch: %v", err) + return + } + + trigger() + }() + } + }() + + wg.Wait() + + if errout != nil { + t.Fatal(errout) + } +} diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go new file mode 100644 index 0000000000..2a917ac46e --- /dev/null +++ b/swap/chain/txqueue.go @@ -0,0 +1,584 @@ +package chain + +import ( + "context" + "crypto/ecdsa" + "errors" + "fmt" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethersphere/swarm/state" +) + +// TxQueue is a TxScheduler which sends transactions in sequence +// A new transaction is only sent after the previous one confirmed +// This done to minimize the chance of wrong nonce use +type TxQueue struct { + lock sync.Mutex // lock for the entire queue + ctx context.Context // context used for all network requests and waiting operations to ensure the queue can be stopped at any point + cancel context.CancelFunc // function to cancel the above context + wg sync.WaitGroup // used to ensure that all background go routines have finished before Stop returns + started bool // bool indicating that the queue has been started. used to ensure it does not run multiple times simultaneously + errorChan chan error // channel to stop the queue in case of errors + + store state.Store // state store to use as the backend + prefix string // all keys in the state store are prefixed with this + requestQueue *PersistentQueue // queue for all future requests + handlers map[TxRequestTypeID]*TxRequestHandlers // map from request type ids to their registered handlers + notificationQueues map[TxRequestTypeID]*PersistentQueue // map from request type ids to the notification queue of that handler + + backend Backend // ethereum backend to use + privateKey *ecdsa.PrivateKey // private key used to sign transactions +} + +// TxRequestMetadata is the metadata the queue saves for every request +type TxRequestMetadata struct { + RequestTypeID TxRequestTypeID // the type id of this request + State TxRequestState // the state this request is in + Hash common.Hash // the hash of the associated transaction (if already sent) +} + +// NotificationQueueItem is the metadata the queue saves for every pending notification +type NotificationQueueItem struct { + NotificationType string // the type of the notification + RequestID uint64 // the request this notification is for +} + +// ErrNoHandler is the error used if a request cannot be sent because no handler was registered for it +var ErrNoHandler = errors.New("no handler") + +// NewTxQueue creates a new TxQueue +func NewTxQueue(store state.Store, prefix string, backend Backend, privateKey *ecdsa.PrivateKey) *TxQueue { + txq := &TxQueue{ + store: store, + prefix: prefix, + handlers: make(map[TxRequestTypeID]*TxRequestHandlers), + notificationQueues: make(map[TxRequestTypeID]*PersistentQueue), + backend: backend, + privateKey: privateKey, + requestQueue: NewPersistentQueue(store, prefix+"_requestQueue_"), + errorChan: make(chan error, 1), + } + txq.ctx, txq.cancel = context.WithCancel(context.Background()) + return txq +} + +// requestKey returns the database key for the TxRequestMetadata data +func (txq *TxQueue) requestKey(id uint64) string { + return fmt.Sprintf("%s_requests_%d", txq.prefix, id) +} + +// requestDataKey returns the database key for the custom TxRequest +func (txq *TxQueue) requestDataKey(id uint64) string { + return fmt.Sprintf("%s_data", txq.requestKey(id)) +} + +// activeRequestKey returns the database key used for the currently active request +func (txq *TxQueue) activeRequestKey() string { + return fmt.Sprintf("%s_active", txq.prefix) +} + +// notificationKey returns the database key for a notification +func (txq *TxQueue) notificationKey(key string) string { + return fmt.Sprintf("%s_notification_%s", txq.prefix, key) +} + +// stopWithError sends the error to the error channel +func (txq *TxQueue) stopWithError(err error) { + select { + case txq.errorChan <- err: + default: + } +} + +// ScheduleRequest adds a new request to be processed +// The request is assigned an id which is returned +func (txq *TxQueue) ScheduleRequest(requestTypeID TxRequestTypeID, request interface{}) (id uint64, err error) { + txq.lock.Lock() + defer txq.lock.Unlock() + + // get the last id + idKey := txq.prefix + "_request_id" + err = txq.store.Get(idKey, &id) + if err != nil && err != state.ErrNotFound { + return 0, err + } + // ids start at 1 + id++ + + // in a single batch this + // * stores the request data + // * stores the request metadata + // * adds it to the queue + batch := new(state.StoreBatch) + batch.Put(idKey, id) + err = batch.Put(txq.requestDataKey(id), request) + if err != nil { + return 0, err + } + + err = batch.Put(txq.requestKey(id), &TxRequestMetadata{ + RequestTypeID: requestTypeID, + State: TxRequestStateQueued, + }) + if err != nil { + return 0, err + } + + _, triggerQueue, err := txq.requestQueue.Queue(batch, id) + if err != nil { + return 0, err + } + + // persist to disk + err = txq.store.WriteBatch(batch) + if err != nil { + return 0, err + } + + triggerQueue() + return id, nil +} + +// GetRequest load the serialized request data from disk and tries to decode it +func (txq *TxQueue) GetRequest(id uint64, request interface{}) error { + return txq.store.Get(txq.requestDataKey(id), &request) +} + +// Start starts processing transactions if it is not already doing so +func (txq *TxQueue) Start() { + txq.lock.Lock() + defer txq.lock.Unlock() + + if txq.started { + return + } + + txq.started = true + txq.wg.Add(2) + go func() { + err := txq.loop() + if err != nil && !errors.Is(err, context.Canceled) { + log.Error("transaction queue terminated with an error", "queue", txq.prefix, "error", err) + } + txq.wg.Done() + }() + + go func() { + select { + case err := <-txq.errorChan: + log.Error("unrecoverable transaction queue error (transaction processing disabled)", "error", err) + txq.Stop() + case <-txq.ctx.Done(): + } + txq.wg.Done() + }() +} + +// Stop stops processing transactions if it is running +// It will block until processing has terminated +func (txq *TxQueue) Stop() { + txq.lock.Lock() + + if !txq.started { + txq.lock.Unlock() + return + } + + txq.cancel() + txq.lock.Unlock() + // wait until all routines have finished + txq.wg.Wait() +} + +// getNotificationQueue gets the notification queue for a handler +// it initializes the struct if it does not yet exist +// the TxQueue lock must be held +func (txq *TxQueue) getNotificationQueue(requestTypeID TxRequestTypeID) *PersistentQueue { + queue, ok := txq.notificationQueues[requestTypeID] + if !ok { + queue = NewPersistentQueue(txq.store, fmt.Sprintf("%s_notify_%s_%s", txq.prefix, requestTypeID.Handler, requestTypeID.RequestType)) + txq.notificationQueues[requestTypeID] = queue + } + return queue +} + +// SetHandlers registers the handlers for the given TxRequestTypeID +// This starts the delivery of notifications for this TxRequestTypeID +func (txq *TxQueue) SetHandlers(requestTypeID TxRequestTypeID, handlers *TxRequestHandlers) error { + txq.lock.Lock() + defer txq.lock.Unlock() + + if txq.handlers[requestTypeID] != nil { + return fmt.Errorf("handlers for %v.%v already set", requestTypeID.Handler, requestTypeID.RequestType) + } + txq.handlers[requestTypeID] = handlers + notifyQueue := txq.getNotificationQueue(requestTypeID) + + // go routine processing the notification queue for this handler + txq.wg.Add(1) + go func() { + defer txq.wg.Done() + + for { + var item NotificationQueueItem + // get the next notification item + key, err := notifyQueue.Next(txq.ctx, &item, &txq.lock) + if err != nil { + if !errors.Is(err, context.Canceled) { + txq.stopWithError(fmt.Errorf("could not read from notification queue: %v", err)) + } + return + } + // since this is the only function which deletes this item from notifyQueue we can already unlock here + txq.lock.Unlock() + + // load and decode the notification + var notification interface{} + switch item.NotificationType { + case "TxReceiptNotification": + notification = &TxReceiptNotification{} + case "TxStateChangedNotification": + notification = &TxStateChangedNotification{} + } + + err = txq.store.Get(txq.notificationKey(key), notification) + if err != nil { + txq.stopWithError(fmt.Errorf("could not read notification: %v", err)) + return + } + + switch item.NotificationType { + case "TxReceiptNotification": + if handlers.NotifyReceipt != nil { + err = handlers.NotifyReceipt(txq.ctx, item.RequestID, notification.(*TxReceiptNotification)) + } + case "TxStateChangedNotification": + if handlers.NotifyStateChanged != nil { + err = handlers.NotifyStateChanged(txq.ctx, item.RequestID, notification.(*TxStateChangedNotification)) + } + } + + // if a handler failed we will try again in 10 seconds + if err != nil { + log.Info("transaction request handler failed", "type", item.NotificationType, "request", item.RequestID, "error", err) + select { + case <-txq.ctx.Done(): + return + case <-time.After(10 * time.Second): + continue + } + } + + // once the notification was handled delete it from the queue + txq.lock.Lock() + batch := new(state.StoreBatch) + notifyQueue.Delete(batch, key) + err = txq.store.WriteBatch(batch) + txq.lock.Unlock() + if err != nil { + txq.stopWithError(fmt.Errorf("could not delete notification: %v", err)) + return + } + } + }() + return nil +} + +// updateRequestStatus is a helper function to change the state of a transaction request while also emitting a notification +// in one write batch it +// * adds a TxStateChangedNotification notification to the notification queue +// * stores the corresponding notification +// * updates the state of requestMetadata and persists it +// it returns the trigger function which must be called once the batch was written +// this only returns an error if the encoding fails which is an unrecoverable error +// must be called with the txqueue lock held +func (txq *TxQueue) updateRequestStatus(batch *state.StoreBatch, id uint64, requestMetadata *TxRequestMetadata, newState TxRequestState) (triggerNotifyQueue func(), err error) { + notifyQueue := txq.getNotificationQueue(requestMetadata.RequestTypeID) + key, triggerNotifyQueue, err := notifyQueue.Queue(batch, &NotificationQueueItem{ + RequestID: id, + NotificationType: "TxStateChangedNotification", + }) + if err != nil { + return nil, fmt.Errorf("could not serialize notification queue item: %v", err) + } + + err = batch.Put(txq.notificationKey(key), &TxStateChangedNotification{ + OldState: requestMetadata.State, + NewState: newState, + }) + if err != nil { + return nil, fmt.Errorf("could not serialize notification: %v", err) + } + requestMetadata.State = newState + batch.Put(txq.requestKey(id), requestMetadata) + + return triggerNotifyQueue, nil +} + +// waitForNextRequest waits for the next request and sets it as the active request +// the txqueue lock must not be held +func (txq *TxQueue) waitForNextRequest() (id uint64, requestMetadata *TxRequestMetadata, err error) { + // get the id of the next request in the queue + key, err := txq.requestQueue.Next(txq.ctx, &id, &txq.lock) + if err != nil { + return 0, nil, err + } + defer txq.lock.Unlock() + + err = txq.store.Get(txq.requestKey(id), &requestMetadata) + if err != nil { + return 0, nil, err + } + + handlers := txq.handlers[requestMetadata.RequestTypeID] + if handlers == nil || handlers.Send == nil { + // if there is no handler for this handler available we mark the request as cancelled and remove it from the queue + batch := new(state.StoreBatch) + triggerNotifyQueue, err := txq.updateRequestStatus(batch, id, requestMetadata, TxRequestStateCancelled) + if err != nil { + return 0, nil, err + } + + txq.requestQueue.Delete(batch, key) + err = txq.store.WriteBatch(batch) + if err != nil { + return 0, nil, err + } + + triggerNotifyQueue() + return 0, nil, ErrNoHandler + } + + // if the request was successfully decoded it is removed from the queue and set as the active request + batch := new(state.StoreBatch) + err = batch.Put(txq.activeRequestKey(), id) + if err != nil { + return 0, nil, fmt.Errorf("could not put id write into batch: %v", err) + } + + txq.requestQueue.Delete(batch, key) + if err = txq.store.WriteBatch(batch); err != nil { + return 0, nil, err + } + return id, requestMetadata, nil +} + +// sendRequest sends the request with the given id and TxRequestMetadata +// the txqueue lock must not be held +func (txq *TxQueue) sendRequest(id uint64, requestMetadata *TxRequestMetadata) error { + // finally we call the handler to send the actual transaction + opts := bind.NewKeyedTransactor(txq.privateKey) + opts.Context = txq.ctx + handlers := txq.handlers[requestMetadata.RequestTypeID] + hash, err := handlers.Send(id, txq.backend, opts) + txq.lock.Lock() + defer txq.lock.Unlock() + if err != nil { + // even if SendTransactionRequest returns an error there are still certain rare edge cases where the transaction might still be sent so we mark it as status unknown + // in the future there should be a special error type to indicate the transaction was never sent + batch := new(state.StoreBatch) + triggerNotifyQueue, err := txq.updateRequestStatus(batch, id, requestMetadata, TxRequestStateStatusUnknown) + if err != nil { + return fmt.Errorf("failed to write transaction request status to store: %v", err) + } + + if err = txq.store.WriteBatch(batch); err != nil { + return err + } + triggerNotifyQueue() + return nil + } + + // if we have a hash we mark the transaction as pending + batch := new(state.StoreBatch) + requestMetadata.Hash = hash + triggerNotifyQueue, err := txq.updateRequestStatus(batch, id, requestMetadata, TxRequestStatePending) + if err != nil { + return fmt.Errorf("failed to write transaction request status to store: %v", err) + } + + if err = txq.store.WriteBatch(batch); err != nil { + return err + } + triggerNotifyQueue() + return nil +} + +// processActiveRequest continues monitoring the active request if there is one +// this is called on startup before the queue begins normal operation +func (txq *TxQueue) processActiveRequest() error { + // get the stored active request key + // if nothing is stored id will remain 0 (which is invalid as ids start with 1) + var id uint64 + err := txq.store.Get(txq.activeRequestKey(), &id) + if err != nil && err != state.ErrNotFound { + return err + } + + // if there is a non-zero id there is an active request + if id != 0 { + // load the request metadata + var requestMetadata TxRequestMetadata + err = txq.store.Get(txq.requestKey(id), &requestMetadata) + if err != nil { + return err + } + + log.Info("continuing to wait for previous transaction", "hash", requestMetadata.Hash) + txq.lock.Lock() + switch requestMetadata.State { + // if the transaction is still in the Queued state we cannot know for sure where the process terminated + // with a very high likelihood the transaction was not yet sent, but we cannot be sure of that + // the transaction is marked as TransactionStatusUnknown and removed as the active transaction + // in that rare case nonce issue might arise in subsequent requests + case TxRequestStateQueued: + defer txq.lock.Unlock() + batch := new(state.StoreBatch) + triggerNotifyQueue, err := txq.updateRequestStatus(batch, id, &requestMetadata, TxRequestStateStatusUnknown) + // this only returns an error if the encoding fails which is an unrecoverable error + if err != nil { + return err + } + batch.Delete(txq.activeRequestKey()) + if err := txq.store.WriteBatch(batch); err != nil { + return err + } + triggerNotifyQueue() + // if the transaction is in the pending state this means we were previously waiting for the transaction + case TxRequestStatePending: + txq.lock.Unlock() + // this only returns an error if the encoding fails which is an unrecoverable error + if err := txq.waitForActiveTransaction(id, &requestMetadata); err != nil { + return err + } + default: + defer txq.lock.Unlock() + // this indicates a client bug + log.Error("found active transaction in unexpected state", "state", requestMetadata.State) + if err := txq.store.Delete(txq.activeRequestKey()); err != nil { + return err + } + } + } + return nil +} + +// waitForActiveTransaction waits for requestMetadata to be mined and resets the active transaction afterwards +// the transaction will also be considered mine once the notification was queued successfully +// this only returns an error if the encoding fails which is an unrecoverable error +// the txqueue lock must not be held +func (txq *TxQueue) waitForActiveTransaction(id uint64, requestMetadata *TxRequestMetadata) error { + ctx, cancel := context.WithTimeout(txq.ctx, 20*time.Minute) + defer cancel() + + // an error here means the context was cancelled + receipt, err := WaitMined(ctx, txq.backend, requestMetadata.Hash) + txq.lock.Lock() + defer txq.lock.Unlock() + if err != nil { + // if the main context of the TxQueue was cancelled we log and return + if txq.ctx.Err() != nil { + log.Info("terminating transaction queue while waiting for a transaction", "hash", requestMetadata.Hash) + return nil + } + + // if the timeout context expired we mark the transaction status as unknown + // future versions of the queue (with local nonce-tracking) should keep note of that and reuse the nonce for the next request + log.Warn("transaction timeout reached", "hash", requestMetadata.Hash) + batch := new(state.StoreBatch) + triggerNotifyQueue, err := txq.updateRequestStatus(batch, id, requestMetadata, TxRequestStateStatusUnknown) + if err != nil { + return err + } + err = batch.Put(txq.activeRequestKey(), nil) + if err != nil { + return err + } + + if err = txq.store.WriteBatch(batch); err != nil { + return err + } + + triggerNotifyQueue() + return nil + } + + // if the transaction is mined we need to + // * update the request state and emit the corresponding notification + // * emit a TxReceiptNotification + // * reset the active request + notifyQueue := txq.getNotificationQueue(requestMetadata.RequestTypeID) + batch := new(state.StoreBatch) + triggerNotifyQueueStateChanged, err := txq.updateRequestStatus(batch, id, requestMetadata, TxRequestStateConfirmed) + if err != nil { + return err + } + + key, triggerNotifyQueueReceipt, err := notifyQueue.Queue(batch, &NotificationQueueItem{ + RequestID: id, + NotificationType: "TxReceiptNotification", + }) + if err != nil { + return err + } + + batch.Put(txq.notificationKey(key), &TxReceiptNotification{ + Receipt: *receipt, + }) + + err = batch.Put(txq.activeRequestKey(), nil) + if err != nil { + return err + } + + if err = txq.store.WriteBatch(batch); err != nil { + return err + } + + triggerNotifyQueueStateChanged() + triggerNotifyQueueReceipt() + return nil +} + +// loop is the main transaction processing function of the TxQueue +// first it checks if there already is an active request. If so it processes this first +// then it will take requests from the queue in a loop and execute those +func (txq *TxQueue) loop() error { + err := txq.processActiveRequest() + if err != nil { + return err + } + + for { + select { + case <-txq.ctx.Done(): + return nil + default: + } + + id, requestMetadata, err := txq.waitForNextRequest() + if err == ErrNoHandler { + continue + } + if err != nil { + return err + } + + err = txq.sendRequest(id, requestMetadata) + if err != nil { + return err + } + + err = txq.waitForActiveTransaction(id, requestMetadata) + if err != nil { + // this only returns an error if the encoding fails which is an unrecoverable error + return fmt.Errorf("error while waiting for transaction: %v", err) + } + } +} diff --git a/swap/chain/txqueue_test.go b/swap/chain/txqueue_test.go new file mode 100644 index 0000000000..5b07a49278 --- /dev/null +++ b/swap/chain/txqueue_test.go @@ -0,0 +1,421 @@ +package chain + +import ( + "context" + "errors" + "fmt" + "math/big" + "reflect" + "sync" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/accounts/abi/bind/backends" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" + "github.com/ethersphere/swarm/state" + mock "github.com/ethersphere/swarm/swap/chain/mock" +) + +var ( + senderKey, _ = crypto.HexToECDSA("634fb5a872396d9693e5c9f9d7233cfa93f395c093371017ff44aa9ae6564cdd") + senderAddress = crypto.PubkeyToAddress(senderKey.PublicKey) +) + +var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{ + senderAddress: {Balance: big.NewInt(1000000000000000000)}, +}, 8000000) + +func newTestBackend() *mock.TestBackend { + return mock.NewTestBackend(defaultBackend) +} + +var testRequestTypeID = TxRequestTypeID{ + Handler: "test", + RequestType: "TestRequest", +} + +var dummyTypeID = TxRequestTypeID{ + Handler: "dummy", + RequestType: "DummyRequest", +} + +// txSchedulerTester is a helper used for testing TxScheduler implementations +// it saves received notifications to channels so they can easily be checked in tests +// furthermore it can trigger certain errors depending on flags set in the requests +type txSchedulerTester struct { + lock sync.Mutex + txScheduler TxScheduler + chans map[uint64]*txSchedulerTesterRequestData + backend Backend +} + +// txSchedulerTesterRequestData is the data txSchedulerTester saves for every request +type txSchedulerTesterRequestData struct { + ReceiptNotification chan *TxReceiptNotification + StateChangedNotification chan *TxStateChangedNotification + hash common.Hash +} + +type txSchedulerTesterRequest struct { + NoCommit bool // the transaction should not be automatically mined + ErrorDuringSend bool // send should return with an error +} + +func newTxSchedulerTester(backend Backend, txScheduler TxScheduler) (*txSchedulerTester, error) { + t := &txSchedulerTester{ + txScheduler: txScheduler, + backend: backend, + chans: make(map[uint64]*txSchedulerTesterRequestData), + } + err := txScheduler.SetHandlers(testRequestTypeID, &TxRequestHandlers{ + Send: t.SendTransactionRequest, + NotifyReceipt: t.NotifyReceipt, + NotifyStateChanged: t.NotifyStateChanged, + }) + if err != nil { + return nil, err + } + return t, nil +} + +// getRequest gets the txSchedulerTesterRequestData for this id or initializes it if it does not yet exist +func (tc *txSchedulerTester) getRequest(id uint64) *txSchedulerTesterRequestData { + tc.lock.Lock() + defer tc.lock.Unlock() + c, ok := tc.chans[id] + if !ok { + tc.chans[id] = &txSchedulerTesterRequestData{ + ReceiptNotification: make(chan *TxReceiptNotification), + StateChangedNotification: make(chan *TxStateChangedNotification), + } + return tc.chans[id] + } + return c +} + +// expectStateChangedNotification waits for a StateChangedNotification with the given parameters +func (tc *txSchedulerTester) expectStateChangedNotification(ctx context.Context, id uint64, oldState TxRequestState, newState TxRequestState) error { + var notification *TxStateChangedNotification + request := tc.getRequest(id) + select { + case notification = <-request.StateChangedNotification: + case <-ctx.Done(): + return ctx.Err() + } + + if notification.OldState != oldState { + return fmt.Errorf("wrong old state. got %v, expected %v", notification.OldState, oldState) + } + + if notification.NewState != newState { + return fmt.Errorf("wrong new state. got %v, expected %v", notification.NewState, newState) + } + + return nil +} + +// expectStateChangedNotification waits for a ReceiptNotification for the given request id and verifies its hash +func (tc *txSchedulerTester) expectReceiptNotification(ctx context.Context, id uint64) error { + var notification *TxReceiptNotification + request := tc.getRequest(id) + select { + case notification = <-request.ReceiptNotification: + case <-ctx.Done(): + return ctx.Err() + } + + receipt, err := tc.backend.TransactionReceipt(ctx, request.hash) + if err != nil { + return err + } + if receipt == nil { + return errors.New("no receipt found for transaction") + } + + if notification.Receipt.TxHash != request.hash { + return fmt.Errorf("wrong old state. got %v, expected %v", notification.Receipt.TxHash, request.hash) + } + + return nil +} + +func (tc *txSchedulerTester) NotifyReceipt(ctx context.Context, id uint64, notification *TxReceiptNotification) error { + select { + case tc.getRequest(id).ReceiptNotification <- notification: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +func (tc *txSchedulerTester) NotifyStateChanged(ctx context.Context, id uint64, notification *TxStateChangedNotification) error { + select { + case tc.getRequest(id).StateChangedNotification <- notification: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// simple low level transaction logic +func (tc *txSchedulerTester) SendTransactionRequest(id uint64, backend Backend, opts *bind.TransactOpts) (hash common.Hash, err error) { + var nonce uint64 + if opts.Nonce == nil { + nonce, err = backend.PendingNonceAt(opts.Context, opts.From) + if err != nil { + return common.Hash{}, err + } + } else { + nonce = opts.Nonce.Uint64() + } + + signed, err := opts.Signer(types.HomesteadSigner{}, opts.From, types.NewTransaction(nonce, common.Address{}, big.NewInt(0), 100000, big.NewInt(int64(10000000)), []byte{})) + if err != nil { + return common.Hash{}, err + } + + var request *txSchedulerTesterRequest + err = tc.txScheduler.GetRequest(id, &request) + if err != nil { + return common.Hash{}, err + } + + if request.ErrorDuringSend { + return common.Hash{}, errors.New("simulated error during send") + } + + if request.NoCommit { + err = backend.(*mock.TestBackend).SendTransactionNoCommit(opts.Context, signed) + } else { + err = backend.SendTransaction(opts.Context, signed) + } + if err != nil { + return common.Hash{}, err + } + + tc.getRequest(id).hash = signed.Hash() + return signed.Hash(), nil +} + +// helper function for queue tests which sets up everything and provides a cleanup function +// if run is true the queue starts processing requests and cleanup function will wait for proper termination +func setupTxQueueTest(run bool) (*TxQueue, func()) { + backend := newTestBackend() + store := state.NewInmemoryStore() + txq := NewTxQueue(store, "test", backend, senderKey) + if run { + txq.Start() + } + return txq, func() { + if run { + txq.Stop() + } + store.Close() + backend.Close() + } +} + +// TestTxQueueScheduleRequest tests scheduling a single request when the queue is not running +// Afterwards the queue is started and the correct sequence of notifications is expected +func TestTxQueueScheduleRequest(t *testing.T) { + txq, clean := setupTxQueueTest(false) + defer clean() + tc, err := newTxSchedulerTester(txq.backend, txq) + if err != nil { + t.Fatal(err) + } + + testRequest := &txSchedulerTesterRequest{} + + id, err := txq.ScheduleRequest(testRequestTypeID, testRequest) + if err != nil { + t.Fatal(err) + } + + if id != 1 { + t.Fatal("expected id to be 1") + } + + var testRequestRetrieved *txSchedulerTesterRequest + err = txq.GetRequest(id, &testRequestRetrieved) + if err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(testRequest, testRequestRetrieved) { + t.Fatalf("got request %v, expected %v", testRequestRetrieved, testRequest) + } + + txq.Start() + defer txq.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + if err = tc.expectStateChangedNotification(ctx, id, TxRequestStateQueued, TxRequestStatePending); err != nil { + t.Fatal(err) + } + + if err = tc.expectStateChangedNotification(ctx, id, TxRequestStatePending, TxRequestStateConfirmed); err != nil { + t.Fatal(err) + } + + if err = tc.expectReceiptNotification(ctx, id); err != nil { + t.Fatal(err) + } +} + +// TestTxQueueManyRequests schedules many requests and expects all of them to be successful +func TestTxQueueManyRequests(t *testing.T) { + txq, clean := setupTxQueueTest(true) + defer clean() + tc, err := newTxSchedulerTester(txq.backend, txq) + if err != nil { + t.Fatal(err) + } + + var ids []uint64 + count := 200 + for i := 0; i < count; i++ { + id, err := txq.ScheduleRequest(testRequestTypeID, &txSchedulerTesterRequest{}) + if err != nil { + t.Fatal(err) + } + + ids = append(ids, id) + } + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + for _, id := range ids { + err := tc.expectStateChangedNotification(ctx, id, TxRequestStateQueued, TxRequestStatePending) + if err != nil { + t.Fatal(err) + } + err = tc.expectStateChangedNotification(ctx, id, TxRequestStatePending, TxRequestStateConfirmed) + if err != nil { + t.Fatal(err) + } + err = tc.expectReceiptNotification(ctx, id) + if err != nil { + t.Fatal(err) + } + } +} + +// TestTxQueueNoHandler schedules a request with no send handler +func TestTxQueueNoHandler(t *testing.T) { + txq, clean := setupTxQueueTest(true) + defer clean() + tc, err := newTxSchedulerTester(txq.backend, txq) + if err != nil { + t.Fatal(err) + } + + txq.SetHandlers(dummyTypeID, &TxRequestHandlers{ + NotifyStateChanged: tc.NotifyStateChanged, + }) + + id, err := txq.ScheduleRequest(dummyTypeID, &txSchedulerTesterRequest{}) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err = tc.expectStateChangedNotification(ctx, id, TxRequestStateQueued, TxRequestStateCancelled) + if err != nil { + t.Fatal(err) + } +} + +// TestTxQueueActiveTransaction tests that the queue continues to monitor the last pending transaction +func TestTxQueueActiveTransaction(t *testing.T) { + txq, clean := setupTxQueueTest(false) + defer clean() + + tc, err := newTxSchedulerTester(txq.backend, txq) + if err != nil { + t.Fatal(err) + } + + txq.Start() + + id, err := txq.ScheduleRequest(testRequestTypeID, &txSchedulerTesterRequest{ + NoCommit: true, + }) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err = tc.expectStateChangedNotification(ctx, id, TxRequestStateQueued, TxRequestStatePending) + if err != nil { + t.Fatal(err) + } + + txq.Stop() + + // start a new queue with the same store and backend + txq2 := NewTxQueue(txq.store, txq.prefix, txq.backend, txq.privateKey) + if err != nil { + t.Fatal(err) + } + // reuse the tester so it maintains state about the tx hash and id + err = txq2.SetHandlers(testRequestTypeID, &TxRequestHandlers{ + Send: tc.SendTransactionRequest, + NotifyReceipt: tc.NotifyReceipt, + NotifyStateChanged: tc.NotifyStateChanged, + }) + if err != nil { + t.Fatal(err) + } + + // the transaction confirmed in the meantime + txq2.backend.(*mock.TestBackend).Commit() + + txq2.Start() + defer txq2.Stop() + + err = tc.expectStateChangedNotification(ctx, id, TxRequestStatePending, TxRequestStateConfirmed) + if err != nil { + t.Fatal(err) + } + + err = tc.expectReceiptNotification(ctx, id) + if err != nil { + t.Fatal(err) + } +} + +// TestTxQueueErrorDuringSend tests that a request is marked as TxRequestStateStatusUnknown if the send fails +func TestTxQueueErrorDuringSend(t *testing.T) { + txq, clean := setupTxQueueTest(true) + defer clean() + tc, err := newTxSchedulerTester(txq.backend, txq) + if err != nil { + t.Fatal(err) + } + + id, err := txq.ScheduleRequest(testRequestTypeID, &txSchedulerTesterRequest{ + ErrorDuringSend: true, + }) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + err = tc.expectStateChangedNotification(ctx, id, TxRequestStateQueued, TxRequestStateStatusUnknown) + if err != nil { + t.Fatal(err) + } +} diff --git a/swap/chain/txscheduler.go b/swap/chain/txscheduler.go new file mode 100644 index 0000000000..be0d0c567e --- /dev/null +++ b/swap/chain/txscheduler.go @@ -0,0 +1,83 @@ +package chain + +import ( + "context" + "fmt" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +// TxScheduler represents a central sender for all transactions from a single ethereum account +// its purpose is to ensure there are no nonce issues and that transaction initiators are notified of the result +// notifications are guaranteed to happen even across node restarts and disconnects from the ethereum backend +type TxScheduler interface { + // SetHandlers registers the handlers for the given requestTypeID + // This starts the delivery of notifications for this requestTypeID + SetHandlers(requestTypeID TxRequestTypeID, handlers *TxRequestHandlers) error + // ScheduleRequest adds a new request to be processed + // The request is assigned an id which is returned + ScheduleRequest(requestTypeID TxRequestTypeID, request interface{}) (id uint64, err error) + // GetRequest load the serialized transaction request from disk and tries to decode it + GetRequest(id uint64, request interface{}) error + // Start starts processing transactions if it is not already doing so + Start() + // Stop stops processing transactions if it is running + // It will block until processing has terminated + Stop() +} + +// TxRequestTypeID is a combination of a handler and a request type +// All requests with a given TxRequestTypeID are handled the same +type TxRequestTypeID struct { + Handler string + RequestType string +} + +func (rti TxRequestTypeID) String() string { + return fmt.Sprintf("%s.%s", rti.Handler, rti.RequestType) +} + +// TxRequestHandlers holds all the callbacks for a given TxRequestTypeID +// Except for Send, any of the functions may be nil +// Notify functions are called by the transaction queue when a notification for a transaction occurs +// If the handler returns an error the notification will be resent in the future (including across restarts) +type TxRequestHandlers struct { + // Send should send the transaction using the backend and opts provided + // opts may be modified, however From, Nonce and Signer must be left untouched + // If the transaction is sent through other means From, Nonce and Signer must be respected (if Nonce set to nil, the "pending" nonce must be used) + Send func(id uint64, backend Backend, opts *bind.TransactOpts) (common.Hash, error) + // NotifyReceipt is called the first time a receipt is observed for a transaction + NotifyReceipt func(ctx context.Context, id uint64, notification *TxReceiptNotification) error + // NotifyStateChanged is called every time the transaction status changes + NotifyStateChanged func(ctx context.Context, id uint64, notification *TxStateChangedNotification) error +} + +// TxRequestState is the type used to indicate which state the transaction is in +type TxRequestState uint8 + +// TxReceiptNotification is the notification emitted when the receipt is available +type TxReceiptNotification struct { + Receipt types.Receipt // the receipt of the included transaction +} + +// TxStateChangedNotification is the notification emitted when the state of the request changes +// Note that by the time the handler processes the notification, the state might have already changed again +type TxStateChangedNotification struct { + OldState TxRequestState // the state prior to the change + NewState TxRequestState // the state after the change +} + +const ( + // TxRequestStateQueued is the initial state for all requests that enter the queue + TxRequestStateQueued TxRequestState = 0 + // TxRequestStatePending means the request is no longer in the queue but not yet confirmed + TxRequestStatePending TxRequestState = 1 + // TxRequestStateConfirmed is entered the first time a confirmation is received. This is a terminal state. + TxRequestStateConfirmed TxRequestState = 2 + // TxRequestStateStatusUnknown is used for all cases where it is unclear wether the transaction was broadcast or not. This is also used for timed-out transactions. + TxRequestStateStatusUnknown TxRequestState = 3 + // TxRequestStateCancelled is used for all cases where it is certain the transaction was and never will be sent + TxRequestStateCancelled TxRequestState = 4 +) diff --git a/swap/common_test.go b/swap/common_test.go index f7b7fbcce6..de0aeca62f 100644 --- a/swap/common_test.go +++ b/swap/common_test.go @@ -15,10 +15,12 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind/backends" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p/simulations/adapters" contractFactory "github.com/ethersphere/go-sw3/contracts-v0-2-0/simpleswapfactory" + contract "github.com/ethersphere/swarm/contracts/swap" cswap "github.com/ethersphere/swarm/contracts/swap" "github.com/ethersphere/swarm/network" "github.com/ethersphere/swarm/p2p/protocols" @@ -34,8 +36,6 @@ type swapTestBackend struct { *mock.TestBackend factoryAddress common.Address // address of the SimpleSwapFactory in the simulated network tokenAddress common.Address // address of the token in the simulated network - // the async cashing go routine needs synchronization for tests - cashDone chan struct{} } var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{ @@ -67,7 +67,6 @@ func newTestBackend(t *testing.T) *swapTestBackend { TestBackend: backend, factoryAddress: factoryAddress, tokenAddress: tokenAddress, - cashDone: make(chan struct{}), } } @@ -105,7 +104,9 @@ func newBaseTestSwapWithParams(t *testing.T, key *ecdsa.PrivateKey, params *Para if err != nil { t.Fatal(err) } - swap := newSwapInstance(stateStore, owner, backend, 10, params, factory) + + txqueue := chain.NewTxQueue(stateStore, "chain", backend, owner.privateKey) + swap := newSwapInstance(stateStore, owner, backend, 10, params, factory, txqueue) return swap, dir } @@ -126,6 +127,7 @@ func newTestSwap(t *testing.T, key *ecdsa.PrivateKey, backend *swapTestBackend) usedBackend = newTestBackend(t) } swap, dir := newBaseTestSwap(t, key, usedBackend) + swap.txScheduler.Start() clean := func() { swap.Close() // only close if created by newTestSwap to avoid double close @@ -206,32 +208,6 @@ func newRandomTestCheque() *Cheque { return cheque } -// During tests, because the cashing in of cheques is async, we should wait for the function to be returned -// Otherwise if we call `handleEmitChequeMsg` manually, it will return before the TX has been committed to the `SimulatedBackend`, -// causing subsequent TX to possibly fail due to nonce mismatch -func testCashCheque(s *Swap, cheque *Cheque) { - cashCheque(s, cheque) - // send to the channel, signals to clients that this function actually finished - if stb, ok := s.backend.(*swapTestBackend); ok { - if stb.cashDone != nil { - stb.cashDone <- struct{}{} - } - } -} - -// setupContractTest is a helper function for setting up the -// blockchain wait function for testing -func setupContractTest() func() { - // we also need to store the previous cashCheque function in case this is called multiple times - currentCashCheque := defaultCashCheque - defaultCashCheque = testCashCheque - // overwrite only for the duration of the test, so... - return func() { - // ...we need to set it back to original when done - defaultCashCheque = currentCashCheque - } -} - // deploy for testing (needs simulated backend commit) func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privateKey *ecdsa.PrivateKey, ownerAddress common.Address, depositAmount *uint256.Uint256) (cswap.Contract, error) { opts := bind.NewKeyedTransactor(privateKey) @@ -248,9 +224,6 @@ func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privat return nil, err } - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() contract, err := factory.DeploySimpleSwap(opts, ownerAddress, big.NewInt(int64(defaultHarddepositTimeoutDuration))) if err != nil { return nil, err @@ -315,3 +288,41 @@ func (d *dummyMsgRW) ReadMsg() (p2p.Msg, error) { func (d *dummyMsgRW) WriteMsg(msg p2p.Msg) error { return nil } + +type cashChequeDoneData struct { + request *CashoutRequest + result *contract.CashChequeResult + receipt *types.Receipt +} + +type testCashoutResultHandler struct { + swap *Swap + cashChequeDone chan cashChequeDoneData +} + +func newTestCashoutResultHandler(swap *Swap) *testCashoutResultHandler { + return &testCashoutResultHandler{ + swap: swap, + cashChequeDone: make(chan cashChequeDoneData), + } +} + +func (h *testCashoutResultHandler) HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error { + if h.swap != nil { + if err := h.swap.HandleCashoutResult(request, result, receipt); err != nil { + return err + } + } + h.cashChequeDone <- cashChequeDoneData{ + request: request, + result: result, + receipt: receipt, + } + return nil +} + +func overrideCashoutResultHandler(swap *Swap) *testCashoutResultHandler { + cashoutResultHandler := newTestCashoutResultHandler(swap) + swap.cashoutProcessor.cashoutResultHandler = cashoutResultHandler + return cashoutResultHandler +} diff --git a/swap/protocol_test.go b/swap/protocol_test.go index ca120ae9d6..56f2a59b99 100644 --- a/swap/protocol_test.go +++ b/swap/protocol_test.go @@ -49,7 +49,6 @@ type swapTester struct { swap *Swap } -// creates a new protocol tester for swap with a deployed chequebook func newSwapTester(t *testing.T, backend *swapTestBackend, depositAmount *uint256.Uint256) (*swapTester, func(), error) { swap, clean := newTestSwap(t, ownerKey, backend) @@ -229,14 +228,11 @@ func TestEmitCheque(t *testing.T) { t.Fatal(err) } creditorSwap := protocolTester.swap + cashoutHandler := overrideCashoutResultHandler(creditorSwap) debitorSwap, cleanDebitorSwap := newTestSwap(t, beneficiaryKey, testBackend) defer cleanDebitorSwap() - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - log.Debug("deploy to simulated backend") // cashCheque cashes a cheque when the reward of doing so is twice the transaction costs. @@ -317,7 +313,7 @@ func TestEmitCheque(t *testing.T) { // we wait until the cashCheque is actually terminated (ensures proper nonce count) select { - case <-testBackend.cashDone: + case <-cashoutHandler.cashChequeDone: log.Debug("cash transaction completed and committed") case <-time.After(4 * time.Second): t.Fatalf("Timeout waiting for cash transaction to complete") @@ -340,9 +336,6 @@ func TestTriggerPaymentThreshold(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() if err = protocolTester.testHandshake( correctSwapHandshakeMsg(debitorSwap), diff --git a/swap/simulations_test.go b/swap/simulations_test.go index e2222da8b7..38fdb3ef29 100644 --- a/swap/simulations_test.go +++ b/swap/simulations_test.go @@ -47,6 +47,7 @@ import ( "github.com/ethersphere/swarm/network/simulation" "github.com/ethersphere/swarm/p2p/protocols" "github.com/ethersphere/swarm/state" + "github.com/ethersphere/swarm/swap/chain" mock "github.com/ethersphere/swarm/swap/chain/mock" "github.com/ethersphere/swarm/uint256" ) @@ -62,6 +63,7 @@ For integration tests, run test cluster deployments with all integration moduele (blockchains, oracles, etc.) */ // swapSimulationParams allows to avoid global variables for the test + type swapSimulationParams struct { swaps map[int]*Swap dirs map[int]string @@ -165,9 +167,10 @@ func newSimServiceMap(params *swapSimulationParams) map[string]simulation.Servic if err != nil { return nil, nil, err } + ts.swap.cashoutProcessor.txScheduler.Start() cleanup = func() { - ts.swap.store.Close() + ts.swap.Close() os.RemoveAll(dir) } @@ -238,7 +241,6 @@ func newSharedBackendSwaps(t *testing.T, nodeCount int) (*swapSimulationParams, TestBackend: mock.NewTestBackend(defaultBackend), factoryAddress: factoryAddress, tokenAddress: tokenAddress, - cashDone: make(chan struct{}), } // finally, create all Swap instances for each node, which share the same backend var owner *Owner @@ -249,7 +251,8 @@ func newSharedBackendSwaps(t *testing.T, nodeCount int) (*swapSimulationParams, if err != nil { t.Fatal(err) } - params.swaps[i] = newSwapInstance(stores[i], owner, testBackend, 10, defParams, factory) + txqueue := chain.NewTxQueue(stores[i], "chain", testBackend, owner.privateKey) + params.swaps[i] = newSwapInstance(stores[i], owner, testBackend, 10, defParams, factory, txqueue) } params.backend = testBackend @@ -271,10 +274,6 @@ func TestMultiChequeSimulation(t *testing.T) { // cleanup backend defer params.backend.Close() - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - // initialize the simulation sim := simulation.NewBzzInProc(newSimServiceMap(params), false) defer sim.Close() @@ -306,6 +305,8 @@ func TestMultiChequeSimulation(t *testing.T) { // get the testService for the creditor creditorSvc := sim.Service("swap", creditor).(*testService) + cashoutHandler := overrideCashoutResultHandler(creditorSvc.swap) + var debLen, credLen, debSwapLen, credSwapLen int timeout := time.After(10 * time.Second) for { @@ -384,7 +385,7 @@ func TestMultiChequeSimulation(t *testing.T) { balanceAfterMessage := debitorBalance - int64(msgPrice) if balanceAfterMessage <= -paymentThreshold { // we need to wait a bit in order to give time for the cheque to be processed - if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout); err != nil { + if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout, cashoutHandler.cashChequeDone); err != nil { t.Fatal(err) } expectedPayout += uint64(-balanceAfterMessage) @@ -620,7 +621,7 @@ func TestBasicSwapSimulation(t *testing.T) { log.Info("Simulation ended") } -func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metrics.Counter, lastCount int64, p *Peer, expectedLastPayout uint64) error { +func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metrics.Counter, lastCount int64, p *Peer, expectedLastPayout uint64, cashChequeDone chan cashChequeDoneData) error { t.Helper() ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) defer cancel() @@ -643,7 +644,7 @@ func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metr lock.Unlock() wg.Done() return - case <-backend.cashDone: + case <-cashChequeDone: wg.Done() return } diff --git a/swap/swap.go b/swap/swap.go index 274c3a29cd..0d505b4b5d 100644 --- a/swap/swap.go +++ b/swap/swap.go @@ -29,6 +29,7 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/console" + "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" @@ -68,6 +69,7 @@ type Swap struct { chequebookFactory contract.SimpleSwapFactory // the chequebook factory used honeyPriceOracle HoneyOracle // oracle which resolves the price of honey (in Wei) cashoutProcessor *CashoutProcessor // processor for cashing out + txScheduler chain.TxScheduler // transaction scheduler to use } // Owner encapsulates information related to accessing the contract @@ -136,8 +138,8 @@ func swapRotatingFileHandler(logdir string) (log.Handler, error) { } // newSwapInstance is a swap constructor function without integrity checks -func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory) *Swap { - return &Swap{ +func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory, txScheduler chain.TxScheduler) *Swap { + s := &Swap{ store: stateStore, peers: make(map[enode.ID]*Peer), backend: backend, @@ -146,8 +148,10 @@ func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend chequebookFactory: chequebookFactory, honeyPriceOracle: NewHoneyPriceOracle(), chainID: chainID, - cashoutProcessor: newCashoutProcessor(backend, owner.privateKey), + txScheduler: txScheduler, } + s.cashoutProcessor = newCashoutProcessor(txScheduler, backend, owner.privateKey, s) + return s } // New prepares and creates all fields to create a swap instance: @@ -209,12 +213,15 @@ func New(dbPath string, prvkey *ecdsa.PrivateKey, backendURL string, params *Par chainID.Uint64(), params, factory, + chain.NewTxQueue(stateStore, "chain", backend, owner.privateKey), ) // start the chequebook if swap.contract, err = swap.StartChequebook(chequebookAddressFlag); err != nil { return nil, err } + swap.txScheduler.Start() + // deposit money in the chequebook if desired if !skipDepositFlag { // prompt the user for a depositAmount @@ -391,8 +398,6 @@ func (s *Swap) handleMsg(p *Peer) func(ctx context.Context, msg interface{}) err } } -var defaultCashCheque = cashCheque - // handleEmitChequeMsg should be handled by the creditor when it receives // a cheque from a debitor func (s *Swap) handleEmitChequeMsg(ctx context.Context, p *Peer, msg *EmitChequeMsg) error { @@ -435,21 +440,10 @@ func (s *Swap) handleEmitChequeMsg(ctx context.Context, p *Peer, msg *EmitCheque return protocols.Break(err) } - expectedPayout, transactionCosts, err := s.cashoutProcessor.estimatePayout(context.TODO(), cheque) - if err != nil { - return protocols.Break(err) - } - - costsMultiplier := uint256.FromUint64(2) - costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier) - if err != nil { - return err - } - - // do a payout transaction if we get 2 times the gas costs - if expectedPayout.Cmp(costThreshold) == 1 { - go defaultCashCheque(s, cheque) - } + s.cashoutProcessor.submitCheque(ctx, &CashoutRequest{ + Cheque: *cheque, + Destination: s.GetParams().ContractAddress, + }) return nil } @@ -489,20 +483,6 @@ func (s *Swap) handleConfirmChequeMsg(ctx context.Context, p *Peer, msg *Confirm return nil } -// cashCheque should be called async as it blocks until the transaction(s) are mined -// The function cashes the cheque by sending it to the blockchain -func cashCheque(s *Swap, cheque *Cheque) { - err := s.cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{ - Cheque: *cheque, - Destination: s.GetParams().ContractAddress, - }) - - if err != nil { - metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1) - swapLog.Error("cashing cheque:", "error", err) - } -} - // processAndVerifyCheque verifies the cheque and compares it with the last received cheque // if the cheque is valid it will also be saved as the new last cheque // the caller is expected to hold p.lock @@ -613,6 +593,7 @@ func (s *Swap) saveBalance(p enode.ID, balance int64) error { // Close cleans up swap func (s *Swap) Close() error { + s.txScheduler.Stop() return s.store.Close() } @@ -737,3 +718,15 @@ func (s *Swap) loadChequebook() (common.Address, error) { func (s *Swap) saveChequebook(chequebook common.Address) error { return s.store.Put(connectedChequebookKey, chequebook) } + +func (s *Swap) HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error { + metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64()) + + if result.Bounced { + metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1) + swapLog.Warn("cheque bounced", "tx", receipt.TxHash) + } + + swapLog.Info("cheque cashed", "cheque", &request.Cheque) + return nil +} diff --git a/swap/swap_test.go b/swap/swap_test.go index 01bf17d142..6f5fa94104 100644 --- a/swap/swap_test.go +++ b/swap/swap_test.go @@ -603,6 +603,7 @@ func TestResetBalance(t *testing.T) { defer testBackend.Close() // create both test swap accounts creditorSwap, clean1 := newTestSwap(t, beneficiaryKey, testBackend) + cashoutHandler := overrideCashoutResultHandler(creditorSwap) debitorSwap, clean2 := newTestSwap(t, ownerKey, testBackend) defer clean1() defer clean2() @@ -641,10 +642,6 @@ func TestResetBalance(t *testing.T) { t.Fatal(err) } - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - // now simulate sending the cheque to the creditor from the debitor if err = creditor.sendCheque(); err != nil { t.Fatal(err) @@ -663,20 +660,18 @@ func TestResetBalance(t *testing.T) { if cheque == nil { t.Fatal("expected to find a cheque, but it was empty") } - // ...create a message... - msg := &EmitChequeMsg{ - Cheque: cheque, - } // ...and trigger message handling on the receiver side (creditor) // remember that debitor is the model of the remote node for the creditor... - err = creditorSwap.handleEmitChequeMsg(ctx, debitor, msg) + err = creditorSwap.handleEmitChequeMsg(ctx, debitor, &EmitChequeMsg{ + Cheque: cheque, + }) if err != nil { t.Fatal(err) } // ...on which we wait until the cashCheque is actually terminated (ensures proper nonce count) select { - case <-testBackend.cashDone: + case <-cashoutHandler.cashChequeDone: log.Debug("cash transaction completed and committed") case <-time.After(4 * time.Second): t.Fatalf("Timeout waiting for cash transactions to complete") @@ -692,8 +687,6 @@ func TestResetBalance(t *testing.T) { func TestDebtCheques(t *testing.T) { testBackend := newTestBackend(t) defer testBackend.Close() - cleanup := setupContractTest() - defer cleanup() creditorSwap, cleanup := newTestSwap(t, beneficiaryKey, testBackend) defer cleanup() @@ -745,14 +738,6 @@ func TestDebtCheques(t *testing.T) { if err != nil { t.Fatal(err) } - - // ...on which we wait until the cashCheque is actually terminated (ensures proper nonce count) - select { - case <-testBackend.cashDone: - log.Debug("cash transaction completed and committed") - case <-time.After(4 * time.Second): - t.Fatalf("Timeout waiting for cash transactions to complete") - } } // generate bookings based on parameters, apply them to a Swap struct and verify the result @@ -1303,10 +1288,6 @@ func TestSwapLogToFile(t *testing.T) { t.Fatal(err) } - // setup the wait for mined transaction function for testing - cleanup := setupContractTest() - defer cleanup() - // now simulate sending the cheque to the creditor from the debitor if err = creditor.sendCheque(); err != nil { t.Fatal(err) @@ -1360,8 +1341,6 @@ func TestAvailableBalance(t *testing.T) { defer testBackend.Close() swap, clean := newTestSwap(t, ownerKey, testBackend) defer clean() - cleanup := setupContractTest() - defer cleanup() depositAmount := uint256.FromUint64(9000 * RetrieveRequestPrice)