diff --git a/core/blockchain_test.go b/core/blockchain_test.go
index 804919abd2..1e84868e9a 100644
--- a/core/blockchain_test.go
+++ b/core/blockchain_test.go
@@ -17,12 +17,14 @@ package core

 import (
+	"encoding/binary"
 	"errors"
 	"fmt"
 	"io/ioutil"
 	"math/big"
 	"math/rand"
 	"os"
+	"os/exec"
 	"sync"
 	"testing"
 	"time"
@@ -40,6 +42,7 @@ import (
 	"github.com/ethereum/go-ethereum/consensus/ethash"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/state"
+	"github.com/ethereum/go-ethereum/core/state/snapshot"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/core/vm"
 	"github.com/ethereum/go-ethereum/crypto"
@@ -4312,3 +4315,170 @@ func TestSidecarsPruning(t *testing.T) {
 		}
 	}
 }
+
+func TestBlockChain_2000StorageUpdate(t *testing.T) {
+	var (
+		numTxs          = 2000
+		signer          = types.HomesteadSigner{}
+		testBankKey, _  = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
+		testBankAddress = crypto.PubkeyToAddress(testBankKey.PublicKey)
+		bankFunds       = big.NewInt(100000000000000000)
+		contractAddress = common.HexToAddress("0x1234")
+		gspec           = Genesis{
+			Config: params.TestChainConfig,
+			Alloc: GenesisAlloc{
+				testBankAddress: {Balance: bankFunds},
+				contractAddress: {
+					Nonce:   1,
+					Balance: common.Big0,
+					// Store 1 into slot passed by calldata
+					Code: []byte{
+						byte(vm.PUSH0),
+						byte(vm.CALLDATALOAD),
+						byte(vm.PUSH1),
+						byte(0x1),
+						byte(vm.SWAP1),
+						byte(vm.SSTORE),
+						byte(vm.STOP),
+					},
+					Storage: make(map[common.Hash]common.Hash),
+				},
+			},
+			GasLimit: 100e6, // 100 M
+		}
+	)
+
+	for i := 0; i < 1000; i++ {
+		gspec.Alloc[contractAddress].Storage[common.BigToHash(big.NewInt(int64(i)))] = common.BigToHash(big.NewInt(0x100))
+	}
+
+	// Generate one block carrying 2000 storage-updating transactions
+	engine := ethash.NewFaker()
+	db := rawdb.NewMemoryDatabase()
+	genesis := gspec.MustCommit(db)
+
+	blockGenerator := func(i int, block *BlockGen) {
+		block.SetCoinbase(common.Address{1})
+		for txi := 0; txi < numTxs; txi++ {
+			var calldata [32]byte
+			binary.BigEndian.PutUint64(calldata[:], uint64(txi))
+			tx, err := types.SignTx(
+				types.NewTransaction(uint64(txi), contractAddress, common.Big0, 100_000,
+					block.header.BaseFee, calldata[:]),
+				signer,
+				testBankKey)
+			if err != nil {
+				t.Error(err)
+			}
+			block.AddTx(tx)
+		}
+	}
+
+	shared, _ := GenerateChain(params.TestChainConfig, genesis, engine, db, 1, blockGenerator, true)
+	err := os.Mkdir("./pebble", 0775)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll("./pebble")
+	// Import the generated chain into a fresh pebble-backed chain
+	diskdb, err := rawdb.NewPebbleDBDatabase("./pebble", 1024, 500000, "", false, false)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer diskdb.Close()
+	gspec.MustCommit(diskdb)
+
+	chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil)
+	if err != nil {
+		t.Fatalf("failed to create tester chain: %v", err)
+	}
+	if _, err := chain.InsertChain(shared, nil); err != nil {
+		t.Fatalf("failed to insert shared chain: %v", err)
+	}
+
+	blockHash := chain.CurrentBlock().Hash()
+	if blockHash != (common.HexToHash("0x684f656efba5a77f0e8b4c768a2b3479b28250fd7b81dbb9a888abf6180b01bd")) {
+		t.Fatalf("Block hash mismatches, exp 0x684f656efba5a77f0e8b4c768a2b3479b28250fd7b81dbb9a888abf6180b01bd got %s", blockHash)
+	}
+}
+
+// This benchmark is intended to be used with mainnet data, so mainnet chaindata's directory
+// is needed to run this benchmark
+func BenchmarkManyStorageUpdate(b *testing.B) {
+	const (
+		// Fill in the chaindata's parent directory
+		datadir   = ""
+		numInsert = state.ParallelInsertThreshold + 1
+	)
+
+	var (
+		diskdb       ethdb.Database
+		err          error
+		axieContract = common.HexToAddress("0x32950db2a7164ae833121501c797d79e7b79d74c")
+		value        = common.HexToHash("0x11")
+	)
+	defer func() {
+		if diskdb != nil {
+			diskdb.Close()
+			cmd := exec.Command("../script/overlayfs_chaindata.sh", "-d", datadir, "-c")
+			if err := cmd.Run(); err != nil {
+				b.Fatal(err)
+			}
+		}
+	}()
+
+	keys :=
make([]common.Hash, 0, numInsert) + for i := 0; i < numInsert; i++ { + hash := crypto.Keccak256Hash(big.NewInt(int64(i)).Bytes()) + keys = append(keys, hash) + } + + b.StopTimer() + b.ResetTimer() + for i := 0; i < b.N; i++ { + cmd := exec.Command("../script/overlayfs_chaindata.sh", "-d", datadir) + if err := cmd.Run(); err != nil { + b.Fatal(err) + } + + diskdb, err = rawdb.NewPebbleDBDatabase(datadir+"/chaindata", 1024, 500000, "", false, false) + if err != nil { + b.Fatal(err) + } + + engine := ethash.NewFaker() + chain, err := NewBlockChain(diskdb, nil, params.TestChainConfig, engine, vm.Config{}, nil, nil) + if err != nil { + b.Fatalf("failed to create tester chain: %v", err) + } + headBlock := chain.CurrentBlock() + + database := state.NewDatabase(diskdb) + snapshot, err := snapshot.New(diskdb, database.TrieDB(), 256, headBlock.Root(), true, true, false) + if err != nil { + b.Fatal(err) + } + + statedb, err := state.New(headBlock.Root(), database, snapshot) + if err != nil { + b.Fatal(err) + } + + b.StartTimer() + for i := 0; i < numInsert; i++ { + statedb.SetState(axieContract, keys[i], value) + } + _, err = statedb.Commit(true) + if err != nil { + b.Fatal(err) + } + b.StopTimer() + + diskdb.Close() + cmd = exec.Command("../script/overlayfs_chaindata.sh", "-d", datadir, "-c") + if err := cmd.Run(); err != nil { + b.Fatal(err) + } + diskdb = nil + } +} diff --git a/core/state/state_object.go b/core/state/state_object.go index 22d90b8420..7aa4d7a556 100644 --- a/core/state/state_object.go +++ b/core/state/state_object.go @@ -28,10 +28,13 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" + "github.com/ethereum/go-ethereum/trie" ) var emptyCodeHash = crypto.Keccak256(nil) +const ParallelInsertThreshold = 500 + type Code []byte func (c Code) String() string { @@ -338,6 +341,16 @@ func (s *stateObject) updateTrie(db Database) Trie { tr := s.getTrie(db) hasher := s.db.hasher + var ( 
+		parallelInsert, ok bool
+		secureTrie     *trie.SecureTrie
+		keys, values   [][]byte
+	)
+	if len(s.pendingStorage) > ParallelInsertThreshold {
+		if secureTrie, ok = tr.(*trie.SecureTrie); ok {
+			parallelInsert = true
+		}
+	}
 	usedStorage := make([][]byte, 0, len(s.pendingStorage))
 	for key, value := range s.pendingStorage {
 		// Skip noop changes, persist actual changes
@@ -353,8 +366,14 @@ func (s *stateObject) updateTrie(db Database) Trie {
 		} else {
 			// Encoding []byte cannot fail, ok to ignore the error.
 			v, _ = rlp.EncodeToBytes(common.TrimLeftZeroes(value[:]))
-			s.setError(tr.TryUpdate(key[:], v))
 			s.db.StorageUpdated += 1
+			if parallelInsert {
+				key := key
+				keys = append(keys, key[:])
+				values = append(values, v)
+			} else {
+				s.setError(tr.TryUpdate(key[:], v))
+			}
 		}
 		// If state snapshotting is active, cache the data til commit
 		if s.db.snap != nil {
@@ -369,6 +388,9 @@ func (s *stateObject) updateTrie(db Database) Trie {
 		}
 		usedStorage = append(usedStorage, common.CopyBytes(key[:])) // Copy needed for closure
 	}
+	if parallelInsert && len(keys) > 0 {
+		s.setError(secureTrie.TryBatchInsert(keys, values))
+	}
 	if s.db.prefetcher != nil {
 		s.db.prefetcher.used(s.data.Root, usedStorage)
 	}
diff --git a/trie/secure_trie.go b/trie/secure_trie.go
index 18be12d34a..12f8431cbb 100644
--- a/trie/secure_trie.go
+++ b/trie/secure_trie.go
@@ -132,6 +132,28 @@ func (t *SecureTrie) TryUpdate(key, value []byte) error {
 	return nil
 }

+// TryBatchInsert batches multiple insert together.
+func (t *SecureTrie) TryBatchInsert(keys, values [][]byte) error {
+	hashKeys := make([][]byte, 0, len(keys))
+	for i := range keys {
+		hk := t.hashKey(keys[i])
+		// t.hashKey does not return a new slice but a shared internal slice,
+		// so we must copy here
+		hashKeys = append(hashKeys, common.CopyBytes(hk))
+	}
+
+	err := t.trie.TryBatchInsert(hashKeys, values)
+	if err != nil {
+		return err
+	}
+
+	for i, hashKey := range hashKeys {
+		t.getSecKeyCache()[string(hashKey)] = common.CopyBytes(keys[i])
+	}
+
+	return nil
+}
+
 // Delete removes any existing value for key from the trie.
 func (t *SecureTrie) Delete(key []byte) {
 	if err := t.TryDelete(key); err != nil {
diff --git a/trie/trie.go b/trie/trie.go
index 13343112b8..55bdfadfe7 100644
--- a/trie/trie.go
+++ b/trie/trie.go
@@ -283,6 +283,88 @@ func (t *Trie) TryUpdate(key, value []byte) error {
 	return nil
 }

+// TryBatchInsert batches multiple insert together.
+//
+// When the root node after resolving is a fullnode, TryBatchInsert will split
+// the key, value list based on the first byte of key and spawn multiple
+// goroutines to insert these lists in parallel.
+func (t *Trie) TryBatchInsert(keys, values [][]byte) error {
+	t.unhashed += len(keys)
+
+	var (
+		resolvedNode node = t.root
+		err          error
+	)
+	if node, ok := t.root.(hashNode); ok {
+		resolvedNode, err = t.resolveHash(node, nil)
+		if err != nil {
+			return err
+		}
+	}
+
+	if fnode, ok := resolvedNode.(*fullNode); ok {
+		type insertTask struct {
+			key   []byte
+			value []byte
+		}
+
+		var insertTasks [17][]insertTask
+		for i := range keys {
+			k := keybytesToHex(keys[i])
+			insertTasks[k[0]] = append(insertTasks[k[0]], insertTask{
+				key:   k,
+				value: values[i],
+			})
+		}
+
+		var (
+			wg            sync.WaitGroup
+			returnedNodes [17]node
+			insertErrs    [17]error
+		)
+		wg.Add(17)
+		for i, tasks := range insertTasks {
+			go func(index int, tasks []insertTask) {
+				defer wg.Done()
+
+				var err error
+				taskNode := fnode.Children[index]
+				for _, task := range tasks {
+					_, taskNode, err = t.insert(taskNode, []byte{byte(index)}, task.key[1:], valueNode(task.value))
+					if err != nil {
+						break
+					}
+				}
+
+				insertErrs[index] = err
+				returnedNodes[index] = taskNode
+			}(i, tasks)
+		}
+
+		wg.Wait()
+		for _, err := range insertErrs {
+			if err != nil {
+				return err
+			}
+		}
+		var newFullNode fullNode
+		copy(newFullNode.Children[:], returnedNodes[:])
+		newFullNode.flags = t.newFlag()
+		t.root = &newFullNode
+	} else {
+		for i := range keys {
+			k := keybytesToHex(keys[i])
+			_, n, err := t.insert(t.root, nil, k, valueNode(values[i]))
+			if err != nil {
+				return err
+			}
+			t.root = n
+		}
+	}
+
+	return nil
+}
+
 func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) {
 	if len(key) == 0 {
 		if v, ok := n.(valueNode); ok {
diff --git a/trie/trie_test.go b/trie/trie_test.go
index 806a8cc634..6df5a38b81 100644
--- a/trie/trie_test.go
+++ b/trie/trie_test.go
@@ -182,6 +182,79 @@ func TestInsert(t *testing.T) {
 	}
 }

+func TestBatchUpdate(t *testing.T) {
+	trie := newEmpty()
+
+	trie.TryBatchInsert([][]byte{[]byte("doe")}, [][]byte{[]byte("reindeer")})
+	trie.TryBatchInsert(
+		[][]byte{[]byte("dog"),
[]byte("dogglesworth")}, + [][]byte{[]byte("puppy"), []byte("cat")}, + ) + + exp := common.HexToHash("8aad789dff2f538bca5d8ea56e8abe10f4c7ba3a5dea95fea4cd6e7c3a1168d3") + root := trie.Hash() + if root != exp { + t.Errorf("case 1: exp %x got %x", exp, root) + } + + trie = newEmpty() + // Make root node a fullnode + trie.TryBatchInsert( + [][]byte{[]byte("doe"), []byte("cat")}, + [][]byte{[]byte("reindeer"), []byte("reindeer")}, + ) + + trie.TryBatchInsert( + [][]byte{[]byte("wolf"), []byte("dog"), []byte("dogglesworth"), []byte("mouse")}, + [][]byte{[]byte("reindeer"), []byte("reindeer"), []byte("cat"), []byte("reindeer")}, + ) + + exp = common.HexToHash("96baa01a6376b285252d202de862d889ca05de308ed1c68cf442ffc4b9036988") + root = trie.Hash() + if root != exp { + t.Errorf("case 2: exp %x got %x", exp, root) + } +} + +func FuzzBatchUpdate(f *testing.F) { + // Every 64-byte chunk will be use as a key-value pair + // Key is the first 32 bytes, value is the remainning 32 bytes + f.Add([]byte("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")) + input := append( + []byte("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"), + []byte("CAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")..., + ) + input = append(input, []byte("DAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB")...) 
+ f.Add(input) + f.Fuzz(func(t *testing.T, fuzzInput []byte) { + if len(fuzzInput) <= 64 || len(fuzzInput)%64 != 0 { + return + } + + keys := make([][]byte, 0) + values := make([][]byte, 0) + for i := 0; i < len(fuzzInput); i += 64 { + keys = append(keys, fuzzInput[i:i+32]) + values = append(values, fuzzInput[i+32:i+64]) + } + + trie := newEmpty() + for i := range keys { + trie.Update(keys[i], values[i]) + } + + expectedHash := trie.Hash() + + trie = newEmpty() + trie.TryBatchInsert(keys, values) + gotHash := trie.Hash() + + if gotHash != expectedHash { + t.Fatalf("Trie hash mismatches, exp: %s got %s", expectedHash, gotHash) + } + }) +} + func TestGet(t *testing.T) { trie := newEmpty() updateString(trie, "doe", "reindeer") @@ -519,6 +592,44 @@ func benchUpdate(b *testing.B, e binary.ByteOrder) *Trie { return trie } +func benchmarkManyInserts(b *testing.B, f func(trie *Trie, keys [][]byte, values [][]byte)) { + numUpdates := 10000 + + k := make([]byte, 32) + + keys := make([][]byte, 0, numUpdates) + for i := 0; i < numUpdates; i++ { + binary.BigEndian.PutUint64(k, uint64(i)) + hashKey := crypto.Keccak256Hash(k) + keys = append(keys, hashKey[:]) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + trie := newEmpty() + // Make trie root a fullnode + updateString(trie, "AAA", "BBB") + updateString(trie, "ZAA", "BBB") + b.StartTimer() + f(trie, keys, keys) + } +} + +func BenchmarkNormalInsert(b *testing.B) { + benchmarkManyInserts(b, func(trie *Trie, keys, values [][]byte) { + for i := range keys { + trie.Update(keys[i], values[i]) + } + }) +} + +func BenchmarkBatchInsert(b *testing.B) { + benchmarkManyInserts(b, func(trie *Trie, keys, values [][]byte) { + trie.TryBatchInsert(keys, values) + }) +} + // Benchmarks the trie hashing. Since the trie caches the result of any operation, // we cannot use b.N as the number of hashing rouns, since all rounds apart from // the first one will be NOOP. As such, we'll use b.N as the number of account to