diff --git a/.gitignore b/.gitignore index 2fc6232a0..d57e073df 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,7 @@ out cache **/build **/target +**/CLAUDE.md scripts/test_files/gnark_groth16_bn254_infinite_script/infinite_proofs/** crates/cli/batch_inclusion_responses/* **/aligned_verification_data @@ -35,3 +36,6 @@ witness.wtns *.zkey circuit_cpp/ circuit_js + +# Claude Code files +CLAUDE.md diff --git a/Makefile b/Makefile index 7ff35a4c4..769d8cdc9 100644 --- a/Makefile +++ b/Makefile @@ -748,6 +748,18 @@ task_sender_test_connections_holesky_stage: --num-senders $(NUM_SENDERS) \ --network holesky-stage +# ===== SEPOLIA ===== +task_sender_generate_and_fund_wallets_sepolia: + @cd crates/task-sender && \ + cargo run --release -- generate-and-fund-wallets \ + --eth-rpc-url https://ethereum-sepolia-rpc.publicnode.com \ + --network sepolia \ + --funding-wallet-private-key $(FUNDING_WALLET_PRIVATE_KEY) \ + --number-wallets $(NUM_WALLETS) \ + --amount-to-deposit $(AMOUNT_TO_DEPOSIT) \ + --amount-to-deposit-to-aligned $(AMOUNT_TO_DEPOSIT_TO_ALIGNED) \ + --private-keys-filepath $(CURDIR)/crates/task-sender/wallets/sepolia + __UTILS__: aligned_get_user_balance_devnet: @cd crates/cli/ && cargo run --release -- get-user-balance \ diff --git a/aggregation_mode/aggregation_programs/sp1/Cargo.lock b/aggregation_mode/aggregation_programs/sp1/Cargo.lock new file mode 100644 index 000000000..20a817514 --- /dev/null +++ b/aggregation_mode/aggregation_programs/sp1/Cargo.lock @@ -0,0 +1,734 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "arrayref" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a2e8124351fda1ef8aaaa3bbd7ebbcb486bbcd4225aca0aa0d84bb2db8fecb" + +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "bincode" +version = "1.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" +dependencies = [ + "serde", +] + +[[package]] +name = "blake3" +version = "1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3888aaa89e4b2a40fca9848e400f6a658a5a3978de7be858e209cafa8be9a4a0" +dependencies = [ + "arrayref", + "arrayvec", + "cc", + "cfg-if", + "constant_time_eq", +] + +[[package]] +name = "block-buffer" +version = "0.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" +dependencies = [ + "generic-array", +] + +[[package]] +name = "bumpalo" +version = "3.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b094a32014c3d1f3944e4808e0e7c70e97dae0660886a8eb6dbc52d745badc" + +[[package]] +name = "cc" +version = "1.2.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0fc897dc1e865cc67c0e05a836d9d3f1df3cbe442aa4a9473b18e12624a4951" +dependencies = [ + "shlex", +] + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "constant_time_eq" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c74b8349d32d297c9134b8c88677813a227df8f779daa29bfc29c183fe3dca6" + +[[package]] +name = "cpufeatures" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" +dependencies = [ + "libc", +] + +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + +[[package]] +name = "digest" +version = "0.10.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" +dependencies = [ + "block-buffer", + "crypto-common", +] + +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + +[[package]] +name = "gcd" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d758ba1b47b00caf47f24925c0074ecb20d6dfcffe7f6d53395c0465674841a" + +[[package]] +name = "generic-array" +version = "0.14.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85649ca51fd72272d7821adaf274ad91c288277713d9c18820d8499a7ff69e9a" +dependencies = [ + "typenum", + "version_check", +] + +[[package]] +name = "getrandom" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" +dependencies = [ + "cfg-if", + "js-sys", + "libc", + "wasi", + "wasm-bindgen", +] + +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + +[[package]] +name = "js-sys" +version = "0.3.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f" +dependencies = [ + "once_cell", + "wasm-bindgen", +] + +[[package]] +name = "keccak" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc2af9a1119c51f12a14607e783cb977bde58bc069ff0c3da1095e635d70654" +dependencies = [ + "cpufeatures", +] + +[[package]] +name = "lambdaworks-crypto" +version = "0.12.0" +source = "git+https://github.com/lambdaclass/lambdaworks.git?rev=5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b#5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b" +dependencies = [ + "lambdaworks-math", + "rand", + "rand_chacha", + "serde", + "sha2 0.10.9", + "sha3 0.10.8 (registry+https://github.com/rust-lang/crates.io-index)", +] + +[[package]] +name = "lambdaworks-math" +version = "0.12.0" +source = "git+https://github.com/lambdaclass/lambdaworks.git?rev=5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b#5f8f2cfcc8a1a22f77e8dff2d581f1166eefb80b" +dependencies = [ + "getrandom", + "rand", + "serde", + "serde_json", +] + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.172" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" + +[[package]] +name = "libm" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" + +[[package]] +name = "log" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "num-bigint" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a5e44f723f1133c9deac646763579fdb3ac745e418f2a7af9cd0c431da1f20b9" +dependencies = [ + "num-integer", + "num-traits", +] + +[[package]] +name = "num-integer" +version = "0.1.46" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7969661fd2958a5cb096e56c8e1ad0444ac2bbcd0061bd28660485a44879858f" +dependencies = [ + "num-traits", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "once_cell" +version = "1.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" + +[[package]] +name = "p3-baby-bear" +version = "0.2.3-succinct" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7521838ecab2ddf4f7bc4ceebad06ec02414729598485c1ada516c39900820e8" +dependencies = [ + "num-bigint", + "p3-field", + "p3-mds", + "p3-poseidon2", + "p3-symmetric", + "rand", + "serde", +] + +[[package]] +name = "p3-dft" +version = "0.2.3-succinct" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46414daedd796f1eefcdc1811c0484e4bced5729486b6eaba9521c572c76761a" +dependencies = [ + "p3-field", + "p3-matrix", + "p3-maybe-rayon", + "p3-util", + "tracing", +] + +[[package]] +name = "p3-field" +version = "0.2.3-succinct" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48948a0516b349e9d1cdb95e7236a6ee010c44e68c5cc78b4b92bf1c4022a0d9" +dependencies = [ + "itertools", + "num-bigint", + "num-traits", + "p3-util", + "rand", + "serde", +] + +[[package]] +name = "p3-matrix" +version = "0.2.3-succinct" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3e4de3f373589477cb735ea58e125898ed20935e03664b4614c7fac258b3c42f" +dependencies = [ + "itertools", + "p3-field", + "p3-maybe-rayon", + "p3-util", + "rand", + "serde", + "tracing", +] + +[[package]] +name = "p3-maybe-rayon" +version = "0.2.3-succinct" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3968ad1160310296eb04f91a5f4edfa38fe1d6b2b8cd6b5c64e6f9b7370979e" + +[[package]] +name = "p3-mds" +version = "0.2.3-succinct" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2356b1ed0add6d5dfbf7a338ce534a6fde827374394a52cec16a0840af6e97c9" +dependencies = [ + "itertools", + "p3-dft", + "p3-field", + "p3-matrix", + "p3-symmetric", + "p3-util", + "rand", +] + +[[package]] +name = "p3-poseidon2" +version = "0.2.3-succinct" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da1eec7e1b6900581bedd95e76e1ef4975608dd55be9872c9d257a8a9651c3a" +dependencies = [ + "gcd", + "p3-field", + "p3-mds", + "p3-symmetric", + "rand", + "serde", +] + +[[package]] +name = "p3-symmetric" +version = "0.2.3-succinct" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edb439bea1d822623b41ff4b51e3309e80d13cadf8b86d16ffd5e6efb9fdc360" +dependencies = [ + "itertools", + "p3-field", + "serde", +] + +[[package]] +name = "p3-util" +version = "0.2.3-succinct" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c2c2010678b9332b563eaa38364915b585c1a94b5ca61e2c7541c087ddda5c" +dependencies = [ + "serde", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" + +[[package]] +name = "ppv-lite86" +version = "0.2.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02b3e5e68a3a1a02aad3ec490a98007cbc13c37cbe84a3cd7b8e406d76e7f778" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quote" +version = "1.0.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1885c039570dc00dcb4ff087a89e185fd56bae234ddc7f056a945bf36467248d" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + +[[package]] +name = "serde" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f0e2c6ed6606019b4e29e69dbaba95b11854410e5347d525002456dbbb786b6" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.219" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b0276cf7f2c73365f7157c8123c21cd9a50fbbd844757af28ca1f5925fc2a00" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "sha2" +version = "0.10.8" +source = "git+https://github.com/sp1-patches/RustCrypto-hashes?tag=sha2-v0.10.8-patch-v1#1f224388fdede7cef649bce0d63876d1a9e3f515" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] +name = "sha2" +version = "0.10.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" +dependencies = [ + "cfg-if", + "cpufeatures", + "digest", +] + +[[package]] +name = "sha3" +version = "0.10.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75872d278a8f37ef87fa0ddbda7802605cb18344497949862c0d4dcb291eba60" +dependencies = [ + "digest", + "keccak", +] + +[[package]] +name = "sha3" +version = "0.10.8" +source = "git+https://github.com/sp1-patches/RustCrypto-hashes?tag=sha3-v0.10.8-patch-v1#8f6d303c0861ba7e5adcc36207c0f41fe5edaabc" +dependencies = [ + "digest", + "keccak", +] + +[[package]] +name = "shlex" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" + +[[package]] +name = "sp1-lib" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03046db52868c1b60e8acffa0777ef6dc11ec1bbbb10b9eb612a871f69c8d3f6" +dependencies = [ + "bincode", + "serde", + "sp1-primitives", +] + +[[package]] +name = "sp1-primitives" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6939d6b2f63e54e5fbd208a0293027608f22511741b62fe32b6f67f6c144e0c0" +dependencies = [ + "bincode", + "blake3", + "cfg-if", + "hex", + "lazy_static", + "num-bigint", + "p3-baby-bear", + "p3-field", + "p3-poseidon2", + "p3-symmetric", + "serde", + "sha2 0.10.9", +] + +[[package]] +name = "sp1-zkvm" +version = "5.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16e69fef4d915b10072461e52fd616ca2625409ede7b37a36ec910e1a52bd860" +dependencies = [ + "cfg-if", + "getrandom", + "lazy_static", + "libm", + "p3-baby-bear", + "p3-field", + "rand", + "sha2 0.10.9", + "sp1-lib", + "sp1-primitives", +] + +[[package]] +name = "sp1_aggregation_program" +version = "0.1.0" +dependencies = [ + "lambdaworks-crypto", + "serde", + "serde_json", + "sha2 0.10.8", + "sha3 0.10.8 (git+https://github.com/sp1-patches/RustCrypto-hashes?tag=sha3-v0.10.8-patch-v1)", + "sp1-zkvm", +] + +[[package]] +name = "syn" +version = "2.0.101" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ce2b7fc941b3a24138a0a7cf8e858bfc6a992e7978a068a5c760deb0ed43caf" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "tracing" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" +dependencies = [ + "once_cell", +] + +[[package]] +name = "typenum" +version = "1.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" + +[[package]] +name = "unicode-ident" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a5f39404a5da50712a4c1eecf25e90dd62b613502b7e925fd4e4d19b5c96512" + +[[package]] +name = "version_check" +version = "0.9.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "wasm-bindgen" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6" +dependencies = [ + "bumpalo", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.100" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "zerocopy" +version = "0.8.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1702d9583232ddb9174e01bb7c15a2ab8fb1bc6f227aa1233858c351a3ba0cb" +dependencies = [ + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.8.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28a6e20d751156648aa063f3800b706ee209a32c0b4d9f24be3d980b01be55ef" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/crates/Cargo.lock b/crates/Cargo.lock index 5ed416f56..744392b74 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -103,6 +103,7 @@ dependencies = [ "bytes", "ciborium", "clap", + "dashmap", "dotenvy", "env_logger", "ethers", @@ -2039,6 +2040,20 @@ dependencies = [ "syn 2.0.100", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "dashu" version = "0.4.2" diff --git a/crates/batcher/Cargo.toml b/crates/batcher/Cargo.toml index af7909285..0b8662dd4 100644 --- a/crates/batcher/Cargo.toml +++ b/crates/batcher/Cargo.toml @@ -29,6 +29,7 @@ aligned-sdk = { path = "../sdk" } ciborium = "=0.2.2" priority-queue = "2.1.0" reqwest = { version = "0.12", features = ["json"] } +dashmap = "6.0.1" once_cell = "1.20.2" warp = "0.3.7" diff --git a/crates/batcher/src/connection.rs b/crates/batcher/src/connection.rs index 946922db3..b32e511c5 100644 --- a/crates/batcher/src/connection.rs +++ b/crates/batcher/src/connection.rs @@ -18,7 +18,7 @@ use tokio_tungstenite::{ pub(crate) type WsMessageSink = Arc, Message>>>; pub(crate) async fn send_batch_inclusion_data_responses( - finalized_batch: Vec, + finalized_batch: &[BatchQueueEntry], batch_merkle_tree: &MerkleTree, ) -> Result<(), BatcherError> { // Finalized_batch is ordered as the PriorityQueue, ordered by: ascending max_fee && if max_fee is equal, by descending nonce. diff --git a/crates/batcher/src/lib.rs b/crates/batcher/src/lib.rs index a3d0a670e..8cda4a8a7 100644 --- a/crates/batcher/src/lib.rs +++ b/crates/batcher/src/lib.rs @@ -17,6 +17,7 @@ use types::batch_state::BatchState; use types::user_state::UserState; use batch_queue::calculate_batch_size; +use dashmap::DashMap; use std::collections::HashMap; use std::env; use std::net::SocketAddr; @@ -72,6 +73,7 @@ mod zk_utils; pub const LISTEN_NEW_BLOCKS_MAX_TIMES: usize = usize::MAX; pub struct Batcher { + // Configuration parameters s3_client: S3Client, s3_bucket_name: String, download_endpoint: String, @@ -84,19 +86,34 @@ pub struct Batcher { payment_service_fallback: BatcherPaymentService, service_manager: ServiceManager, service_manager_fallback: ServiceManager, - batch_state: Mutex, min_block_interval: u64, transaction_wait_timeout: u64, max_proof_size: usize, max_batch_byte_size: usize, max_batch_proof_qty: usize, - last_uploaded_batch_block: Mutex, pre_verification_is_enabled: bool, non_paying_config: Option, - posting_batch: Mutex, - disabled_verifiers: Mutex, aggregator_fee_percentage_multiplier: u128, aggregator_gas_cost: u128, + + // Shared state (Mutex) + /// The general business rule is: + /// - User processing can be done in parallel unless a batch creation is happening + /// - Batch creation needs to be able to change all the states, so all processing + /// needs to be stopped, and all user_states locks need to be taken + batch_state: Mutex, + user_states: DashMap>>, + + last_uploaded_batch_block: Mutex, + + /// This is used to avoid multiple batches being submitted at the same time + /// It could be removed in the future by changing how we spawn + /// the batch creation task + posting_batch: Mutex, + + disabled_verifiers: Mutex, + + // Observability and monitoring pub metrics: metrics::BatcherMetrics, pub telemetry: TelemetrySender, } @@ -211,8 +228,8 @@ impl Batcher { .await .expect("Failed to get fallback Service Manager contract"); - let mut user_states = HashMap::new(); - let mut batch_state = BatchState::new(config.batcher.max_queue_size); + let user_states = DashMap::new(); + let batch_state = BatchState::new(config.batcher.max_queue_size); let non_paying_config = if let Some(non_paying_config) = config.batcher.non_paying { warn!("Non-paying address configuration detected. Will replace non-paying address {} with configured address.", non_paying_config.address); @@ -227,11 +244,9 @@ impl Batcher { let non_paying_user_state = UserState::new(nonpaying_nonce); user_states.insert( non_paying_config.replacement.address(), - non_paying_user_state, + Arc::new(Mutex::new(non_paying_user_state)), ); - batch_state = - BatchState::new_with_user_states(user_states, config.batcher.max_queue_size); Some(non_paying_config) } else { None @@ -275,12 +290,90 @@ impl Batcher { aggregator_gas_cost: config.batcher.aggregator_gas_cost, posting_batch: Mutex::new(false), batch_state: Mutex::new(batch_state), + user_states, disabled_verifiers: Mutex::new(disabled_verifiers), metrics, telemetry, } } + fn update_evicted_user_state_with_lock( + &self, + removed_entry: &types::batch_queue::BatchQueueEntry, + batch_queue: &types::batch_queue::BatchQueue, + user_state_guard: &mut tokio::sync::MutexGuard<'_, crate::types::user_state::UserState>, + ) { + let addr = removed_entry.sender; + + let new_last_max_fee_limit = match batch_queue + .iter() + .filter(|(e, _)| e.sender == addr) + .next_back() + { + Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee, + None => { + self.user_states.remove(&addr); + return; + } + }; + + user_state_guard.proofs_in_batch -= 1; + user_state_guard.nonce -= U256::one(); + user_state_guard.total_fees_in_queue -= removed_entry.nonced_verification_data.max_fee; + user_state_guard.last_max_fee_limit = new_last_max_fee_limit; + } + + // Fallback async version for restoration path where we don't have pre-held locks + async fn update_evicted_user_state_async( + &self, + removed_entry: &types::batch_queue::BatchQueueEntry, + batch_queue: &types::batch_queue::BatchQueue, + ) -> Option<()> { + let addr = removed_entry.sender; + + let new_last_max_fee_limit = match batch_queue + .iter() + .filter(|(e, _)| e.sender == addr) + .next_back() + { + Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee, + None => { + self.user_states.remove(&addr); + return Some(()); + } + }; + + let user_state = self.user_states.get(&addr)?; + let mut user_state_guard = user_state.lock().await; + user_state_guard.proofs_in_batch -= 1; + user_state_guard.nonce -= U256::one(); + user_state_guard.total_fees_in_queue -= removed_entry.nonced_verification_data.max_fee; + user_state_guard.last_max_fee_limit = new_last_max_fee_limit; + Some(()) + } + + fn calculate_new_user_states_data( + &self, + batch_queue: &types::batch_queue::BatchQueue, + ) -> HashMap { + let mut updated_user_states = HashMap::new(); + for (entry, _) in batch_queue.iter() { + let addr = entry.sender; + let max_fee = entry.nonced_verification_data.max_fee; + + let (proof_count, max_fee_limit, total_fees_in_queue) = updated_user_states + .entry(addr) + .or_insert((0, max_fee, U256::zero())); + + *proof_count += 1; + *total_fees_in_queue += max_fee; + if max_fee < *max_fee_limit { + *max_fee_limit = max_fee; + } + } + updated_user_states + } + pub async fn listen_connections(self: Arc, address: &str) -> Result<(), BatcherError> { // Create the event loop and TCP listener we'll accept connections on. let listener = TcpListener::bind(address) @@ -563,8 +656,14 @@ impl Batcher { } let cached_user_nonce = { - let batch_state_lock = self.batch_state.lock().await; - batch_state_lock.get_user_nonce(&address).await + let user_state_ref = self.user_states.get(&address); + match user_state_ref { + Some(user_state_ref) => { + let user_state_guard = user_state_ref.lock().await; + Some(user_state_guard.nonce) + } + None => None, + } }; let user_nonce = if let Some(user_nonce) = cached_user_nonce { @@ -662,48 +761,6 @@ impl Batcher { nonced_verification_data = aux_verification_data } - // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients - if self.pre_verification_is_enabled { - let verification_data = &nonced_verification_data.verification_data; - if self - .is_verifier_disabled(verification_data.proving_system) - .await - { - warn!( - "Verifier for proving system {} is disabled, skipping verification", - verification_data.proving_system - ); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier( - verification_data.proving_system, - )), - ) - .await; - self.metrics.user_error(&[ - "disabled_verifier", - &format!("{}", verification_data.proving_system), - ]); - return Ok(()); - } - - if !zk_utils::verify(verification_data).await { - error!("Invalid proof detected. Verification failed"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof), - ) - .await; - self.metrics.user_error(&[ - "rejected_proof", - &format!("{}", verification_data.proving_system), - ]); - return Ok(()); - } - } - - info!("Handling message"); - // We don't need a batch state lock here, since if the user locks its funds // after the check, some blocks should pass until he can withdraw. // It is safe to do just do this here. @@ -711,16 +768,36 @@ impl Batcher { return Ok(()); } + info!("Handling message, locking user state"); + // We acquire the lock first only to query if the user is already present and the lock is dropped. // If it was not present, then the user nonce is queried to the Aligned contract. // Lastly, we get a lock of the batch state again and insert the user state if it was still missing. - let is_user_in_state: bool; - { - let batch_state_lock = self.batch_state.lock().await; - is_user_in_state = batch_state_lock.user_states.contains_key(&addr); + let is_user_in_state = self.user_states.contains_key(&addr); + + if !is_user_in_state { + // We add a dummy user state to grab a lock on the user state + let dummy_user_state = UserState::new(U256::zero()); + self.user_states + .insert(addr, Arc::new(Mutex::new(dummy_user_state))); } + let Some(user_state_ref) = self.user_states.get(&addr) else { + error!("This should never happen, user state has previously been inserted if it didn't exist"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::AddToBatchError, + ) + .await; + self.metrics.user_error(&["batcher_state_error", ""]); + return Ok(()); + }; + + // We acquire the lock on the user state, now everything will be processed sequentially + let mut user_state_guard = user_state_ref.lock().await; + + // If the user state was not present, we need to get the nonce from the Ethereum contract and update the dummy user state if !is_user_in_state { let ethereum_user_nonce = match self.get_user_nonce_from_ethereum(addr).await { Ok(ethereum_user_nonce) => ethereum_user_nonce, @@ -737,12 +814,8 @@ impl Batcher { return Ok(()); } }; - let user_state = UserState::new(ethereum_user_nonce); - let mut batch_state_lock = self.batch_state.lock().await; - batch_state_lock - .user_states - .entry(addr) - .or_insert(user_state); + // Update the dummy user state with the correct nonce + user_state_guard.nonce = ethereum_user_nonce; } // * ---------------------------------------------------* @@ -760,40 +833,12 @@ impl Batcher { return Ok(()); }; - // For now on until the message is fully processed, the batch state is locked - // This is needed because we need to query the user state to make validations and - // finally add the proof to the batch queue. - - let mut batch_state_lock = self.batch_state.lock().await; - let msg_max_fee = nonced_verification_data.max_fee; - let Some(user_last_max_fee_limit) = - batch_state_lock.get_user_last_max_fee_limit(&addr).await - else { - std::mem::drop(batch_state_lock); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); - }; + let user_last_max_fee_limit = user_state_guard.last_max_fee_limit; - let Some(user_accumulated_fee) = batch_state_lock.get_user_total_fees_in_queue(&addr).await - else { - std::mem::drop(batch_state_lock); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); - }; + let user_accumulated_fee = user_state_guard.total_fees_in_queue; if !self.verify_user_has_enough_balance(user_balance, user_accumulated_fee, msg_max_fee) { - std::mem::drop(batch_state_lock); send_message( ws_conn_sink.clone(), SubmitProofResponseMessage::InsufficientBalance(addr), @@ -803,22 +848,9 @@ impl Batcher { return Ok(()); } - let cached_user_nonce = batch_state_lock.get_user_nonce(&addr).await; - - let Some(expected_nonce) = cached_user_nonce else { - error!("Failed to get cached user nonce: User not found in user states, but it should have been already inserted"); - std::mem::drop(batch_state_lock); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - self.metrics.user_error(&["batcher_state_error", ""]); - return Ok(()); - }; + let expected_nonce = user_state_guard.nonce; if expected_nonce < msg_nonce { - std::mem::drop(batch_state_lock); warn!("Invalid nonce for address {addr}, expected nonce: {expected_nonce:?}, received nonce: {msg_nonce:?}"); send_message( ws_conn_sink.clone(), @@ -831,14 +863,17 @@ impl Batcher { // In this case, the message might be a replacement one. If it is valid, // we replace the old entry with the new from the replacement message. + // Notice this stops the normal flow of the handle_submit_proof. + // We pass the already-held user_state_guard to avoid double-locking + // This will take the batch lock internally if expected_nonce > msg_nonce { info!("Possible replacement message received: Expected nonce {expected_nonce:?} - message nonce: {msg_nonce:?}"); self.handle_replacement_message( - batch_state_lock, nonced_verification_data, ws_conn_sink.clone(), client_msg.signature, addr, + user_state_guard, ) .await; @@ -848,7 +883,6 @@ impl Batcher { // We check this after replacement logic because if user wants to replace a proof, their // new_max_fee must be greater or equal than old_max_fee if msg_max_fee > user_last_max_fee_limit { - std::mem::drop(batch_state_lock); warn!("Invalid max fee for address {addr}, had fee limit of {user_last_max_fee_limit:?}, sent {msg_max_fee:?}"); send_message( ws_conn_sink.clone(), @@ -859,56 +893,132 @@ impl Batcher { return Ok(()); } + // When pre-verification is enabled, batcher will verify proofs for faster feedback with clients + if self.pre_verification_is_enabled { + let verification_data = &nonced_verification_data.verification_data; + if self + .is_verifier_disabled(verification_data.proving_system) + .await + { + warn!( + "Verifier for proving system {} is disabled, skipping verification", + verification_data.proving_system + ); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::DisabledVerifier( + verification_data.proving_system, + )), + ) + .await; + self.metrics.user_error(&[ + "disabled_verifier", + &format!("{}", verification_data.proving_system), + ]); + return Ok(()); + } + + if !zk_utils::verify(verification_data).await { + error!("Invalid proof detected. Verification failed"); + send_message( + ws_conn_sink.clone(), + SubmitProofResponseMessage::InvalidProof(ProofInvalidReason::RejectedProof), + ) + .await; + self.metrics.user_error(&[ + "rejected_proof", + &format!("{}", verification_data.proving_system), + ]); + return Ok(()); + } + } + // * ---------------------------------------------------------------------* // * Perform validation over batcher queue * // * ---------------------------------------------------------------------* + let mut batch_state_lock = self.batch_state.lock().await; if batch_state_lock.is_queue_full() { debug!("Batch queue is full. Evaluating if the incoming proof can replace a lower-priority entry."); - // This cannot panic, if the batch queue is full it has at least one item - let (lowest_priority_entry, _) = batch_state_lock - .batch_queue - .peek() - .expect("Batch queue was expected to be full, but somehow no item was inside"); - - let lowest_fee_in_queue = lowest_priority_entry.nonced_verification_data.max_fee; - let new_proof_fee = nonced_verification_data.max_fee; + let mut evicted_entry = None; - // We will keep the proof with the highest fee - // Note: we previously checked that if it's a new proof from the same user the fee is the same or lower - // So this will never eject a proof of the same user with a lower nonce - // which is the expected behaviour - if new_proof_fee > lowest_fee_in_queue { - // This cannot panic, if the batch queue is full it has at least one item - let (removed_entry, _) = batch_state_lock - .batch_queue - .pop() - .expect("Batch queue was expected to be full, but somehow no item was inside"); + // Collect addresses of potential candidates (lightweight) + let eviction_candidates: Vec
= batch_state_lock + .batch_queue + .iter() + .filter_map(|(entry, _)| { + if new_proof_fee > entry.nonced_verification_data.max_fee { + Some(entry.sender) + } else { + None + } + }) + .collect(); + + // Try to find any candidate whose lock we can acquire and immediately process them + for candidate_addr in eviction_candidates { + if let Some(user_state_arc) = self.user_states.get(&candidate_addr) { + if let Ok(mut user_guard) = user_state_arc.try_lock() { + // Found someone whose lock we can get - now find and remove their entry + let entries_to_check: Vec<_> = batch_state_lock + .batch_queue + .iter() + .filter(|(entry, _)| { + entry.sender == candidate_addr + && new_proof_fee > entry.nonced_verification_data.max_fee + }) + .map(|(entry, _)| entry.clone()) + .collect(); + + if let Some(target_entry) = entries_to_check.into_iter().next() { + let removed_entry = batch_state_lock + .batch_queue + .remove(&target_entry) + .map(|(e, _)| e); + + if let Some(removed) = removed_entry { + info!( + "Incoming proof (nonce: {}, fee: {}) replacing proof from sender {} with nonce {} (fee: {})", + nonced_verification_data.nonce, + new_proof_fee, + removed.sender, + removed.nonced_verification_data.nonce, + removed.nonced_verification_data.max_fee + ); + + // Update the evicted user's state immediately + self.update_evicted_user_state_with_lock( + &removed, + &batch_state_lock.batch_queue, + &mut user_guard, + ); + + // Notify the evicted user + if let Some(ref removed_entry_ws) = removed.messaging_sink { + send_message( + removed_entry_ws.clone(), + SubmitProofResponseMessage::UnderpricedProof, + ) + .await; + } + + evicted_entry = Some(removed); + break; + } + } + } + } + } + // Check if we successfully evicted someone + if evicted_entry.is_none() { + // No lock could be acquired or no evictable entry found - reject this proof info!( - "Incoming proof (nonce: {}, fee: {}) has higher fee. Replacing lowest fee proof from sender {} with nonce {}.", + "Incoming proof (nonce: {}, fee: {}) rejected - queue is full and no evictable entries found.", nonced_verification_data.nonce, - nonced_verification_data.max_fee, - removed_entry.sender, - removed_entry.nonced_verification_data.nonce - ); - - batch_state_lock.update_user_state_on_entry_removal(&removed_entry); - - if let Some(removed_entry_ws) = removed_entry.messaging_sink { - send_message( - removed_entry_ws, - SubmitProofResponseMessage::UnderpricedProof, - ) - .await; - }; - } else { - info!( - "Incoming proof (nonce: {}, fee: {}) has lower priority than all entries in the full queue. Rejecting submission.", - nonced_verification_data.nonce, - nonced_verification_data.max_fee + new_proof_fee ); std::mem::drop(batch_state_lock); send_message( @@ -927,7 +1037,7 @@ impl Batcher { if let Err(e) = self .add_to_batch( batch_state_lock, - nonced_verification_data, + &nonced_verification_data, ws_conn_sink.clone(), signature, addr, @@ -940,6 +1050,14 @@ impl Batcher { return Ok(()); }; + // Update user state now that entry has been successfully added to batch + let max_fee = nonced_verification_data.max_fee; + let nonce = nonced_verification_data.nonce; + user_state_guard.nonce = nonce + U256::one(); + user_state_guard.last_max_fee_limit = max_fee; + user_state_guard.proofs_in_batch += 1; + user_state_guard.total_fees_in_queue += max_fee; + info!("Verification data message handled"); Ok(()) } @@ -969,16 +1087,18 @@ impl Batcher { /// Returns true if the message was replaced in the batch, false otherwise async fn handle_replacement_message( &self, - mut batch_state_lock: MutexGuard<'_, BatchState>, nonced_verification_data: NoncedVerificationData, ws_conn_sink: WsMessageSink, signature: Signature, addr: Address, + mut user_state_guard: tokio::sync::MutexGuard<'_, UserState>, ) { let replacement_max_fee = nonced_verification_data.max_fee; let nonce = nonced_verification_data.nonce; - let Some(entry) = batch_state_lock.get_entry(addr, nonce) else { - std::mem::drop(batch_state_lock); + let mut batch_state_guard = self.batch_state.lock().await; // Second: batch lock + let Some(entry) = batch_state_guard.get_entry(addr, nonce) else { + drop(batch_state_guard); + drop(user_state_guard); warn!("Invalid nonce for address {addr}. Queue entry with nonce {nonce} not found"); send_message( ws_conn_sink.clone(), @@ -991,7 +1111,8 @@ impl Batcher { let original_max_fee = entry.nonced_verification_data.max_fee; if original_max_fee > replacement_max_fee { - std::mem::drop(batch_state_lock); + drop(batch_state_guard); + drop(user_state_guard); warn!("Invalid replacement message for address {addr}, had max fee: {original_max_fee:?}, received fee: {replacement_max_fee:?}"); send_message( ws_conn_sink.clone(), @@ -1030,8 +1151,8 @@ impl Batcher { } replacement_entry.messaging_sink = Some(ws_conn_sink.clone()); - if !batch_state_lock.replacement_entry_is_valid(&replacement_entry) { - std::mem::drop(batch_state_lock); + if !batch_state_guard.replacement_entry_is_valid(&replacement_entry) { + std::mem::drop(batch_state_guard); warn!("Invalid replacement message"); send_message( ws_conn_sink.clone(), @@ -1052,45 +1173,18 @@ impl Batcher { // note that the entries are considered equal for the priority queue // if they have the same nonce and sender, so we can remove the old entry // by calling remove with the new entry - batch_state_lock.batch_queue.remove(&replacement_entry); - batch_state_lock.batch_queue.push( + batch_state_guard.batch_queue.remove(&replacement_entry); + batch_state_guard.batch_queue.push( replacement_entry.clone(), BatchQueueEntryPriority::new(replacement_max_fee, nonce), ); - // update max_fee_limit - let updated_max_fee_limit_in_batch = batch_state_lock.get_user_min_fee_in_batch(&addr); - if batch_state_lock - .update_user_max_fee_limit(&addr, updated_max_fee_limit_in_batch) - .is_none() - { - std::mem::drop(batch_state_lock); - warn!("User state for address {addr:?} was not present in batcher user states, but it should be"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - return; - }; + // update max_fee_limit and total_fees_in_queue using already held user_state_guard + let updated_max_fee_limit_in_batch = batch_state_guard.get_user_min_fee_in_batch(&addr); + user_state_guard.last_max_fee_limit = updated_max_fee_limit_in_batch; - // update total_fees_in_queue - if batch_state_lock - .update_user_total_fees_in_queue_of_replacement_message( - &addr, - original_max_fee, - replacement_max_fee, - ) - .is_none() - { - std::mem::drop(batch_state_lock); - warn!("User state for address {addr:?} was not present in batcher user states, but it should be"); - send_message( - ws_conn_sink.clone(), - SubmitProofResponseMessage::AddToBatchError, - ) - .await; - }; + let fee_difference = replacement_max_fee - original_max_fee; + user_state_guard.total_fees_in_queue += fee_difference; } async fn disabled_verifiers(&self) -> Result> { @@ -1132,7 +1226,7 @@ impl Batcher { async fn add_to_batch( &self, mut batch_state_lock: MutexGuard<'_, BatchState>, - verification_data: NoncedVerificationData, + verification_data: &NoncedVerificationData, ws_conn_sink: WsMessageSink, proof_submitter_sig: Signature, proof_submitter_addr: Address, @@ -1145,7 +1239,7 @@ impl Batcher { let nonce = verification_data.nonce; batch_state_lock.batch_queue.push( BatchQueueEntry::new( - verification_data, + verification_data.clone(), verification_data_comm, ws_conn_sink, proof_submitter_sig, @@ -1162,45 +1256,7 @@ impl Batcher { info!("Current batch queue length: {}", queue_len); - let Some(user_proof_count) = batch_state_lock - .get_user_proof_count(&proof_submitter_addr) - .await - else { - error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); - std::mem::drop(batch_state_lock); - return Err(BatcherError::AddressNotFoundInUserStates( - proof_submitter_addr, - )); - }; - - let Some(current_total_fees_in_queue) = batch_state_lock - .get_user_total_fees_in_queue(&proof_submitter_addr) - .await - else { - error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); - std::mem::drop(batch_state_lock); - return Err(BatcherError::AddressNotFoundInUserStates( - proof_submitter_addr, - )); - }; - - // User state is updated - if batch_state_lock - .update_user_state( - &proof_submitter_addr, - nonce + U256::one(), - max_fee, - user_proof_count + 1, - current_total_fees_in_queue + max_fee, - ) - .is_none() - { - error!("User state of address {proof_submitter_addr} was not found when trying to update user state. This user state should have been present"); - std::mem::drop(batch_state_lock); - return Err(BatcherError::AddressNotFoundInUserStates( - proof_submitter_addr, - )); - }; + // User state will be updated by the caller who already has the lock Ok(()) } @@ -1219,8 +1275,8 @@ impl Batcher { /// an empty batch, even if the block interval has been reached. /// Once the batch meets the conditions for submission, the finalized batch is then passed to the /// `finalize_batch` function. - /// This function doesn't remove the proofs from the queue. - async fn is_batch_ready( + /// This function removes the proofs from the queue immediately to avoid race conditions. + async fn extract_batch_if_ready( &self, block_number: u64, gas_price: U256, @@ -1256,9 +1312,12 @@ impl Batcher { // Set the batch posting flag to true *batch_posting = true; - let batch_queue_copy = batch_state_lock.batch_queue.clone(); - let finalized_batch = batch_queue::try_build_batch( - batch_queue_copy, + + // PHASE 1: Extract the batch directly from the queue to avoid race conditions + let mut batch_state_lock = batch_state_lock; // Make mutable + + let finalized_batch = batch_queue::extract_batch_directly( + &mut batch_state_lock.batch_queue, gas_price, self.max_batch_byte_size, self.max_batch_proof_qty, @@ -1278,66 +1337,179 @@ impl Batcher { }) .ok()?; + info!( + "Extracted {} proofs from queue for batch processing", + finalized_batch.len() + ); + Some(finalized_batch) } - /// Takes the submitted proofs and removes them from the queue. - /// This function should be called only AFTER the submission was confirmed onchain - async fn remove_proofs_from_queue( + /// Updates user states based on current queue state after batch operations. + /// Used for both successful batch confirmation and failed batch restoration. + /// Updates proofs_in_batch, total_fees_in_queue, and last_max_fee_limit based on current queue state. + /// Uses proper lock ordering: user_state -> batch_state to avoid deadlocks. + async fn update_user_states_from_queue_state( &self, - finalized_batch: Vec, + affected_users: std::collections::HashSet
, ) -> Result<(), BatcherError> { - info!("Removing proofs from queue..."); + // Update each user's state with proper lock ordering + for addr in affected_users { + if let Some(user_state) = self.user_states.get(&addr) { + let mut user_state_guard = user_state.lock().await; // First: user lock + let batch_state_lock = self.batch_state.lock().await; // Second: batch lock + + // Calculate what each user's state should be based on current queue contents + let current_queue_user_states = + self.calculate_new_user_states_data(&batch_state_lock.batch_queue); + + if let Some((proof_count, min_max_fee_in_queue, total_fees_in_queue)) = + current_queue_user_states.get(&addr) + { + // User has proofs in queue - use calculated values + user_state_guard.proofs_in_batch = *proof_count; + user_state_guard.total_fees_in_queue = *total_fees_in_queue; + user_state_guard.last_max_fee_limit = *min_max_fee_in_queue; + } else { + // User not found in queue - reset to defaults + user_state_guard.proofs_in_batch = 0; + user_state_guard.total_fees_in_queue = U256::zero(); + user_state_guard.last_max_fee_limit = U256::MAX; + } + + drop(batch_state_lock); // Release batch lock + drop(user_state_guard); // Release user lock + } else { + warn!("User state not found for address {}", addr); + } + } + + Ok(()) + } + + /// Cleans up user states after successful batch submission. + /// Resets last_max_fee_limit to U256::MAX for users who had proofs in the submitted batch + /// but now have no proofs left in the queue. + fn cleanup_user_states_after_successful_submission(&self, finalized_batch: &[BatchQueueEntry]) { + use std::collections::HashSet; + + // Get unique users from the submitted batch + let users_in_batch: HashSet
= + finalized_batch.iter().map(|entry| entry.sender).collect(); + + // Check current queue state to see which users still have proofs + let batch_state_lock = match self.batch_state.try_lock() { + Ok(lock) => lock, + Err(_) => { + // If we can't get the lock, skip cleanup - it's not critical + warn!("Could not acquire batch state lock for user state cleanup"); + return; + } + }; + + let current_user_states = + self.calculate_new_user_states_data(&batch_state_lock.batch_queue); + + // For each user in the batch, check if they now have no proofs left + for user_addr in users_in_batch { + if !current_user_states.contains_key(&user_addr) { + // User has no proofs left in queue - reset their max_fee_limit + if let Some(user_state_ref) = self.user_states.get(&user_addr) { + if let Ok(mut user_state_guard) = user_state_ref.try_lock() { + user_state_guard.last_max_fee_limit = U256::max_value(); + } + // If we can't get the lock, skip this user - not critical + } + } + } + } + + /// Restores proofs to the queue after batch submission failure. + /// Uses similar logic to user proof submission, including handling queue capacity. + /// NOTE: Nonce ordering is preserved by the priority queue's eviction order: + /// - Lower fees get evicted first + /// - For same fees, higher nonces get evicted first + /// This ensures we never have nonce N+1 without nonce N in the queue. + async fn restore_proofs_after_batch_failure(&self, failed_batch: &[BatchQueueEntry]) { + info!( + "Restoring {} proofs to queue after batch failure", + failed_batch.len() + ); + let mut batch_state_lock = self.batch_state.lock().await; + let mut restored_entries = Vec::new(); + + for entry in failed_batch { + let priority = BatchQueueEntryPriority::new( + entry.nonced_verification_data.max_fee, + entry.nonced_verification_data.nonce, + ); - finalized_batch.iter().for_each(|entry| { - if batch_state_lock.batch_queue.remove(entry).is_none() { - // If this happens, we have a bug in our code - error!("Some proofs were not found in the queue. This should not happen."); + // Check if queue is full + if batch_state_lock.is_queue_full() { + // Use same logic as user submission - evict lowest priority if this one is higher + if let Some((lowest_entry, _)) = batch_state_lock.batch_queue.peek() { + let lowest_fee = lowest_entry.nonced_verification_data.max_fee; + let restore_fee = entry.nonced_verification_data.max_fee; + + if restore_fee > lowest_fee { + // Evict the lowest priority entry (preserves nonce ordering) + if let Some((evicted_entry, _)) = batch_state_lock.batch_queue.pop() { + warn!("Queue full during restoration, evicting proof from sender {} with nonce {} (fee: {})", + evicted_entry.sender, evicted_entry.nonced_verification_data.nonce, evicted_entry.nonced_verification_data.max_fee); + + // Update user state for evicted entry + self.update_evicted_user_state_async( + &evicted_entry, + &batch_state_lock.batch_queue, + ) + .await; + + // Notify the evicted user via websocket + if let Some(evicted_ws_sink) = evicted_entry.messaging_sink { + connection::send_message( + evicted_ws_sink, + aligned_sdk::common::types::SubmitProofResponseMessage::UnderpricedProof, + ) + .await; + } + } + } else { + warn!("Queue full and restored proof has lower priority, dropping proof from sender {} with nonce {} (fee: {})", + entry.sender, entry.nonced_verification_data.nonce, entry.nonced_verification_data.max_fee); + continue; + } + } } - }); - - // now we calculate the new user_states - let new_user_states = // proofs, max_fee_limit, total_fees_in_queue - batch_state_lock.calculate_new_user_states_data(); - - let user_addresses: Vec
= batch_state_lock.user_states.keys().cloned().collect(); - let default_value = (0, U256::MAX, U256::zero()); - for addr in user_addresses.iter() { - let (proof_count, max_fee_limit, total_fees_in_queue) = - new_user_states.get(addr).unwrap_or(&default_value); - - // FIXME: The case where a the update functions return `None` can only happen when the user was not found - // in the `user_states` map should not really happen here, but doing this check so that we don't unwrap. - // Once https://github.com/yetanotherco/aligned_layer/issues/1046 is done we could return a more - // informative error. - - // Now we update the user states related to the batch (proof count in batch and min fee in batch) - batch_state_lock - .update_user_proof_count(addr, *proof_count) - .ok_or(BatcherError::QueueRemoveError( - "Could not update_user_proof_count".into(), - ))?; - batch_state_lock - .update_user_max_fee_limit(addr, *max_fee_limit) - .ok_or(BatcherError::QueueRemoveError( - "Could not update_user_max_fee_limit".into(), - ))?; - batch_state_lock - .update_user_total_fees_in_queue(addr, *total_fees_in_queue) - .ok_or(BatcherError::QueueRemoveError( - "Could not update_user_total_fees_in_queue".into(), - ))?; + + // Add the proof back to the queue + batch_state_lock.batch_queue.push(entry.clone(), priority); + restored_entries.push(entry); } - // Update metrics - let queue_len = batch_state_lock.batch_queue.len(); - let queue_size_bytes = calculate_batch_size(&batch_state_lock.batch_queue)?; + info!( + "Restored {} proofs to queue, new queue length: {}", + restored_entries.len(), + batch_state_lock.batch_queue.len() + ); - self.metrics - .update_queue_metrics(queue_len as i64, queue_size_bytes as i64); + // Get unique users from restored entries + let users_with_restored_proofs: std::collections::HashSet
= + restored_entries.iter().map(|entry| entry.sender).collect(); - Ok(()) + drop(batch_state_lock); // Release batch lock before user state updates + + // Update user states for successfully restored proofs + info!("Updating user states after proof restoration..."); + if let Err(e) = self + .update_user_states_from_queue_state(users_with_restored_proofs) + .await + { + error!( + "Failed to update user states after proof restoration: {:?}", + e + ); + } } /// Takes the finalized batch as input and: @@ -1350,13 +1522,12 @@ impl Batcher { async fn finalize_batch( &self, block_number: u64, - finalized_batch: Vec, + finalized_batch: &[BatchQueueEntry], gas_price: U256, ) -> Result<(), BatcherError> { let nonced_batch_verifcation_data: Vec = finalized_batch - .clone() - .into_iter() - .map(|entry| entry.nonced_verification_data) + .iter() + .map(|entry| entry.nonced_verification_data.clone()) .collect(); let batch_verification_data: Vec = nonced_batch_verifcation_data @@ -1369,9 +1540,8 @@ impl Batcher { info!("Finalizing batch. Length: {}", finalized_batch.len()); let batch_data_comm: Vec = finalized_batch - .clone() - .into_iter() - .map(|entry| entry.verification_data_commitment) + .iter() + .map(|entry| entry.verification_data_commitment.clone()) .collect(); let batch_merkle_tree: MerkleTree = @@ -1410,7 +1580,7 @@ impl Batcher { &batch_bytes, &batch_merkle_tree.root, leaves, - &finalized_batch, + finalized_batch, gas_price, ) .await @@ -1429,8 +1599,8 @@ impl Batcher { BatcherError::TransactionSendError( TransactionSendError::SubmissionInsufficientBalance, ) => { - // TODO calling remove_proofs_from_queue here is a better solution, flushing only the failed batch - // this would also need a message sent to the clients + // TODO: In the future, we should re-add the failed batch back to the queue + // For now, we flush everything as a safety measure self.flush_queue_and_clear_nonce_cache().await; } _ => { @@ -1441,12 +1611,25 @@ impl Batcher { return Err(e); }; - // Once the submit is succesfull, we remove the submitted proofs from the queue - // TODO handle error case: - if let Err(e) = self.remove_proofs_from_queue(finalized_batch.clone()).await { - error!("Unexpected error while updating queue: {:?}", e); + // Note: Proofs were already removed from the queue during extraction phase + // Now update user states based on current queue state after successful submission + info!("Updating user states after batch confirmation..."); + let users_in_batch: std::collections::HashSet
= + finalized_batch.iter().map(|entry| entry.sender).collect(); + if let Err(e) = self + .update_user_states_from_queue_state(users_in_batch) + .await + { + error!( + "Failed to update user states after batch confirmation: {:?}", + e + ); + // Continue with the rest of the process since batch was already submitted successfully } + // Clean up user states for users who had proofs in this batch but now have no proofs left + self.cleanup_user_states_after_successful_submission(finalized_batch); + connection::send_batch_inclusion_data_responses(finalized_batch, &batch_merkle_tree).await } @@ -1463,7 +1646,7 @@ impl Batcher { let Some(nonpaying_replacement_addr) = self.get_nonpaying_replacement_addr() else { batch_state_lock.batch_queue.clear(); - batch_state_lock.user_states.clear(); + self.user_states.clear(); return; }; @@ -1475,15 +1658,16 @@ impl Batcher { .await else { batch_state_lock.batch_queue.clear(); - batch_state_lock.user_states.clear(); + self.user_states.clear(); return; }; batch_state_lock.batch_queue.clear(); - batch_state_lock.user_states.clear(); + self.user_states.clear(); let nonpaying_user_state = UserState::new(nonpaying_replacement_addr_nonce); - batch_state_lock - .user_states - .insert(nonpaying_replacement_addr, nonpaying_user_state); + self.user_states.insert( + nonpaying_replacement_addr, + Arc::new(Mutex::new(nonpaying_user_state)), + ); self.metrics.update_queue_metrics(0, 0); } @@ -1514,16 +1698,29 @@ impl Batcher { let modified_gas_price = gas_price * U256::from(GAS_PRICE_PERCENTAGE_MULTIPLIER) / U256::from(PERCENTAGE_DIVIDER); - if let Some(finalized_batch) = self.is_batch_ready(block_number, modified_gas_price).await { + // TODO (Mauro): Take all the user locks here + if let Some(finalized_batch) = self + .extract_batch_if_ready(block_number, modified_gas_price) + .await + { let batch_finalization_result = self - .finalize_batch(block_number, finalized_batch, modified_gas_price) + .finalize_batch(block_number, &finalized_batch, modified_gas_price) .await; // Resetting this here to avoid doing it on every return path of `finalize_batch` function let mut batch_posting = self.posting_batch.lock().await; *batch_posting = false; - batch_finalization_result?; + // If batch finalization failed, restore the proofs to the queue + if let Err(e) = batch_finalization_result { + error!( + "Batch finalization failed, restoring proofs to queue: {:?}", + e + ); + self.restore_proofs_after_batch_failure(&finalized_batch) + .await; + return Err(e); + } } Ok(()) diff --git a/crates/batcher/src/types/batch_queue.rs b/crates/batcher/src/types/batch_queue.rs index 74fea2615..f8de63c7e 100644 --- a/crates/batcher/src/types/batch_queue.rs +++ b/crates/batcher/src/types/batch_queue.rs @@ -146,62 +146,73 @@ pub(crate) fn calculate_batch_size(batch_queue: &BatchQueue) -> Result Result, BatcherError> { - let mut finalized_batch = batch_queue; - let mut batch_size = calculate_batch_size(&finalized_batch)?; - - while let Some((entry, _)) = finalized_batch.peek() { - let batch_len = finalized_batch.len(); - let fee_per_proof = calculate_fee_per_proof(batch_len, gas_price, constant_gas_cost); + let mut batch_size = calculate_batch_size(batch_queue)?; + let mut rejected_entries = Vec::new(); + + // Remove entries that won't pay enough, or that makes a queue that is too big + loop { + let should_remove = if let Some((entry, _)) = batch_queue.peek() { + let batch_len = batch_queue.len(); + let fee_per_proof = calculate_fee_per_proof(batch_len, gas_price, constant_gas_cost); + + // if batch is not acceptable: + batch_size > max_batch_byte_size + || fee_per_proof > entry.nonced_verification_data.max_fee + || batch_len > max_batch_proof_qty + } else { + false + }; - // if batch is not acceptable: - if batch_size > max_batch_byte_size - || fee_per_proof > entry.nonced_verification_data.max_fee - || batch_len > max_batch_proof_qty - { - // Update the state for the next iteration: - // * Subtract this entry size to the size of the batch size. - // * Push the current entry to the resulting batch queue. + if should_remove { + // Remove this entry (it won't pay enough) and save it + let (rejected_entry, rejected_priority) = batch_queue.pop().unwrap(); - // It is safe to call `.unwrap()` here since any serialization error should have been caught - // when calculating the total size of the batch with the `calculate_batch_size` function + // Update batch size let verification_data_size = - cbor_serialize(&entry.nonced_verification_data.verification_data) + cbor_serialize(&rejected_entry.nonced_verification_data.verification_data) .unwrap() .len(); batch_size -= verification_data_size; - finalized_batch.pop(); + rejected_entries.push((rejected_entry, rejected_priority)); + } else { + // At this point, we found a viable batch - break + break; + } + } - continue; + // Check if we have a viable batch + if batch_queue.is_empty() { + // No viable batch found - put back the rejected entries + for (entry, priority) in rejected_entries { + batch_queue.push(entry, priority); } + return Err(BatcherError::BatchCostTooHigh); + } - // At this point, we break since we found a batch that can be submitted - break; + // Extract remaining entries in sorted order + // Since pop() gives highest priority first, we collect them directly + let mut batch_for_posting = Vec::new(); + while let Some((entry, _)) = batch_queue.pop() { + batch_for_posting.push(entry); } - // If `finalized_batch` is empty, this means that all the batch queue was traversed and we didn't find - // any user willing to pay fot the fee per proof. - if finalized_batch.is_empty() { - return Err(BatcherError::BatchCostTooHigh); + // Put back the rejected entries (they stay in the queue for later) + for (entry, priority) in rejected_entries { + batch_queue.push(entry, priority); } - Ok(finalized_batch.clone().into_sorted_vec()) + Ok(batch_for_posting) } fn calculate_fee_per_proof(batch_len: usize, gas_price: U256, constant_gas_cost: u128) -> U256 { @@ -311,8 +322,9 @@ mod test { batch_queue.push(entry_3, batch_priority_3); let gas_price = U256::from(1); - let finalized_batch = try_build_batch( - batch_queue.clone(), + let mut test_queue = batch_queue.clone(); + let finalized_batch = extract_batch_directly( + &mut test_queue, gas_price, 5000000, 50, @@ -423,8 +435,9 @@ mod test { batch_queue.push(entry_3, batch_priority_3); let gas_price = U256::from(1); - let finalized_batch = try_build_batch( - batch_queue.clone(), + let mut test_queue = batch_queue.clone(); + let finalized_batch = extract_batch_directly( + &mut test_queue, gas_price, 5000000, 50, @@ -533,8 +546,9 @@ mod test { batch_queue.push(entry_3.clone(), batch_priority_3.clone()); let gas_price = U256::from(1); - let finalized_batch = try_build_batch( - batch_queue.clone(), + let mut test_queue = batch_queue.clone(); + let finalized_batch = extract_batch_directly( + &mut test_queue, gas_price, 5000000, 2, @@ -643,8 +657,9 @@ mod test { batch_queue.push(entry_3, batch_priority_3); let gas_price = U256::from(1); - let finalized_batch = try_build_batch( - batch_queue.clone(), + let mut test_queue = batch_queue.clone(); + let finalized_batch = extract_batch_directly( + &mut test_queue, gas_price, 5000000, 50, @@ -759,8 +774,9 @@ mod test { batch_queue.push(entry_3, batch_priority_3); let gas_price = U256::from(1); - let finalized_batch = try_build_batch( - batch_queue.clone(), + let mut test_queue = batch_queue.clone(); + let finalized_batch = extract_batch_directly( + &mut test_queue, gas_price, 5000000, 50, @@ -781,6 +797,77 @@ mod test { ); } + #[test] + fn batch_finalization_algorithm_works_single_high_fee_proof() { + // Test the scenario: 1 proof with high fee that should be viable + let proof_generator_addr = Address::random(); + let payment_service_addr = Address::random(); + let sender_addr = Address::random(); + let bytes_for_verification_data = vec![42_u8; 10]; + let dummy_signature = Signature { + r: U256::from(1), + s: U256::from(2), + v: 3, + }; + let verification_data = VerificationData { + proving_system: ProvingSystemId::Risc0, + proof: bytes_for_verification_data.clone(), + pub_input: Some(bytes_for_verification_data.clone()), + verification_key: Some(bytes_for_verification_data.clone()), + vm_program_code: Some(bytes_for_verification_data), + proof_generator_addr, + }; + let chain_id = U256::from(42); + + // Single entry with very high fee - should definitely be viable + let nonce = U256::from(1); + let high_max_fee = U256::from(1_000_000_000_000_000_000u128); // Very high fee - 1 ETH + let nonced_verification_data = NoncedVerificationData::new( + verification_data, + nonce, + high_max_fee, + chain_id, + payment_service_addr, + ); + let vd_commitment: VerificationDataCommitment = nonced_verification_data.clone().into(); + let entry = BatchQueueEntry::new_for_testing( + nonced_verification_data, + vd_commitment, + dummy_signature, + sender_addr, + ); + let batch_priority = BatchQueueEntryPriority::new(high_max_fee, nonce); + + let mut batch_queue = BatchQueue::new(); + batch_queue.push(entry, batch_priority); + + let gas_price = U256::from(10_000_000_000u64); // 10 gwei gas price + let mut test_queue = batch_queue.clone(); + let finalized_batch = extract_batch_directly( + &mut test_queue, + gas_price, + 5000000, // Large byte size limit + 50, // Large proof quantity limit + DEFAULT_CONSTANT_GAS_COST, + ); + + // This should succeed and return the single proof + assert!( + finalized_batch.is_ok(), + "Should successfully extract batch with single high-fee proof" + ); + let batch = finalized_batch.unwrap(); + assert_eq!(batch.len(), 1, "Batch should contain exactly 1 proof"); + assert_eq!(batch[0].nonced_verification_data.max_fee, high_max_fee); + + // The queue should now be empty (no rejected entries to put back) + assert_eq!( + test_queue.len(), + 0, + "Queue should be empty after extracting the single viable proof" + ); + } + #[test] fn batch_finalization_algorithm_works_not_bigger_than_max_batch_proof_qty() { // The following information will be the same for each entry, it is just some dummy data to see @@ -875,8 +962,9 @@ mod test { // The max batch len is 2, so the algorithm should stop at the second entry. let max_batch_proof_qty = 2; - let finalized_batch = try_build_batch( - batch_queue.clone(), + let mut test_queue = batch_queue.clone(); + let finalized_batch = extract_batch_directly( + &mut test_queue, gas_price, 5000000, max_batch_proof_qty, diff --git a/crates/batcher/src/types/batch_state.rs b/crates/batcher/src/types/batch_state.rs index 481ca44f7..153019519 100644 --- a/crates/batcher/src/types/batch_state.rs +++ b/crates/batcher/src/types/batch_state.rs @@ -1,15 +1,9 @@ -use std::collections::{hash_map::Entry, HashMap}; - -use super::{ - batch_queue::{BatchQueue, BatchQueueEntry}, - user_state::UserState, -}; +use super::batch_queue::{BatchQueue, BatchQueueEntry}; use ethers::types::{Address, U256}; use log::debug; pub(crate) struct BatchState { pub(crate) batch_queue: BatchQueue, - pub(crate) user_states: HashMap, pub(crate) max_size: usize, } @@ -19,18 +13,6 @@ impl BatchState { pub(crate) fn new(max_size: usize) -> Self { Self { batch_queue: BatchQueue::new(), - user_states: HashMap::new(), - max_size, - } - } - - pub(crate) fn new_with_user_states( - user_states: HashMap, - max_size: usize, - ) -> Self { - Self { - batch_queue: BatchQueue::new(), - user_states, max_size, } } @@ -44,30 +26,6 @@ impl BatchState { .find(|entry| entry.sender == sender && entry.nonced_verification_data.nonce == nonce) } - pub(crate) fn get_user_state(&self, addr: &Address) -> Option<&UserState> { - self.user_states.get(addr) - } - - pub(crate) async fn get_user_nonce(&self, addr: &Address) -> Option { - let user_state = self.get_user_state(addr)?; - Some(user_state.nonce) - } - - pub(crate) async fn get_user_last_max_fee_limit(&self, addr: &Address) -> Option { - let user_state = self.get_user_state(addr)?; - Some(user_state.last_max_fee_limit) - } - - pub(crate) async fn get_user_total_fees_in_queue(&self, addr: &Address) -> Option { - let user_state = self.get_user_state(addr)?; - Some(user_state.total_fees_in_queue) - } - - pub(crate) async fn get_user_proof_count(&self, addr: &Address) -> Option { - let user_state = self.get_user_state(addr)?; - Some(user_state.proofs_in_batch) - } - pub(crate) fn get_user_min_fee_in_batch(&self, addr: &Address) -> U256 { self.batch_queue .iter() @@ -77,127 +35,8 @@ impl BatchState { .unwrap_or(U256::max_value()) } - // SETTERS: - - pub(crate) fn update_user_max_fee_limit( - &mut self, - addr: &Address, - new_max_fee_limit: U256, - ) -> Option { - // TODO refactor to return Result, or something less redundant - if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { - user_state.get_mut().last_max_fee_limit = new_max_fee_limit; - return Some(new_max_fee_limit); - } - None - } - - pub(crate) fn update_user_proof_count( - &mut self, - addr: &Address, - new_proof_count: usize, - ) -> Option { - // TODO refactor to return Result, or something less redundant - if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { - user_state.get_mut().proofs_in_batch = new_proof_count; - return Some(new_proof_count); - } - None - } - - pub(crate) fn update_user_nonce(&mut self, addr: &Address, new_nonce: U256) -> Option { - // TODO refactor to return Result, or something less redundant - if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { - user_state.get_mut().nonce = new_nonce; - return Some(new_nonce); - } - None - } - - pub(crate) fn update_user_total_fees_in_queue( - &mut self, - addr: &Address, - new_total_fees_in_queue: U256, - ) -> Option { - // TODO refactor to return Result, or something less redundant - if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { - user_state.get_mut().total_fees_in_queue = new_total_fees_in_queue; - return Some(new_total_fees_in_queue); - } - None - } - - pub(crate) fn update_user_total_fees_in_queue_of_replacement_message( - &mut self, - addr: &Address, - original_max_fee: U256, - new_max_fee: U256, - ) -> Option { - // TODO refactor to return Result, or something less redundant - let fee_difference = new_max_fee - original_max_fee; //here we already know new_max_fee > original_max_fee - if let Entry::Occupied(mut user_state) = self.user_states.entry(*addr) { - user_state.get_mut().total_fees_in_queue += fee_difference; - return Some(user_state.get().total_fees_in_queue); - } - None - } - - /// Updates the user with address `addr` with the provided values of - /// `new_nonce`, `new_max_fee_limit`, `new_proof_count` and `new_total_fees_in_queue` - /// If state is updated successfully, returns the updated values inside a `Some()` - /// If the address was not found in the user states, returns `None` - pub(crate) fn update_user_state( - &mut self, - addr: &Address, - new_nonce: U256, - new_max_fee_limit: U256, - new_proof_count: usize, - new_total_fees_in_queue: U256, - ) -> Option<(U256, U256, usize, U256)> { - // TODO refactor to return Result, or something less redundant - let updated_nonce = self.update_user_nonce(addr, new_nonce); - let updated_max_fee_limit = self.update_user_max_fee_limit(addr, new_max_fee_limit); - let updated_proof_count = self.update_user_proof_count(addr, new_proof_count); - let updated_total_fees_in_queue = - self.update_user_total_fees_in_queue(addr, new_total_fees_in_queue); - - if updated_nonce.is_some() - && updated_max_fee_limit.is_some() - && updated_proof_count.is_some() - && updated_total_fees_in_queue.is_some() - { - return Some(( - new_nonce, - new_max_fee_limit, - new_proof_count, - new_total_fees_in_queue, - )); - } - None - } - // LOGIC: - pub(crate) fn calculate_new_user_states_data(&self) -> HashMap { - let mut updated_user_states = HashMap::new(); // address -> (proof_count, max_fee_limit, total_fees_in_queue) - for (entry, _) in self.batch_queue.iter() { - let addr = entry.sender; - let max_fee = entry.nonced_verification_data.max_fee; - - let (proof_count, max_fee_limit, total_fees_in_queue) = updated_user_states - .entry(addr) - .or_insert((0, max_fee, U256::zero())); - - *proof_count += 1; - *total_fees_in_queue += max_fee; - if max_fee < *max_fee_limit { - *max_fee_limit = max_fee; - } - } - - updated_user_states - } - /// Checks if the entry is valid /// An entry is valid if there is no entry with the same sender, lower nonce and a lower fee pub(crate) fn replacement_entry_is_valid( @@ -221,37 +60,6 @@ impl BatchState { }) } - /// Updates or removes a user's state when their latest proof entry is removed from the batch queue. - /// - /// If the user has no other proofs remaining in the queue, their state is removed entirely. - /// Otherwise, the user's state is updated to reflect the next most recent entry in the queue. - /// - /// Note: The given `removed_entry` must be the most recent (latest or highest nonce) entry for the user in the queue. - pub(crate) fn update_user_state_on_entry_removal(&mut self, removed_entry: &BatchQueueEntry) { - let addr = removed_entry.sender; - - let new_last_max_fee_limit = match self - .batch_queue - .iter() - .filter(|(e, _)| e.sender == addr) - .next_back() - { - Some((last_entry, _)) => last_entry.nonced_verification_data.max_fee, - None => { - self.user_states.remove(&addr); - return; - } - }; - - if let Entry::Occupied(mut user_state) = self.user_states.entry(addr) { - user_state.get_mut().proofs_in_batch -= 1; - user_state.get_mut().nonce -= U256::one(); - user_state.get_mut().total_fees_in_queue -= - removed_entry.nonced_verification_data.max_fee; - user_state.get_mut().last_max_fee_limit = new_last_max_fee_limit; - } - } - pub(crate) fn is_queue_full(&self) -> bool { self.batch_queue.len() >= self.max_size } diff --git a/crates/sdk/src/common/constants.rs b/crates/sdk/src/common/constants.rs index e27293698..08c1c40a7 100644 --- a/crates/sdk/src/common/constants.rs +++ b/crates/sdk/src/common/constants.rs @@ -61,6 +61,8 @@ pub const BATCHER_PAYMENT_SERVICE_ADDRESS_MAINNET: &str = "0xb0567184A52cB40956df6333510d6eF35B89C8de"; pub const BATCHER_PAYMENT_SERVICE_ADDRESS_MAINNET_STAGE: &str = "0x88ad27EfBeF16b6fC5b2E40c5155d61876f847c5"; +pub const BATCHER_PAYMENT_SERVICE_ADDRESS_SEPOLIA: &str = + "0x403dE630751e148bD71BFFcE762E5667C0825399"; /// AlignedServiceManager pub const ALIGNED_SERVICE_MANAGER_DEVNET: &str = "0x1613beB3B2C4f22Ee086B2b38C1476A3cE7f78E8"; @@ -70,6 +72,7 @@ pub const ALIGNED_SERVICE_MANAGER_HOLESKY_STAGE: &str = pub const ALIGNED_SERVICE_MANAGER_MAINNET: &str = "0xeF2A435e5EE44B2041100EF8cbC8ae035166606c"; pub const ALIGNED_SERVICE_MANAGER_MAINNET_STAGE: &str = "0x96b6a29D7B98519Ae66E6398BD27A76B30a5dC3f"; +pub const ALIGNED_SERVICE_MANAGER_SEPOLIA: &str = "0xFf731AB7b3653dc66878DC77E851D174f472d137"; // AlignedProofAggregationService pub const ALIGNED_PROOF_AGG_SERVICE_ADDRESS_MAINNET: &str = "0x0"; @@ -80,6 +83,8 @@ pub const ALIGNED_PROOF_AGG_SERVICE_ADDRESS_HOLESKY: &str = "0xe84CD4084d8131841CE6DC265361f81F4C59a1d4"; pub const ALIGNED_PROOF_AGG_SERVICE_ADDRESS_DEVNET: &str = "0xFD471836031dc5108809D173A067e8486B9047A3"; +pub const ALIGNED_PROOF_AGG_SERVICE_ADDRESS_SEPOLIA: &str = + "0xb5D46304c30B1AeB3a8Da6ab599c336f7946C8A4"; /// Batcher URL's pub const BATCHER_URL_DEVNET: &str = "ws://localhost:8080"; @@ -87,3 +92,4 @@ pub const BATCHER_URL_HOLESKY: &str = "wss://batcher.alignedlayer.com"; pub const BATCHER_URL_HOLESKY_STAGE: &str = "wss://stage.batcher.alignedlayer.com"; pub const BATCHER_URL_MAINNET: &str = "wss://mainnet.batcher.alignedlayer.com"; pub const BATCHER_URL_MAINNET_STAGE: &str = "wss://mainnetstage.batcher.alignedlayer.com"; +pub const BATCHER_URL_SEPOLIA: &str = "wss://sepolia.batcher.alignedlayer.com"; diff --git a/crates/sdk/src/common/types.rs b/crates/sdk/src/common/types.rs index f9850910f..fd7fbfafb 100644 --- a/crates/sdk/src/common/types.rs +++ b/crates/sdk/src/common/types.rs @@ -19,12 +19,14 @@ use serde::{Deserialize, Serialize}; use sha3::{Digest, Keccak256}; use super::constants::{ - ALIGNED_PROOF_AGG_SERVICE_ADDRESS_DEVNET, ALIGNED_PROOF_AGG_SERVICE_ADDRESS_HOLESKY, + ALIGNED_PROOF_AGG_SERVICE_ADDRESS_DEVNET, ALIGNED_PROOF_AGG_SERVICE_ADDRESS_SEPOLIA, + ALIGNED_PROOF_AGG_SERVICE_ADDRESS_HOLESKY, ALIGNED_PROOF_AGG_SERVICE_ADDRESS_HOLESKY_STAGE, ALIGNED_PROOF_AGG_SERVICE_ADDRESS_MAINNET, ALIGNED_PROOF_AGG_SERVICE_ADDRESS_MAINNET_STAGE, ALIGNED_SERVICE_MANAGER_DEVNET, ALIGNED_SERVICE_MANAGER_HOLESKY, ALIGNED_SERVICE_MANAGER_HOLESKY_STAGE, ALIGNED_SERVICE_MANAGER_MAINNET, ALIGNED_SERVICE_MANAGER_MAINNET_STAGE, - BATCHER_PAYMENT_SERVICE_ADDRESS_DEVNET, BATCHER_PAYMENT_SERVICE_ADDRESS_HOLESKY, + ALIGNED_SERVICE_MANAGER_SEPOLIA, BATCHER_PAYMENT_SERVICE_ADDRESS_SEPOLIA, + BATCHER_URL_SEPOLIA, BATCHER_PAYMENT_SERVICE_ADDRESS_DEVNET, BATCHER_PAYMENT_SERVICE_ADDRESS_HOLESKY, BATCHER_PAYMENT_SERVICE_ADDRESS_HOLESKY_STAGE, BATCHER_PAYMENT_SERVICE_ADDRESS_MAINNET, BATCHER_PAYMENT_SERVICE_ADDRESS_MAINNET_STAGE, BATCHER_URL_DEVNET, BATCHER_URL_HOLESKY, BATCHER_URL_HOLESKY_STAGE, BATCHER_URL_MAINNET, BATCHER_URL_MAINNET_STAGE, @@ -417,6 +419,7 @@ pub enum Network { HoleskyStage, Mainnet, MainnetStage, + Sepolia, Custom(String, String, String), } @@ -428,6 +431,7 @@ impl Network { Self::HoleskyStage => H160::from_str(ALIGNED_SERVICE_MANAGER_HOLESKY_STAGE).unwrap(), Self::Mainnet => H160::from_str(ALIGNED_SERVICE_MANAGER_MAINNET).unwrap(), Self::MainnetStage => H160::from_str(ALIGNED_SERVICE_MANAGER_MAINNET_STAGE).unwrap(), + Self::Sepolia => H160::from_str(ALIGNED_SERVICE_MANAGER_SEPOLIA).unwrap(), Self::Custom(s, _, _) => H160::from_str(s.as_str()).unwrap(), } } @@ -443,6 +447,7 @@ impl Network { Self::MainnetStage => { H160::from_str(BATCHER_PAYMENT_SERVICE_ADDRESS_MAINNET_STAGE).unwrap() } + Self::Sepolia => H160::from_str(BATCHER_PAYMENT_SERVICE_ADDRESS_SEPOLIA).unwrap(), Self::Custom(_, s, _) => H160::from_str(s.as_str()).unwrap(), } } @@ -458,6 +463,7 @@ impl Network { Self::MainnetStage => { H160::from_str(ALIGNED_PROOF_AGG_SERVICE_ADDRESS_MAINNET_STAGE).unwrap() } + Self::Sepolia => H160::from_str(ALIGNED_PROOF_AGG_SERVICE_ADDRESS_SEPOLIA).unwrap(), Self::Custom(_, s, _) => H160::from_str(s.as_str()).unwrap(), } } @@ -469,6 +475,7 @@ impl Network { Self::HoleskyStage => BATCHER_URL_HOLESKY_STAGE, Self::Mainnet => BATCHER_URL_MAINNET, Self::MainnetStage => BATCHER_URL_MAINNET_STAGE, + Self::Sepolia => BATCHER_URL_SEPOLIA, Self::Custom(_, _, s) => s.as_str(), } } diff --git a/crates/task-sender/src/commands.rs b/crates/task-sender/src/commands.rs index 4b44ba57f..d46ee81e6 100644 --- a/crates/task-sender/src/commands.rs +++ b/crates/task-sender/src/commands.rs @@ -5,7 +5,7 @@ use aligned_sdk::verification_layer::{ use ethers::prelude::*; use ethers::utils::parse_ether; use k256::ecdsa::SigningKey; -use log::{debug, error, info}; +use log::{debug, error, info, warn}; use rand::seq::SliceRandom; use rand::thread_rng; use std::fs::{self, File}; @@ -19,8 +19,8 @@ use tokio::join; use tokio_tungstenite::connect_async; use crate::structs::{ - GenerateAndFundWalletsArgs, GenerateProofsArgs, ProofType, SendInfiniteProofsArgs, - TestConnectionsArgs, + GenerateAndFundWalletsArgs, GenerateProofsArgs, InfiniteProofType, ProofType, + SendInfiniteProofsArgs, TestConnectionsArgs, }; const GROTH_16_PROOF_GENERATOR_FILE_PATH: &str = @@ -149,70 +149,273 @@ pub async fn generate_and_fund_wallets(args: GenerateAndFundWalletsArgs) { .expect("Invalid private key") .with_chain_id(chain_id.as_u64()); - for i in 0..args.number_of_wallets { - // this is necessary because of the move - let eth_rpc_provider = eth_rpc_provider.clone(); - let funding_wallet = funding_wallet.clone(); - let amount_to_deposit = args.amount_to_deposit.clone(); - let amount_to_deposit_aligned = args.amount_to_deposit_to_aligned.clone(); + // Generate all wallets first + let mut wallets = Vec::new(); + let mut wallet_private_keys = Vec::new(); - // Generate new wallet + info!("Generating {} wallets...", args.number_of_wallets); + for i in 0..args.number_of_wallets { let wallet = Wallet::new(&mut thread_rng()).with_chain_id(chain_id.as_u64()); info!("Generated wallet {} with address {:?}", i, wallet.address()); - // Fund the wallet - let signer = SignerMiddleware::new(eth_rpc_provider.clone(), funding_wallet.clone()); - let amount_to_deposit = - parse_ether(&amount_to_deposit).expect("Ether format should be: XX.XX"); - info!("Depositing {}wei to wallet {}", amount_to_deposit, i); - let tx = TransactionRequest::new() - .from(funding_wallet.address()) - .to(wallet.address()) - .value(amount_to_deposit); - - let pending_transaction = match signer.send_transaction(tx, None).await { - Ok(tx) => tx, + let signer_bytes = wallet.signer().to_bytes(); + let secret_key_hex = ethers::utils::hex::encode(signer_bytes); + wallet_private_keys.push(secret_key_hex); + wallets.push(wallet); + } + + // Get base nonce for funding wallet to avoid nonce conflicts + let mut current_nonce = match eth_rpc_provider + .get_transaction_count( + funding_wallet.address(), + Some(ethers::types::BlockNumber::Pending.into()), + ) + .await + { + Ok(nonce) => nonce, + Err(err) => { + error!("Could not get base nonce for funding wallet: {}", err); + return; + } + }; + + let batch_size = 25; + let amount_to_deposit = + parse_ether(&args.amount_to_deposit).expect("Ether format should be: XX.XX"); + let amount_to_deposit_to_aligned = + parse_ether(&args.amount_to_deposit_to_aligned).expect("Ether format should be: XX.XX"); + + let mut total_successful = 0; + let total_batches = args.number_of_wallets.div_ceil(batch_size); + + // Process wallets in batches + for (batch_idx, wallet_chunk) in wallets.chunks(batch_size).enumerate() { + info!( + "Processing batch {} of {} ({} wallets)...", + batch_idx + 1, + total_batches, + wallet_chunk.len() + ); + + // Refresh nonce for each batch to avoid stale nonce issues + current_nonce = match eth_rpc_provider + .get_transaction_count( + funding_wallet.address(), + Some(ethers::types::BlockNumber::Pending.into()), + ) + .await + { + Ok(nonce) => { + info!("Batch {}: Using fresh nonce {}", batch_idx + 1, nonce); + nonce + } Err(err) => { - error!("Could not fund wallet {}", err); - return; + error!("Could not get fresh nonce for batch {}: {}", batch_idx + 1, err); + current_nonce // Use previous nonce as fallback } }; - if let Err(err) = pending_transaction.await { - error!("Could not fund wallet {}", err); + + // ETH funding phase for this batch + info!( + "Batch {}: Starting ETH funding transactions...", + batch_idx + 1 + ); + let mut eth_funding_handles = Vec::new(); + + for (chunk_idx, wallet) in wallet_chunk.iter().enumerate() { + let global_idx = batch_idx * batch_size + chunk_idx; + let eth_rpc_provider = eth_rpc_provider.clone(); + let funding_wallet = funding_wallet.clone(); + let wallet_address = wallet.address(); + let nonce = current_nonce + U256::from(chunk_idx); + + let handle = tokio::spawn(async move { + + info!( + "Submitting ETH funding transaction for wallet {} with nonce {}", + global_idx, nonce + ); + let signer = SignerMiddleware::new(eth_rpc_provider, funding_wallet.clone()); + + // Get current gas price and bump it by 20% to avoid replacement issues + let base_gas_price = match signer.provider().get_gas_price().await { + Ok(price) => price, + Err(_) => U256::from(20_000_000_000u64), // 20 gwei fallback + }; + let bumped_gas_price = base_gas_price * 120 / 100; // 20% bump + + let tx = TransactionRequest::new() + .from(funding_wallet.address()) + .to(wallet_address) + .value(amount_to_deposit) + .nonce(nonce) + .gas_price(bumped_gas_price); + + let result = { + match signer.send_transaction(tx, None).await { + Ok(pending_tx) => { + info!( + "ETH funding transaction submitted for wallet {}", + global_idx + ); + pending_tx.await + } + Err(err) => { + error!( + "Could not submit ETH funding transaction for wallet {}: {}", + global_idx, err + ); + return None; + } + } + }; + + match result { + Ok(receipt) => { + if let Some(receipt) = receipt { + info!( + "ETH funding confirmed for wallet {} (tx: {:?})", + global_idx, receipt.transaction_hash + ); + } else { + info!( + "ETH funding confirmed for wallet {} (no receipt)", + global_idx + ); + } + Some(global_idx) + } + Err(err) => { + error!("ETH funding failed for wallet {}: {}", global_idx, err); + None + } + } + }); + eth_funding_handles.push(handle); + } + + // Wait for ETH funding to complete + let mut funded_indices = Vec::new(); + for handle in eth_funding_handles { + if let Ok(Some(idx)) = handle.await { + funded_indices.push(idx); + } } - info!("Wallet {} funded", i); - // Deposit to aligned - let amount_to_deposit_to_aligned = - parse_ether(&amount_to_deposit_aligned).expect("Ether format should be: XX.XX"); info!( - "Depositing {}wei to aligned {}", - amount_to_deposit_to_aligned, i + "Batch {}: ETH funding completed for {} out of {} wallets", + batch_idx + 1, + funded_indices.len(), + wallet_chunk.len() ); - let signer = SignerMiddleware::new(eth_rpc_provider.clone(), wallet.clone()); - if let Err(err) = deposit_to_aligned( - amount_to_deposit_to_aligned, - signer, - args.network.clone().into(), - ) - .await - { - error!("Could not deposit to aligned, err: {:?}", err); - return; + + if funded_indices.is_empty() { + warn!( + "Batch {}: No wallets were funded, skipping Aligned deposits", + batch_idx + 1 + ); + current_nonce += U256::from(wallet_chunk.len()); + continue; } - info!("Successfully deposited to aligned for wallet {}", i); - // Store private key - info!("Storing private key"); - let signer_bytes = wallet.signer().to_bytes(); - let secret_key_hex = ethers::utils::hex::encode(signer_bytes); + // Aligned deposit phase for funded wallets in this batch + info!( + "Batch {}: Starting Aligned deposit transactions...", + batch_idx + 1 + ); + let mut aligned_deposit_handles = Vec::new(); + + for &idx in &funded_indices { + let wallet = wallets[idx].clone(); + let eth_rpc_provider = eth_rpc_provider.clone(); + let network = args.network.clone(); + + let handle = tokio::spawn(async move { + + info!("Submitting Aligned deposit for wallet {}", idx); + let signer = SignerMiddleware::new(eth_rpc_provider, wallet); + + match deposit_to_aligned(amount_to_deposit_to_aligned, signer, network.into()).await + { + Ok(_) => { + info!("Successfully deposited to aligned for wallet {}", idx); + Ok(idx) + } + Err(err) => { + error!("Could not deposit to aligned for wallet {}: {:?}", idx, err); + Err(idx) + } + } + }); + aligned_deposit_handles.push(handle); + } - if let Err(err) = writeln!(file, "{}", secret_key_hex) { - error!("Could not store private key: {}", err); - } else { - info!("Private key {} stored", i); + // Wait for Aligned deposits to complete and write private keys immediately + let mut batch_successful = 0; + for handle in aligned_deposit_handles { + if let Ok(Ok(idx)) = handle.await { + let wallet_address = wallets[idx].address(); + let private_key = &wallet_private_keys[idx]; + + // Write to original file (private key only) for compatibility + if let Err(err) = writeln!(file, "{}", private_key) { + error!("Could not store private key for wallet {}: {}", idx, err); + continue; + } + + // Write to new file (private_key;address format) + let detailed_filepath = format!("{}.detailed", args.private_keys_filepath); + let detailed_file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(&detailed_filepath); + + match detailed_file { + Ok(mut f) => { + if let Err(err) = writeln!(f, "{};{:?}", private_key, wallet_address) { + error!("Could not store detailed info for wallet {}: {}", idx, err); + } else { + info!("Wallet {} stored: private key and address saved", idx); + batch_successful += 1; + } + } + Err(err) => { + error!("Could not open detailed file {}: {}", detailed_filepath, err); + // Still count as successful since main file was written + info!("Private key for wallet {} stored (detailed file failed)", idx); + batch_successful += 1; + } + } + } } + + total_successful += batch_successful; + current_nonce += U256::from(wallet_chunk.len()); + + info!( + "Batch {} completed: {} wallets successfully funded and deposited (Total: {} / {})", + batch_idx + 1, + batch_successful, + total_successful, + args.number_of_wallets + ); + + // Optional: Small delay between batches (commented out for speed) + // if batch_idx + 1 < total_batches { + // tokio::time::sleep(Duration::from_millis(50)).await; + // } } + + info!( + "All batches completed! Successfully created and funded {} wallets out of {} requested", + total_successful, args.number_of_wallets + ); + info!( + "Private keys for {} successful wallets stored in:", + total_successful + ); + info!(" - {} (private keys only, for compatibility)", args.private_keys_filepath); + info!(" - {}.detailed (private_key;address format)", args.private_keys_filepath); } /// infinitely hangs connections @@ -253,81 +456,60 @@ struct Sender { wallet: Wallet, } -pub async fn send_infinite_proofs(args: SendInfiniteProofsArgs) { - if matches!(args.network.clone().into(), Network::Holesky) { - error!("Network not supported this infinite proof sender"); - return; - } - - info!("Loading wallets"); - let mut senders = vec![]; - let Ok(eth_rpc_provider) = Provider::::try_from(args.eth_rpc_url.clone()) else { - error!("Could not connect to eth rpc"); - return; - }; - let Ok(chain_id) = eth_rpc_provider.get_chainid().await else { - error!("Could not get chain id"); - return; - }; +async fn load_senders_from_file( + eth_rpc_url: &str, + private_keys_filepath: &str, +) -> Result, String> { + let eth_rpc_provider = Provider::::try_from(eth_rpc_url) + .map_err(|_| "Could not connect to eth rpc".to_string())?; + let chain_id = eth_rpc_provider + .get_chainid() + .await + .map_err(|_| "Could not get chain id".to_string())?; - let file = match File::open(&args.private_keys_filepath) { - Ok(file) => file, - Err(err) => { - error!("Could not open private keys file: {}", err); - return; - } - }; + let file = File::open(private_keys_filepath) + .map_err(|err| format!("Could not open private keys file: {}", err))?; let reader = BufReader::new(file); + let mut senders = vec![]; - // now here we need to load the senders from the provided files for line in reader.lines() { - let private_key_str = match line { - Ok(line) => line, - Err(err) => { - error!("Could not read line from private keys file: {}", err); - return; - } - }; - let wallet = Wallet::from_str(private_key_str.trim()).expect("Invalid private key"); - let wallet = wallet.with_chain_id(chain_id.as_u64()); + let private_key_str = + line.map_err(|err| format!("Could not read line from private keys file: {}", err))?; + let wallet = Wallet::from_str(private_key_str.trim()) + .map_err(|_| "Invalid private key".to_string())? + .with_chain_id(chain_id.as_u64()); let sender = Sender { wallet }; - - // info!("Wallet {} loaded", i); senders.push(sender); } if senders.is_empty() { - error!("No wallets in file"); - return; - } - info!("All wallets loaded"); - - info!("Loading proofs verification data"); - let verification_data = - get_verification_data_from_proofs_folder(args.proofs_dir, senders[0].wallet.address()); - if verification_data.is_empty() { - error!("Verification data empty, not continuing"); - return; + return Err("No wallets in file".to_string()); } - info!("Proofs loaded!"); - let max_fee = U256::from_dec_str(&args.max_fee).expect("Invalid max fee"); + Ok(senders) +} +async fn run_infinite_proof_sender( + senders: Vec, + verification_data: Vec, + network: Network, + burst_size: usize, + burst_time_secs: u64, + max_fee: U256, + random_address: bool, +) { let mut handles = vec![]; - let network: Network = args.network.into(); - info!("Starting senders!"); + for (i, sender) in senders.iter().enumerate() { - // this clones are necessary because of the move let wallet = sender.wallet.clone(); let verification_data = verification_data.clone(); let network_clone = network.clone(); - // a thread to send tasks from each loaded wallet: let handle = tokio::spawn(async move { loop { let n = network_clone.clone(); - let mut result = Vec::with_capacity(args.burst_size); + let mut result = Vec::with_capacity(burst_size); let nonce = get_nonce_from_batcher(n.clone(), wallet.address()) .await .inspect_err(|e| { @@ -338,16 +520,25 @@ pub async fn send_infinite_proofs(args: SendInfiniteProofsArgs) { ) }) .unwrap(); - while result.len() < args.burst_size { + while result.len() < burst_size { let samples = verification_data - .choose_multiple(&mut thread_rng(), args.burst_size - result.len()); - result.extend(samples.cloned()); + .choose_multiple(&mut thread_rng(), burst_size - result.len()); + for mut sample in samples.cloned() { + // Randomize proof generator address if requested + if random_address { + sample.proof_generator_addr = Address::random(); + } else if sample.proof_generator_addr == Address::zero() { + // If it was set to zero (template), use wallet address + sample.proof_generator_addr = wallet.address(); + } + result.push(sample); + } } let verification_data_to_send = result; info!( "Sending {:?} Proofs to Aligned Batcher on {:?} from sender {}, nonce: {}, address: {:?}", - args.burst_size, n, i, nonce, wallet.address(), + burst_size, n, i, nonce, wallet.address(), ); let aligned_verification_data = submit_multiple( @@ -374,7 +565,7 @@ pub async fn send_infinite_proofs(args: SendInfiniteProofsArgs) { } info!("All responses received for sender {}", i); - tokio::time::sleep(Duration::from_secs(args.burst_time_secs)).await; + tokio::time::sleep(Duration::from_secs(burst_time_secs)).await; } }); @@ -386,64 +577,188 @@ pub async fn send_infinite_proofs(args: SendInfiniteProofsArgs) { } } +pub async fn send_infinite_proofs(args: SendInfiniteProofsArgs) { + if matches!(args.network.clone().into(), Network::Holesky) { + error!("Network not supported this infinite proof sender"); + return; + } + + // Load wallets using shared function + info!("Loading wallets"); + let senders = match load_senders_from_file(&args.eth_rpc_url, &args.private_keys_filepath).await + { + Ok(senders) => senders, + Err(err) => { + error!("{}", err); + return; + } + }; + info!("All wallets loaded"); + + // Load verification data based on proof type + let verification_data = match &args.proof_type { + InfiniteProofType::GnarkGroth16 { proofs_dir } => { + info!("Loading Groth16 proofs from directory structure"); + let data = get_verification_data_from_proofs_folder( + proofs_dir.clone(), + senders[0].wallet.address(), + ); + if data.is_empty() { + error!("Verification data empty, not continuing"); + return; + } + data + } + InfiniteProofType::Risc0 { + proof_path, + bin_path, + pub_path, + } => { + info!("Loading RISC Zero proof files"); + let Ok(proof) = std::fs::read(proof_path) else { + error!("Could not read proof file: {}", proof_path); + return; + }; + let Ok(vm_program) = std::fs::read(bin_path) else { + error!("Could not read bin file: {}", bin_path); + return; + }; + let pub_input = if let Some(pub_path) = pub_path { + std::fs::read(pub_path).ok() + } else { + None + }; + + // Create template verification data (without proof_generator_addr) + vec![VerificationData { + proving_system: ProvingSystemId::Risc0, + proof, + pub_input, + verification_key: None, + vm_program_code: Some(vm_program), + proof_generator_addr: Address::zero(), // Will be set randomly in the loop + }] + } + }; + + info!("Proofs loaded!"); + + let max_fee = U256::from_dec_str(&args.max_fee).expect("Invalid max fee"); + let network: Network = args.network.into(); + + info!("Starting senders!"); + run_infinite_proof_sender( + senders, + verification_data, + network, + args.burst_size, + args.burst_time_secs, + max_fee, + args.random_address, + ) + .await; +} + +fn load_groth16_proof_files( + dir_path: &std::path::Path, + base_name: &str, +) -> Option { + let proof_path = dir_path.join(format!("{}.proof", base_name)); + let public_input_path = dir_path.join(format!("{}.pub", base_name)); + let vk_path = dir_path.join(format!("{}.vk", base_name)); + + let proof = std::fs::read(&proof_path).ok()?; + let public_input = std::fs::read(&public_input_path).ok()?; + let vk = std::fs::read(&vk_path).ok()?; + + Some(VerificationData { + proving_system: ProvingSystemId::GnarkGroth16Bn254, + proof, + pub_input: Some(public_input), + verification_key: Some(vk), + vm_program_code: None, + proof_generator_addr: Address::zero(), // Will be set later + }) +} + +fn load_from_subdirectories(dir_path: &str) -> Vec { + let mut verifications_data = vec![]; + let dir = std::fs::read_dir(dir_path).expect("Directory does not exist"); + + for entry in dir.flatten() { + let proof_folder_dir = entry.path(); + if proof_folder_dir.is_dir() && proof_folder_dir.to_str().unwrap().contains("groth16") { + // Get the first file to determine the base name + if let Some(first_file) = fs::read_dir(&proof_folder_dir) + .ok() + .and_then(|dir| dir.flatten().map(|e| e.path()).find(|path| path.is_file())) + { + if let Some(base_name) = first_file.file_stem().and_then(|s| s.to_str()) { + if let Some(verification_data) = + load_groth16_proof_files(&proof_folder_dir, base_name) + { + verifications_data.push(verification_data); + } + } + } + } + } + + verifications_data +} + +fn load_from_flat_directory(dir_path: &str) -> Vec { + let mut verifications_data = vec![]; + let mut base_names = std::collections::HashSet::new(); + + // Collect all unique base names from .proof files + if let Ok(dir) = std::fs::read_dir(dir_path) { + for entry in dir.flatten() { + let path = entry.path(); + if path.is_file() && path.extension().and_then(|s| s.to_str()) == Some("proof") { + if let Some(base_name) = path.file_stem().and_then(|s| s.to_str()) { + base_names.insert(base_name.to_string()); + } + } + } + } + + // Load verification data for each base name + let dir_path = std::path::Path::new(dir_path); + for base_name in base_names { + if let Some(verification_data) = load_groth16_proof_files(dir_path, &base_name) { + verifications_data.push(verification_data); + } + } + + verifications_data +} + /// Returns the corresponding verification data for the generated proofs directory fn get_verification_data_from_proofs_folder( dir_path: String, default_addr: Address, ) -> Vec { - let mut verifications_data = vec![]; - info!("Reading proofs from {:?}", dir_path); - let dir = std::fs::read_dir(dir_path).expect("Directory does not exists"); - - for proof_folder in dir { - // each proof_folder is a dir called groth16_n - let proof_folder_dir = proof_folder.unwrap().path(); - if proof_folder_dir.is_dir() { - // todo(marcos): this should be improved if we want to support more proofs - // currently we stored the proofs on subdirs with a prefix for the proof type - // and here we check the subdir name and based on build the verification data accordingly - if proof_folder_dir.to_str().unwrap().contains("groth16") { - // Get the first file from the folder - let first_file = fs::read_dir(proof_folder_dir.clone()) - .expect("Can't read proofs directory") - .filter_map(|entry| entry.ok().map(|e| e.path())) - .find(|path| path.is_file()) // Find any valid file - .expect("No valid proof files found"); - - // Extract the base name (file stem) without extension - let base_name = first_file - .file_stem() - .and_then(|s| s.to_str()) - .expect("Failed to extract base name"); - - // Generate the paths for the other files - let proof_path = proof_folder_dir.join(format!("{}.proof", base_name)); - let public_input_path = proof_folder_dir.join(format!("{}.pub", base_name)); - let vk_path = proof_folder_dir.join(format!("{}.vk", base_name)); - - let Ok(proof) = std::fs::read(&proof_path) else { - continue; - }; - let Ok(public_input) = std::fs::read(&public_input_path) else { - continue; - }; - let Ok(vk) = std::fs::read(&vk_path) else { - continue; - }; + // Check if we have subdirectories with groth16 in the name + let has_groth16_subdirs = std::fs::read_dir(&dir_path) + .map(|dir| { + dir.flatten().any(|entry| { + entry.path().is_dir() && entry.path().to_str().unwrap().contains("groth16") + }) + }) + .unwrap_or(false); + + let mut verifications_data = if has_groth16_subdirs { + load_from_subdirectories(&dir_path) + } else { + load_from_flat_directory(&dir_path) + }; - let verification_data = VerificationData { - proving_system: ProvingSystemId::GnarkGroth16Bn254, - proof, - pub_input: Some(public_input), - verification_key: Some(vk), - vm_program_code: None, - proof_generator_addr: default_addr, - }; - verifications_data.push(verification_data); - } - } + // Set the default address for all verification data + for data in &mut verifications_data { + data.proof_generator_addr = default_addr; } verifications_data diff --git a/crates/task-sender/src/structs.rs b/crates/task-sender/src/structs.rs index 88c50184e..359258629 100644 --- a/crates/task-sender/src/structs.rs +++ b/crates/task-sender/src/structs.rs @@ -126,11 +126,38 @@ pub struct SendInfiniteProofsArgs { )] pub private_keys_filepath: String, #[arg( - name = "The generated proofs directory", - long = "proofs-dirpath", - default_value = "devnet" + name = "Use random addresses for proof generator", + long = "random-address", + action = clap::ArgAction::SetTrue )] - pub proofs_dir: String, + pub random_address: bool, + #[clap(subcommand)] + pub proof_type: InfiniteProofType, +} + +#[derive(Parser, Debug)] +pub enum InfiniteProofType { + #[clap(about = "Send infinite Gnark Groth16 proofs from directory")] + GnarkGroth16 { + #[arg( + name = "The generated proofs directory", + long = "proofs-dir", + default_value = "scripts/test_files/task_sender/proofs" + )] + proofs_dir: String, + }, + #[clap(about = "Send infinite RISC Zero proofs from file paths")] + Risc0 { + #[arg(name = "Path to RISC Zero proof file (.proof)", long = "proof-path")] + proof_path: String, + #[arg(name = "Path to RISC Zero binary file (.bin)", long = "bin-path")] + bin_path: String, + #[arg( + name = "Path to RISC Zero public input file (.pub) - optional", + long = "pub-path" + )] + pub_path: Option, + }, } #[derive(Debug, Clone, Copy)] @@ -139,6 +166,7 @@ enum NetworkNameArg { Holesky, HoleskyStage, Mainnet, + Sepolia, } impl FromStr for NetworkNameArg { @@ -150,8 +178,9 @@ impl FromStr for NetworkNameArg { "holesky" => Ok(NetworkNameArg::Holesky), "holesky-stage" => Ok(NetworkNameArg::HoleskyStage), "mainnet" => Ok(NetworkNameArg::Mainnet), + "sepolia" => Ok(NetworkNameArg::Sepolia), _ => Err( - "Unknown network. Possible values: devnet, holesky, holesky-stage, mainnet" + "Unknown network. Possible values: devnet, holesky, holesky-stage, mainnet, sepolia" .to_string(), ), } @@ -164,7 +193,7 @@ pub struct NetworkArg { name = "The working network's name", long = "network", default_value = "devnet", - help = "[possible values: devnet, holesky, holesky-stage, mainnet]" + help = "[possible values: devnet, holesky, holesky-stage, mainnet, sepolia]" )] network: Option, #[arg( @@ -214,6 +243,7 @@ impl From for Network { Some(NetworkNameArg::Holesky) => Network::Holesky, Some(NetworkNameArg::HoleskyStage) => Network::HoleskyStage, Some(NetworkNameArg::Mainnet) => Network::Mainnet, + Some(NetworkNameArg::Sepolia) => Network::Sepolia, } } } diff --git a/network_params.yaml b/network_params.yaml index 058b9eaa6..274fa3696 100644 --- a/network_params.yaml +++ b/network_params.yaml @@ -21,7 +21,7 @@ port_publisher: ethereum_genesis_generator_params: # The image to use for ethereum genesis generator - image: ethpandaops/ethereum-genesis-generator:4.1.1 + image: ethpandaops/ethereum-genesis-generator:4.1.19 # Default configuration parameters for the network network_params: