From 6d45f72a5f1cc8498a7f58913ac8ad3ebf69fe42 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 10:58:16 +0200 Subject: [PATCH 01/10] WIP: checkpoint Libp2pService --- Cargo.lock | 1294 +++++++++++++++-- rust-executor/Cargo.toml | 2 + rust-executor/src/lib.rs | 31 + rust-executor/src/libp2p_service/mod.rs | 309 ++++ .../src/perspectives/perspective_instance.rs | 31 +- 5 files changed, 1562 insertions(+), 105 deletions(-) create mode 100644 rust-executor/src/libp2p_service/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 6990153cc..53a944740 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -111,17 +111,18 @@ dependencies = [ "kitsune_p2p_types", "lazy_static", "libc", + "libp2p", "log", "maplit", "multibase", - "multihash", + "multihash 0.18.1", "once_cell", "os_info", "rand 0.8.5", "regex", "reqwest 0.11.20", "rocket", - "rodio 0.17.3", + "rodio", "rusqlite", "rust-embed", "rustls 0.23.12", @@ -299,7 +300,7 @@ dependencies = [ "getrandom 0.2.15", "once_cell", "version_check", - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -625,8 +626,24 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f6fd5ddaf0351dff5b8da21b2fb4ff8e08ddd02857f0bf69c47639106c0fff0" dependencies = [ - "asn1-rs-derive", - "asn1-rs-impl", + "asn1-rs-derive 0.4.0", + "asn1-rs-impl 0.1.0", + "displaydoc", + "nom 7.1.3", + "num-traits", + "rusticata-macros", + "thiserror 1.0.63", + "time 0.3.36", +] + +[[package]] +name = "asn1-rs" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5493c3bedbacf7fd7382c6346bbd66687d12bbaad3a89a2d2c303ee6cf20b048" +dependencies = [ + "asn1-rs-derive 0.5.1", + "asn1-rs-impl 0.2.0", "displaydoc", "nom 7.1.3", "num-traits", @@ -647,6 +664,18 @@ dependencies = [ "synstructure 0.12.6", ] +[[package]] +name = "asn1-rs-derive" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "965c2d33e53cb6b267e148a4cb0760bc01f4904c1cd4bb4002a085bb016d1490" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", + "synstructure 0.13.1", +] + [[package]] name = "asn1-rs-impl" version = "0.1.0" @@ -658,6 +687,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "asn1-rs-impl" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "ast_node" version = "0.9.9" @@ -1000,6 +1040,19 @@ dependencies = [ "tungstenite 0.18.0", ] +[[package]] +name = "asynchronous-codec" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a860072022177f903e59730004fb5dc13db9275b79bb2aef7ba8ce831956c233" +dependencies = [ + "bytes", + "futures-sink", + "futures-util", + "memchr", + "pin-project-lite", +] + [[package]] name = "atk" version = "0.18.0" @@ -1066,6 +1119,17 @@ version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" +[[package]] +name = "attohttpc" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d9a9bf8b79a749ee0b911b91b671cc2b6c670bdbc7e3dfd537576ddc94bb2a2" +dependencies = [ + "http 0.2.12", + "log", + "url", +] + [[package]] name = "atty" version = "0.2.14" @@ -1278,7 +1342,7 @@ dependencies = [ "bitflags 2.6.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.12.1", "lazy_static", "lazycell", "log", @@ -1561,6 +1625,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "771fe0050b883fcc3ea2359b1a96bcfbc090b7116eae7c3c512c7a083fdf23d3" +[[package]] +name = "bs58" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf88ba1141d185c399bee5288d850d63b8369520c1eafc32a0430b5b6c287bf4" +dependencies = [ + "tinyvec", +] + [[package]] name = "bson" version = "2.11.0" @@ -2034,9 +2107,9 @@ checksum = "fd94671561e36e4e7de75f753f577edafb0e7c05d6e4547229fdf7938fbcd2c3" dependencies = [ "core2", "multibase", - "multihash", + "multihash 0.18.1", "serde", - "unsigned-varint", + "unsigned-varint 0.7.2", ] [[package]] @@ -3003,7 +3076,7 @@ dependencies = [ "cssparser-macros", "dtoa-short", "itoa 1.0.11", - "phf 0.8.0", + "phf 0.11.2", "smallvec", ] @@ -3091,6 +3164,7 @@ dependencies = [ "cfg-if 1.0.0", "cpufeatures", "curve25519-dalek-derive", + "digest 0.10.7", "fiat-crypto", "rustc_version 0.4.1", "subtle", @@ -3129,7 +3203,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b28bfe653d79bd16c77f659305b195b82bb5ce0c0eb2a4846b82ddbd77586813" dependencies = [ "bitflags 2.6.0", - "libloading 0.7.4", + "libloading 0.8.5", "winapi 0.3.9", ] @@ -3630,8 +3704,8 @@ source = "git+https://github.com/coasys/deno_core.git?branch=v8-dylib#4c825881bc dependencies = [ "anyhow", "bincode", - "bit-set 0.5.3", - "bit-vec 0.6.3", + "bit-set 0.8.0", + "bit-vec 0.8.0", "bytes", "cooked-waker", "deno_core_icudata", @@ -3959,7 +4033,7 @@ dependencies = [ "winapi 0.3.9", "windows-sys 0.48.0", "x25519-dalek 2.0.1", - "x509-parser", + "x509-parser 0.15.1", ] [[package]] @@ -4269,7 +4343,21 @@ version = "8.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dbd676fbbab537128ef0278adb5576cf363cff6aa22a7b24effe97347cfab61e" dependencies = [ - "asn1-rs", + "asn1-rs 0.5.2", + "displaydoc", + "nom 7.1.3", + "num-bigint", + "num-traits", + "rusticata-macros", +] + +[[package]] +name = "der-parser" +version = "9.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5cd0a5c643689626bec213c4d8bd4d96acc8ffdb4ad4bb6bc16abf27d5f4b553" +dependencies = [ + "asn1-rs 0.6.2", "displaydoc", "nom 7.1.3", "num-bigint", @@ -4449,10 +4537,10 @@ dependencies = [ "arrayref", "base64 0.13.1", "bls12_381_plus", - "bs58", + "bs58 0.4.0", "curve25519-dalek 3.2.0", "did_url", - "ed25519-dalek", + "ed25519-dalek 1.0.1", "getrandom 0.2.15", "hkdf 0.11.0", "json-patch 0.2.7", @@ -4615,7 +4703,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "330c60081dcc4c72131f8eb70510f1ac07223e5d4163db481a04a0befcffa412" dependencies = [ - "libloading 0.7.4", + "libloading 0.8.5", ] [[package]] @@ -4888,6 +4976,16 @@ dependencies = [ "signature 1.6.4", ] +[[package]] +name = "ed25519" +version = "2.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "115531babc129696a58c64a4fef0a8bf9e9698629fb97e9e40767d235cfbcd53" +dependencies = [ + "pkcs8 0.10.2", + "signature 2.2.0", +] + [[package]] name = "ed25519-dalek" version = "1.0.1" @@ -4895,13 +4993,28 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c762bae6dcaf24c4c84667b8579785430908723d5c889f469d76a41d59cc7a9d" dependencies = [ "curve25519-dalek 3.2.0", - "ed25519", + "ed25519 1.5.3", "rand 0.7.3", "serde", "sha2 0.9.9", "zeroize", ] +[[package]] +name = "ed25519-dalek" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a3daa8e81a3963a60642bcc1f90a670680bd4a77535faa384e9d1c79d620871" +dependencies = [ + "curve25519-dalek 4.1.3", + "ed25519 2.2.3", + "rand_core 0.6.4", + "serde", + "sha2 0.10.8", + "subtle", + "zeroize", +] + [[package]] name = "ego-tree" version = "0.6.3" @@ -5717,6 +5830,16 @@ dependencies = [ "futures-util", ] +[[package]] +name = "futures-bounded" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91f328e7fb845fc832912fb6a34f40cf6d1888c92f974d1893a54e97b5ff542e" +dependencies = [ + "futures-timer", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -5753,6 +5876,7 @@ dependencies = [ "futures-core", "futures-task", "futures-util", + "num_cpus", ] [[package]] @@ -5800,6 +5924,17 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "futures-rustls" +version = "0.26.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8f2f12607f92c69b12ed746fabf9ca4f5c482cba46679c1a75b874ed7c26adb" +dependencies = [ + "futures-io", + "rustls 0.23.12", + "rustls-pki-types", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -6093,6 +6228,19 @@ dependencies = [ "windows 0.48.0", ] +[[package]] +name = "generator" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc6bd114ceda131d3b1d665eba35788690ad37f5916457286b32ab6fd3c438dd" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "log", + "rustversion", + "windows 0.58.0", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -6181,6 +6329,18 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + [[package]] name = "ghash" version = "0.5.1" @@ -7069,6 +7229,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fe2267d4ed49bc07b63801559be28c718ea06c4738b7a03c94df7386d2cde46" +[[package]] +name = "hex_fmt" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b07f60793ff0a4d9cef0f18e63b5357e06209987153a64648c972c1e5aff336f" + [[package]] name = "hexf-parse" version = "0.2.1" @@ -7092,6 +7258,53 @@ dependencies = [ "ureq", ] +[[package]] +name = "hickory-proto" +version = "0.25.0-alpha.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d00147af6310f4392a31680db52a3ed45a2e0f68eb18e8c3fe5537ecc96d9e2" +dependencies = [ + "async-recursion", + "async-trait", + "cfg-if 1.0.0", + "data-encoding", + "enum-as-inner 0.6.0", + "futures-channel", + "futures-io", + "futures-util", + "idna 1.0.3", + "ipnet", + "once_cell", + "rand 0.9.0", + "socket2 0.5.7", + "thiserror 2.0.11", + "tinyvec", + "tokio", + "tracing", + "url", +] + +[[package]] +name = "hickory-resolver" +version = "0.25.0-alpha.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5762f69ebdbd4ddb2e975cd24690bf21fe6b2604039189c26acddbc427f12887" +dependencies = [ + "cfg-if 1.0.0", + "futures-util", + "hickory-proto", + "ipconfig", + "moka", + "once_cell", + "parking_lot 0.12.3", + "rand 0.9.0", + "resolv-conf", + "smallvec", + "thiserror 2.0.11", + "tokio", + "tracing", +] + [[package]] name = "hkdf" version = "0.11.0" @@ -8014,7 +8227,7 @@ dependencies = [ "httpdate", "itoa 1.0.11", "pin-project-lite", - "socket2 0.4.10", + "socket2 0.5.7", "tokio", "tower-service", "tracing", @@ -8345,12 +8558,56 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "if-watch" +version = "3.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdf9d64cfcf380606e64f9a0bcf493616b65331199f984151a6fa11a7b3cde38" +dependencies = [ + "async-io 2.3.4", + "core-foundation 0.9.4", + "fnv", + "futures", + "if-addrs 0.10.2", + "ipnet", + "log", + "netlink-packet-core", + "netlink-packet-route", + "netlink-proto", + "netlink-sys", + "rtnetlink", + "system-configuration", + "tokio", + "windows 0.52.0", +] + [[package]] name = "if_chain" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb56e1aa765b4b4f3aadfab769793b7087bb03a4ea4920644a6d238e2df5b9ed" +[[package]] +name = "igd-next" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76b0d7d4541def58a37bf8efc559683f21edce7c82f0d866c93ac21f7e098f93" +dependencies = [ + "async-trait", + "attohttpc", + "bytes", + "futures", + "http 1.1.0", + "http-body-util", + "hyper 1.1.0", + "hyper-util", + "log", + "rand 0.8.5", + "tokio", + "url", + "xmltree", +] + [[package]] name = "image" version = "0.23.14" @@ -8936,7 +9193,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6971da4d9c3aa03c3d8f3ff0f4155b534aad021292003895a469716b2a230378" dependencies = [ "base64 0.21.7", - "pem", + "pem 1.1.1", "ring 0.16.20", "serde", "serde_json", @@ -9174,7 +9431,7 @@ dependencies = [ "ort", "ort-sys", "rand 0.8.5", - "rodio 0.20.1", + "rodio", "rwhisper", "serde_json", "tokio", @@ -9569,7 +9826,7 @@ dependencies = [ "nanoid", "once_cell", "parking_lot 0.12.3", - "rcgen", + "rcgen 0.10.0", "serde", "serde_json", "serde_yaml", @@ -9594,7 +9851,7 @@ dependencies = [ "nanoid", "once_cell", "parking_lot 0.12.3", - "rcgen", + "rcgen 0.10.0", "serde", "serde_json", "serde_yaml", @@ -9938,43 +10195,484 @@ dependencies = [ ] [[package]] -name = "libredox" -version = "0.1.3" +name = "libp2p" +version = "0.55.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +checksum = "b72dc443ddd0254cb49a794ed6b6728400ee446a0f7ab4a07d0209ee98de20e9" dependencies = [ - "bitflags 2.6.0", - "libc", - "redox_syscall 0.5.3", + "bytes", + "either", + "futures", + "futures-timer", + "getrandom 0.2.15", + "libp2p-allow-block-list", + "libp2p-connection-limits", + "libp2p-core", + "libp2p-dns", + "libp2p-gossipsub", + "libp2p-identify", + "libp2p-identity", + "libp2p-kad", + "libp2p-mdns", + "libp2p-metrics", + "libp2p-noise", + "libp2p-ping", + "libp2p-quic", + "libp2p-relay", + "libp2p-request-response", + "libp2p-swarm", + "libp2p-tcp", + "libp2p-upnp", + "libp2p-yamux", + "multiaddr", + "pin-project", + "rw-stream-sink", + "thiserror 2.0.11", ] [[package]] -name = "libsecp256k1" -version = "0.7.1" +name = "libp2p-allow-block-list" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "95b09eff1b35ed3b33b877ced3a691fc7a481919c7e29c53c906226fcf55e2a1" +checksum = "38944b7cb981cc93f2f0fb411ff82d0e983bd226fbcc8d559639a3a73236568b" dependencies = [ - "arrayref", - "base64 0.13.1", - "digest 0.9.0", - "hmac-drbg", - "libsecp256k1-core", - "libsecp256k1-gen-ecmult", - "libsecp256k1-gen-genmult", - "rand 0.8.5", - "serde", - "sha2 0.9.9", - "typenum", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", ] [[package]] -name = "libsecp256k1-core" -version = "0.3.0" +name = "libp2p-connection-limits" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5be9b9bb642d8522a44d533eab56c16c738301965504753b03ad1de3425d5451" +checksum = "efe9323175a17caa8a2ed4feaf8a548eeef5e0b72d03840a0eab4bcb0210ce1c" dependencies = [ - "crunchy", - "digest 0.9.0", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", +] + +[[package]] +name = "libp2p-core" +version = "0.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "193c75710ba43f7504ad8f58a62ca0615b1d7e572cb0f1780bc607252c39e9ef" +dependencies = [ + "either", + "fnv", + "futures", + "futures-timer", + "libp2p-identity", + "multiaddr", + "multihash 0.19.3", + "multistream-select", + "once_cell", + "parking_lot 0.12.3", + "pin-project", + "quick-protobuf", + "rand 0.8.5", + "rw-stream-sink", + "thiserror 2.0.11", + "tracing", + "unsigned-varint 0.8.0", + "web-time", +] + +[[package]] +name = "libp2p-dns" +version = "0.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b780a1150214155b0ed1cdf09fbd2e1b0442604f9146a431d1b21d23eef7bd7" +dependencies = [ + "async-trait", + "futures", + "hickory-resolver", + "libp2p-core", + "libp2p-identity", + "parking_lot 0.12.3", + "smallvec", + "tracing", +] + +[[package]] +name = "libp2p-gossipsub" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d558548fa3b5a8e9b66392f785921e363c57c05dcadfda4db0d41ae82d313e4a" +dependencies = [ + "async-channel 2.3.1", + "asynchronous-codec", + "base64 0.22.1", + "byteorder", + "bytes", + "either", + "fnv", + "futures", + "futures-timer", + "getrandom 0.2.15", + "hashlink", + "hex_fmt", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "prometheus-client", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "regex", + "sha2 0.10.8", + "tracing", + "web-time", +] + +[[package]] +name = "libp2p-identify" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8c06862544f02d05d62780ff590cc25a75f5c2b9df38ec7a370dcae8bb873cf" +dependencies = [ + "asynchronous-codec", + "either", + "futures", + "futures-bounded", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "quick-protobuf", + "quick-protobuf-codec", + "smallvec", + "thiserror 2.0.11", + "tracing", +] + +[[package]] +name = "libp2p-identity" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "257b5621d159b32282eac446bed6670c39c7dc68a200a992d8f056afa0066f6d" +dependencies = [ + "bs58 0.5.1", + "ed25519-dalek 2.1.1", + "hkdf 0.12.4", + "multihash 0.19.3", + "quick-protobuf", + "rand 0.8.5", + "sha2 0.10.8", + "thiserror 1.0.63", + "tracing", + "zeroize", +] + +[[package]] +name = "libp2p-kad" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bab0466a27ebe955bcbc27328fae5429c5b48c915fd6174931414149802ec23" +dependencies = [ + "asynchronous-codec", + "bytes", + "either", + "fnv", + "futures", + "futures-bounded", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "sha2 0.10.8", + "smallvec", + "thiserror 2.0.11", + "tracing", + "uint", + "web-time", +] + +[[package]] +name = "libp2p-mdns" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11d0ba095e1175d797540e16b62e7576846b883cb5046d4159086837b36846cc" +dependencies = [ + "futures", + "hickory-proto", + "if-watch", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand 0.8.5", + "smallvec", + "socket2 0.5.7", + "tokio", + "tracing", +] + +[[package]] +name = "libp2p-metrics" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2ce58c64292e87af624fcb86465e7dd8342e46a388d71e8fec0ab37ee789630a" +dependencies = [ + "futures", + "libp2p-core", + "libp2p-gossipsub", + "libp2p-identify", + "libp2p-identity", + "libp2p-kad", + "libp2p-ping", + "libp2p-relay", + "libp2p-swarm", + "pin-project", + "prometheus-client", + "web-time", +] + +[[package]] +name = "libp2p-noise" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afcc133e0f3cea07acde6eb8a9665cb11b600bd61110b010593a0210b8153b16" +dependencies = [ + "asynchronous-codec", + "bytes", + "futures", + "libp2p-core", + "libp2p-identity", + "multiaddr", + "multihash 0.19.3", + "once_cell", + "quick-protobuf", + "rand 0.8.5", + "snow", + "static_assertions", + "thiserror 2.0.11", + "tracing", + "x25519-dalek 2.0.1", + "zeroize", +] + +[[package]] +name = "libp2p-ping" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2529993ff22deb2504c0130a58b60fb77f036be555053922db1a0490b5798b" +dependencies = [ + "futures", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand 0.8.5", + "tracing", + "web-time", +] + +[[package]] +name = "libp2p-quic" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41432a159b00424a0abaa2c80d786cddff81055ac24aa127e0cf375f7858d880" +dependencies = [ + "futures", + "futures-timer", + "if-watch", + "libp2p-core", + "libp2p-identity", + "libp2p-tls", + "quinn 0.11.6", + "rand 0.8.5", + "ring 0.17.8", + "rustls 0.23.12", + "socket2 0.5.7", + "thiserror 2.0.11", + "tokio", + "tracing", +] + +[[package]] +name = "libp2p-relay" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08a41e346681395877118c270cf993f90d57d045fbf0913ca2f07b59ec6062e4" +dependencies = [ + "asynchronous-codec", + "bytes", + "either", + "futures", + "futures-bounded", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "quick-protobuf", + "quick-protobuf-codec", + "rand 0.8.5", + "static_assertions", + "thiserror 2.0.11", + "tracing", + "web-time", +] + +[[package]] +name = "libp2p-request-response" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "548fe44a80ff275d400f1b26b090d441d83ef73efabbeb6415f4ce37e5aed865" +dependencies = [ + "async-trait", + "futures", + "futures-bounded", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm", + "rand 0.8.5", + "serde", + "serde_json", + "smallvec", + "tracing", +] + +[[package]] +name = "libp2p-swarm" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "803399b4b6f68adb85e63ab573ac568154b193e9a640f03e0f2890eabbcb37f8" +dependencies = [ + "either", + "fnv", + "futures", + "futures-timer", + "libp2p-core", + "libp2p-identity", + "libp2p-swarm-derive", + "lru", + "multistream-select", + "once_cell", + "rand 0.8.5", + "smallvec", + "tokio", + "tracing", + "web-time", +] + +[[package]] +name = "libp2p-swarm-derive" +version = "0.35.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206e0aa0ebe004d778d79fb0966aa0de996c19894e2c0605ba2f8524dd4443d8" +dependencies = [ + "heck 0.5.0", + "proc-macro2", + "quote", + "syn 2.0.87", +] + +[[package]] +name = "libp2p-tcp" +version = "0.43.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65346fb4d36035b23fec4e7be4c320436ba53537ce9b6be1d1db1f70c905cad0" +dependencies = [ + "futures", + "futures-timer", + "if-watch", + "libc", + "libp2p-core", + "socket2 0.5.7", + "tokio", + "tracing", +] + +[[package]] +name = "libp2p-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcaebc1069dea12c5b86a597eaaddae0317c2c2cb9ec99dc94f82fd340f5c78b" +dependencies = [ + "futures", + "futures-rustls", + "libp2p-core", + "libp2p-identity", + "rcgen 0.11.3", + "ring 0.17.8", + "rustls 0.23.12", + "rustls-webpki 0.101.7", + "thiserror 2.0.11", + "x509-parser 0.16.0", + "yasna", +] + +[[package]] +name = "libp2p-upnp" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d457b9ecceb66e7199f049926fad447f1f17f040e8d29d690c086b4cab8ed14a" +dependencies = [ + "futures", + "futures-timer", + "igd-next", + "libp2p-core", + "libp2p-swarm", + "tokio", + "tracing", +] + +[[package]] +name = "libp2p-yamux" +version = "0.47.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f15df094914eb4af272acf9adaa9e287baa269943f32ea348ba29cfb9bfc60d8" +dependencies = [ + "either", + "futures", + "libp2p-core", + "thiserror 2.0.11", + "tracing", + "yamux 0.12.1", + "yamux 0.13.4", +] + +[[package]] +name = "libredox" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0ff37bd590ca25063e35af745c343cb7a0271906fb7b37e4813e8f79f00268d" +dependencies = [ + "bitflags 2.6.0", + "libc", + "redox_syscall 0.5.3", +] + +[[package]] +name = "libsecp256k1" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "95b09eff1b35ed3b33b877ced3a691fc7a481919c7e29c53c906226fcf55e2a1" +dependencies = [ + "arrayref", + "base64 0.13.1", + "digest 0.9.0", + "hmac-drbg", + "libsecp256k1-core", + "libsecp256k1-gen-ecmult", + "libsecp256k1-gen-genmult", + "rand 0.8.5", + "serde", + "sha2 0.9.9", + "typenum", +] + +[[package]] +name = "libsecp256k1-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5be9b9bb642d8522a44d533eab56c16c738301965504753b03ad1de3425d5451" +dependencies = [ + "crunchy", + "digest 0.9.0", "subtle", ] @@ -10168,7 +10866,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ff50ecb28bb86013e935fb6683ab1f6d3a20016f123c76fd4c27470076ac30f5" dependencies = [ "cfg-if 1.0.0", - "generator", + "generator 0.7.5", "scoped-tls", "serde", "serde_json", @@ -10176,6 +10874,19 @@ dependencies = [ "tracing-subscriber 0.3.18", ] +[[package]] +name = "loom" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "419e0dc8046cb947daa77eb95ae174acfbddb7673b4151f56d1eed8e93fbfaca" +dependencies = [ + "cfg-if 1.0.0", + "generator 0.8.4", + "scoped-tls", + "tracing", + "tracing-subscriber 0.3.18", +] + [[package]] name = "lru" version = "0.12.4" @@ -10682,6 +11393,25 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "moka" +version = "0.12.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9321642ca94a4282428e6ea4af8cc2ca4eac48ac7a6a4ea8f33f76d0ce70926" +dependencies = [ + "crossbeam-channel 0.5.13", + "crossbeam-epoch", + "crossbeam-utils 0.8.20", + "loom 0.7.2", + "parking_lot 0.12.3", + "portable-atomic", + "rustc_version 0.4.1", + "smallvec", + "tagptr", + "thiserror 1.0.63", + "uuid 1.10.0", +] + [[package]] name = "monostate" version = "0.1.13" @@ -10787,6 +11517,25 @@ dependencies = [ "version_check", ] +[[package]] +name = "multiaddr" +version = "0.18.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe6351f60b488e04c1d21bc69e56b89cb3f5e8f5d22557d6e8031bdfd79b6961" +dependencies = [ + "arrayref", + "byteorder", + "data-encoding", + "libp2p-identity", + "multibase", + "multihash 0.19.3", + "percent-encoding", + "serde", + "static_assertions", + "unsigned-varint 0.8.0", + "url", +] + [[package]] name = "multibase" version = "0.9.1" @@ -10812,7 +11561,17 @@ dependencies = [ "multihash-derive", "sha2 0.10.8", "sha3", - "unsigned-varint", + "unsigned-varint 0.7.2", +] + +[[package]] +name = "multihash" +version = "0.19.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b430e7953c29dd6a09afc29ff0bb69c6e306329ee6794700aee27b76a1aea8d" +dependencies = [ + "core2", + "unsigned-varint 0.8.0", ] [[package]] @@ -10838,6 +11597,20 @@ dependencies = [ "serde", ] +[[package]] +name = "multistream-select" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0df8e5eec2298a62b326ee4f0d7fe1a6b90a09dfcf9df37b38f947a8c42f19" +dependencies = [ + "bytes", + "futures", + "log", + "pin-project", + "smallvec", + "unsigned-varint 0.7.2", +] + [[package]] name = "must_future" version = "0.1.2" @@ -11003,13 +11776,77 @@ dependencies = [ ] [[package]] -name = "netif" -version = "0.1.6" +name = "netif" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d29a01b9f018d6b7b277fef6c79fdbd9bf17bb2d1e298238055cafab49baa5ee" +dependencies = [ + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "netlink-packet-core" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72724faf704479d67b388da142b186f916188505e7e0b26719019c525882eda4" +dependencies = [ + "anyhow", + "byteorder", + "netlink-packet-utils", +] + +[[package]] +name = "netlink-packet-route" +version = "0.17.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "053998cea5a306971f88580d0829e90f270f940befd7cf928da179d4187a5a66" +dependencies = [ + "anyhow", + "bitflags 1.3.2", + "byteorder", + "libc", + "netlink-packet-core", + "netlink-packet-utils", +] + +[[package]] +name = "netlink-packet-utils" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ede8a08c71ad5a95cdd0e4e52facd37190977039a4704eb82a283f713747d34" +dependencies = [ + "anyhow", + "byteorder", + "paste", + "thiserror 1.0.63", +] + +[[package]] +name = "netlink-proto" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72452e012c2f8d612410d89eea01e2d9b56205274abb35d53f60200b2ec41d60" +dependencies = [ + "bytes", + "futures", + "log", + "netlink-packet-core", + "netlink-sys", + "thiserror 2.0.11", +] + +[[package]] +name = "netlink-sys" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d29a01b9f018d6b7b277fef6c79fdbd9bf17bb2d1e298238055cafab49baa5ee" +checksum = "416060d346fbaf1f23f9512963e3e878f1a78e707cb699ba9215761754244307" dependencies = [ + "bytes", + "futures", "libc", - "winapi 0.3.9", + "log", + "tokio", ] [[package]] @@ -11145,6 +11982,12 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a0f889fb66f7acdf83442c35775764b51fed3c606ab9cee51500dbde2cf528ca" +[[package]] +name = "nohash-hasher" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2bf50223579dc7cdcfb3bfcacf7069ff68243f8c363f62ffa99cf000a6b9c451" + [[package]] name = "nom" version = "5.1.3" @@ -11401,7 +12244,7 @@ version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af1844ef2428cc3e1cb900be36181049ef3d3193c63e43026cfe202983b27a56" dependencies = [ - "proc-macro-crate 2.0.0", + "proc-macro-crate 3.1.0", "proc-macro2", "quote", "syn 2.0.87", @@ -11718,14 +12561,23 @@ version = "0.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9bedf36ffb6ba96c2eb7144ef6270557b52e54b20c0a8e1eb2ff99a6c6959bff" dependencies = [ - "asn1-rs", + "asn1-rs 0.5.2", +] + +[[package]] +name = "oid-registry" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a8d8034d9489cdaf79228eb9f6a3b8d7bb32ba00d6645ebd48eef4077ceb5bd9" +dependencies = [ + "asn1-rs 0.6.2", ] [[package]] name = "once_cell" -version = "1.19.0" +version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" [[package]] name = "one_err" @@ -12446,7 +13298,7 @@ dependencies = [ "font", "itertools 0.13.0", "log", - "ordered-float 2.10.1", + "ordered-float 5.0.0", "pathfinder_color", "pathfinder_content", "pathfinder_geometry", @@ -12489,6 +13341,16 @@ dependencies = [ "base64 0.13.1", ] +[[package]] +name = "pem" +version = "3.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38af38e8470ac9dee3ce1bae1af9c1671fffc44ddfd8bd1d0a3445bf349a8ef3" +dependencies = [ + "base64 0.22.1", + "serde", +] + [[package]] name = "pem-rfc7468" version = "0.7.0" @@ -12916,7 +13778,7 @@ version = "0.2.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" dependencies = [ - "zerocopy", + "zerocopy 0.7.35", ] [[package]] @@ -13141,6 +14003,29 @@ dependencies = [ "thiserror 1.0.63", ] +[[package]] +name = "prometheus-client" +version = "0.22.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "504ee9ff529add891127c4827eb481bd69dc0ebc72e9a682e187db4caa60c3ca" +dependencies = [ + "dtoa", + "itoa 1.0.11", + "parking_lot 0.12.3", + "prometheus-client-derive-encode", +] + +[[package]] +name = "prometheus-client-derive-encode" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "proptest" version = "1.5.0" @@ -13337,6 +14222,28 @@ version = "1.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" +[[package]] +name = "quick-protobuf" +version = "0.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d6da84cc204722a989e01ba2f6e1e276e190f22263d0cb6ce8526fcdb0d2e1f" +dependencies = [ + "byteorder", +] + +[[package]] +name = "quick-protobuf-codec" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15a0580ab32b169745d7a39db2ba969226ca16738931be152a3209b409de2474" +dependencies = [ + "asynchronous-codec", + "bytes", + "quick-protobuf", + "thiserror 1.0.63", + "unsigned-varint 0.8.0", +] + [[package]] name = "quick-xml" version = "0.26.0" @@ -13410,6 +14317,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "62e96808277ec6f97351a2380e6c25114bc9e67037775464979f3037c92d05ef" dependencies = [ "bytes", + "futures-io", "pin-project-lite", "quinn-proto 0.11.9", "quinn-udp 0.5.7", @@ -13498,6 +14406,12 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + [[package]] name = "r2d2" version = "0.8.10" @@ -13569,6 +14483,17 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3779b94aeb87e8bd4e834cee3650289ee9e0d5677f976ecdb6d219e5f4f6cd94" +dependencies = [ + "rand_chacha 0.9.0", + "rand_core 0.9.3", + "zerocopy 0.8.24", +] + [[package]] name = "rand-utf8" version = "0.0.1" @@ -13608,6 +14533,16 @@ dependencies = [ "rand_core 0.6.4", ] +[[package]] +name = "rand_chacha" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" +dependencies = [ + "ppv-lite86", + "rand_core 0.9.3", +] + [[package]] name = "rand_core" version = "0.3.1" @@ -13641,6 +14576,15 @@ dependencies = [ "getrandom 0.2.15", ] +[[package]] +name = "rand_core" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "99d9a13982dcf210057a8a78572b2217b667c3beacbf3a0d8b454f6f82837d38" +dependencies = [ + "getrandom 0.3.2", +] + [[package]] name = "rand_distr" version = "0.4.3" @@ -13829,13 +14773,25 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffbe84efe2f38dea12e9bfc1f65377fdf03e53a18cb3b995faedf7934c7e785b" dependencies = [ - "pem", + "pem 1.1.1", "ring 0.16.20", "time 0.3.36", "yasna", "zeroize", ] +[[package]] +name = "rcgen" +version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52c4f3084aa3bc7dfbba4eff4fab2a54db4324965d8872ab933565e6fbd83bc6" +dependencies = [ + "pem 3.0.5", + "ring 0.16.20", + "time 0.3.36", + "yasna", +] + [[package]] name = "rdrand" version = "0.4.0" @@ -14370,19 +15326,6 @@ dependencies = [ "uncased", ] -[[package]] -name = "rodio" -version = "0.17.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b1bb7b48ee48471f55da122c0044fcc7600cfcc85db88240b89cb832935e611" -dependencies = [ - "claxon", - "cpal", - "hound", - "lewton", - "symphonia", -] - [[package]] name = "rodio" version = "0.20.1" @@ -14457,6 +15400,24 @@ dependencies = [ "quick-xml 0.36.1", ] +[[package]] +name = "rtnetlink" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a552eb82d19f38c3beed3f786bd23aa434ceb9ac43ab44419ca6d67a7e186c0" +dependencies = [ + "futures", + "log", + "netlink-packet-core", + "netlink-packet-route", + "netlink-packet-utils", + "netlink-proto", + "netlink-sys", + "nix 0.26.2", + "thiserror 1.0.63", + "tokio", +] + [[package]] name = "rtoolbox" version = "0.0.2" @@ -14833,6 +15794,17 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rw-stream-sink" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8c9026ff5d2f23da5e45bbc283f156383001bfb09c4e44256d02c1a685fe9a1" +dependencies = [ + "futures", + "pin-project", + "static_assertions", +] + [[package]] name = "rwhisper" version = "0.3.5" @@ -14851,7 +15823,7 @@ dependencies = [ "kalosm-language-model", "kalosm-streams", "rand 0.8.5", - "rodio 0.20.1", + "rodio", "serde_json", "thiserror 2.0.11", "tokenizers", @@ -15917,12 +16889,29 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38d1e02fca405f6280643174a50c942219f0bbf4dbf7d480f1dd864d6f211ae5" dependencies = [ - "heck 0.4.1", + "heck 0.5.0", "proc-macro2", "quote", "syn 2.0.87", ] +[[package]] +name = "snow" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "850948bee068e713b8ab860fe1adc4d109676ab4c3b621fd8147f06b261f2f85" +dependencies = [ + "aes-gcm", + "blake2", + "chacha20poly1305", + "curve25519-dalek 4.1.3", + "rand_core 0.6.4", + "ring 0.17.8", + "rustc_version 0.4.1", + "sha2 0.10.8", + "subtle", +] + [[package]] name = "socket2" version = "0.4.10" @@ -16149,7 +17138,7 @@ version = "0.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b8c4a4445d81357df8b1a650d0d0d6fbbbfe99d064aa5e02f3e4022061476d8" dependencies = [ - "loom", + "loom 0.5.6", ] [[package]] @@ -16990,6 +17979,12 @@ dependencies = [ "serde", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tao" version = "0.30.8" @@ -18172,9 +19167,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.40" +version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ "log", "pin-project-lite", @@ -18184,9 +19179,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.27" +version = "0.1.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" dependencies = [ "proc-macro2", "quote", @@ -18195,9 +19190,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.32" +version = "0.1.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" dependencies = [ "once_cell", "valuable", @@ -18492,8 +19487,8 @@ version = "1.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" dependencies = [ - "cfg-if 0.1.10", - "rand 0.6.5", + "cfg-if 1.0.0", + "rand 0.8.5", "static_assertions", ] @@ -18604,7 +19599,7 @@ dependencies = [ "parking_lot 0.12.3", "rand 0.8.5", "rand-utf8", - "rcgen", + "rcgen 0.10.0", "ring 0.16.20", "rustls 0.20.9", "rustls-native-certs", @@ -18757,6 +19752,18 @@ dependencies = [ "ug", ] +[[package]] +name = "uint" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "909988d098b2f738727b161a106cfc7cab00c539c2687a8836f8e565976fb53e" +dependencies = [ + "byteorder", + "crunchy", + "hex", + "static_assertions", +] + [[package]] name = "unarray" version = "0.1.4" @@ -18935,6 +19942,12 @@ version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6889a77d49f1f013504cec6bf97a2c730394adedaeb1deb5ea08949a50541105" +[[package]] +name = "unsigned-varint" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" + [[package]] name = "untrusted" version = "0.7.1" @@ -19287,6 +20300,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasite" version = "0.1.0" @@ -19859,7 +20881,7 @@ dependencies = [ "log", "naga", "once_cell", - "parking_lot 0.11.2", + "parking_lot 0.12.3", "profiling", "raw-window-handle 0.6.2", "ron", @@ -19894,14 +20916,14 @@ dependencies = [ "js-sys", "khronos-egl", "libc", - "libloading 0.7.4", + "libloading 0.8.5", "log", "metal 0.28.0", "naga", "ndk-sys 0.5.0+25.2.9519653", "objc", "once_cell", - "parking_lot 0.11.2", + "parking_lot 0.12.3", "profiling", "range-alloc", "raw-window-handle 0.6.2", @@ -20580,6 +21602,15 @@ version = "0.0.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d135d17ab770252ad95e9a872d365cf3090e3be864a34ab46f48555993efc904" +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.6.0", +] + [[package]] name = "write16" version = "1.0.0" @@ -20717,12 +21748,29 @@ version = "0.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7069fba5b66b9193bd2c5d3d4ff12b839118f6bcbef5328efafafb5395cf63da" dependencies = [ - "asn1-rs", + "asn1-rs 0.5.2", + "data-encoding", + "der-parser 8.2.0", + "lazy_static", + "nom 7.1.3", + "oid-registry 0.6.1", + "rusticata-macros", + "thiserror 1.0.63", + "time 0.3.36", +] + +[[package]] +name = "x509-parser" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcbc162f30700d6f3f82a24bf7cc62ffe7caea42c0b2cba8bf7f3ae50cf51f69" +dependencies = [ + "asn1-rs 0.6.2", "data-encoding", - "der-parser", + "der-parser 9.0.0", "lazy_static", "nom 7.1.3", - "oid-registry", + "oid-registry 0.7.1", "rusticata-macros", "thiserror 1.0.63", "time 0.3.36", @@ -20773,6 +21821,15 @@ dependencies = [ "time 0.1.45", ] +[[package]] +name = "xmltree" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7d8a75eaf6557bb84a65ace8609883db44a29951042ada9b393151532e41fcb" +dependencies = [ + "xml-rs", +] + [[package]] name = "xsalsa20poly1305" version = "0.9.1" @@ -20801,6 +21858,37 @@ dependencies = [ "linked-hash-map", ] +[[package]] +name = "yamux" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ed0164ae619f2dc144909a9f082187ebb5893693d8c0196e8085283ccd4b776" +dependencies = [ + "futures", + "log", + "nohash-hasher", + "parking_lot 0.12.3", + "pin-project", + "rand 0.8.5", + "static_assertions", +] + +[[package]] +name = "yamux" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17610762a1207ee816c6fadc29220904753648aba0a9ed61c7b8336e80a559c4" +dependencies = [ + "futures", + "log", + "nohash-hasher", + "parking_lot 0.12.3", + "pin-project", + "rand 0.8.5", + "static_assertions", + "web-time", +] + [[package]] name = "yansi" version = "0.5.1" @@ -20919,7 +22007,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" dependencies = [ "byteorder", - "zerocopy-derive", + "zerocopy-derive 0.7.35", +] + +[[package]] +name = "zerocopy" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2586fea28e186957ef732a5f8b3be2da217d65c5969d4b1e17f973ebbe876879" +dependencies = [ + "zerocopy-derive 0.8.24", ] [[package]] @@ -20933,6 +22030,17 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "zerocopy-derive" +version = "0.8.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a996a8f63c5c4448cd959ac1bab0aaa3306ccfd060472f85943ee0750f0169be" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.87", +] + [[package]] name = "zerofrom" version = "0.1.4" diff --git a/rust-executor/Cargo.toml b/rust-executor/Cargo.toml index be790a41c..de4a7b9ce 100644 --- a/rust-executor/Cargo.toml +++ b/rust-executor/Cargo.toml @@ -117,6 +117,8 @@ libc = "0.2" chat-gpt-lib-rs = { version = "0.5.1", git = "https://github.com/coasys/chat-gpt-lib-rs" } anyhow = "1.0.95" +libp2p = { version = "0.55", features = ["tcp", "dns", "noise", "gossipsub", "kad", "request-response", "yamux", "tokio", "json", "macros", "identify", "relay", "quic", "ping"] } + [dev-dependencies] maplit = "1.0.2" lazy_static = "1.4.0" diff --git a/rust-executor/src/lib.rs b/rust-executor/src/lib.rs index 35b2a8ab2..951649b13 100644 --- a/rust-executor/src/lib.rs +++ b/rust-executor/src/lib.rs @@ -24,6 +24,7 @@ mod pubsub; #[cfg(test)] mod test_utils; pub mod types; +pub mod libp2p_service; use std::{env, thread::JoinHandle}; @@ -40,11 +41,41 @@ pub use config::Ad4mConfig; pub use holochain_service::run_local_hc_services; use libc::{sigaction, sigemptyset, sighandler_t, SA_ONSTACK, SIGURG}; use std::ptr; +use libp2p_service::Libp2pService; +use std::sync::Arc; +use tokio::sync::Mutex; extern "C" fn handle_sigurg(_: libc::c_int) { //println!("Received SIGURG signal, but ignoring it."); } +lazy_static! { + static ref LIBP2P_SERVICE: Arc>> = Arc::new(Mutex::new(None)); +} + +pub async fn init_libp2p_service() -> Result<()> { + let bootstrap_nodes = vec![ + // TODO: Add the actual bootstrap node URL + "/ip4/127.0.0.1/tcp/4001".to_string(), + ]; + + let service = Libp2pService::new(bootstrap_nodes).await?; + service.start().await?; + + let mut global_service = LIBP2P_SERVICE.lock().await; + *global_service = Some(service); + + Ok(()) +} + +pub async fn get_libp2p_service() -> Result> { + let service = LIBP2P_SERVICE.lock().await; + match &*service { + Some(s) => Ok(Arc::new(s.clone())), + None => Err(anyhow!("Libp2p service not initialized")), + } +} + /// Runs the GraphQL server and the deno core runtime pub async fn run(mut config: Ad4mConfig) -> JoinHandle<()> { unsafe { diff --git a/rust-executor/src/libp2p_service/mod.rs b/rust-executor/src/libp2p_service/mod.rs new file mode 100644 index 000000000..3dd676e66 --- /dev/null +++ b/rust-executor/src/libp2p_service/mod.rs @@ -0,0 +1,309 @@ +use std::{ + hash::{Hash, Hasher}, +}; + +use libp2p::{ + core::upgrade, + futures::StreamExt, + swarm::{Swarm, SwarmEvent, NetworkBehaviour}, + Transport, PeerId, + gossipsub::{self, IdentTopic, MessageAuthenticity, Event as GossipsubEvent}, + request_response::{self, ProtocolSupport, json, Event as RequestResponseEvent, Message as RequestResponseMessage}, + noise, tcp, yamux, + StreamProtocol, + //NetworkBehaviour as _, +}; +//use libp2p::swarm::derive::NetworkBehaviour; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::Mutex; +use anyhow::{Result, anyhow}; +use crate::types::Expression; +use std::io; + +// Define PerspectiveExpression as an alias for Expression +pub type PerspectiveExpression = Expression; + +// Topic for each neighbourhood's telepresence signals +const TELEPRESENCE_TOPIC_PREFIX: &str = "/ad4m/telepresence/"; +const REQUEST_RESPONSE_PROTOCOL: StreamProtocol = StreamProtocol::new("/ad4m/telepresence/1.0.0"); + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum TelepresenceMessage { + OnlineStatusRequest, + OnlineStatusResponse(PerspectiveExpression), + Signal(PerspectiveExpression), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct OnlineAgent { + pub did: String, + pub status: PerspectiveExpression, +} + +#[derive(Debug)] +pub enum MyBehaviourEvent { + Gossipsub(gossipsub::Event), + RequestResponse(request_response::Event), +} + +impl From for MyBehaviourEvent { + fn from(event: gossipsub::Event) -> Self { + MyBehaviourEvent::Gossipsub(event) + } +} + +impl From> for MyBehaviourEvent { + fn from(event: request_response::Event) -> Self { + MyBehaviourEvent::RequestResponse(event) + } +} + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "MyBehaviourEvent")] +struct MyBehaviour { + gossipsub: gossipsub::Behaviour, + request_response: json::Behaviour, +} + +pub struct Libp2pService { + swarm: Arc>>, + online_agents: Arc>>>, // neighbourhood_id -> agent_did -> OnlineAgent + bootstrap_nodes: Vec, + signal_callbacks: Arc>>>>, + peer_to_did: Arc>>, // Map peer IDs to agent DIDs +} + +impl Libp2pService { + pub async fn new(bootstrap_nodes: Vec) -> Result { + // Create a swarm with a custom network behaviour + let swarm = libp2p::SwarmBuilder::with_new_identity() + .with_tokio() + .with_tcp( + tcp::Config::default(), + noise::Config::new, + yamux::Config::default + )? + .with_quic() + .with_behaviour(|key| { + // Set up gossipsub + let message_id_fn = |message: &gossipsub::Message| { + let mut s = std::collections::hash_map::DefaultHasher::new(); + message.data.hash(&mut s); + gossipsub::MessageId::from(s.finish().to_string()) + }; + + let gossipsub_config = gossipsub::ConfigBuilder::default() + .heartbeat_interval(std::time::Duration::from_secs(1)) + .validation_mode(gossipsub::ValidationMode::Strict) + .message_id_fn(message_id_fn) + .build() + .map_err(anyhow::Error::msg)?; + + let gossipsub = gossipsub::Behaviour::new( + MessageAuthenticity::Signed(key.clone()), + gossipsub_config, + )?; + + // Set up request-response + let request_response = json::Behaviour::new( + [(REQUEST_RESPONSE_PROTOCOL, ProtocolSupport::Full)], + request_response::Config::default(), + ); + + Ok(MyBehaviour { + gossipsub, + request_response, + }) + })? + .build(); + + Ok(Self { + swarm: Arc::new(Mutex::new(swarm)), + online_agents: Arc::new(Mutex::new(HashMap::new())), + bootstrap_nodes, + signal_callbacks: Arc::new(Mutex::new(HashMap::new())), + peer_to_did: Arc::new(Mutex::new(HashMap::new())), + }) + } + + pub async fn start(&self) -> Result<()> { + let mut swarm = self.swarm.lock().await; + + // Connect to bootstrap nodes + for node in &self.bootstrap_nodes { + if let Ok(addr) = node.parse::() { + (*swarm).dial(addr)?; + } + } + + // Start listening on all interfaces + (*swarm).listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + + // Start processing events + let swarm_clone = self.swarm.clone(); + let online_agents_clone = self.online_agents.clone(); + let signal_callbacks_clone = self.signal_callbacks.clone(); + let peer_to_did_clone = self.peer_to_did.clone(); + + tokio::spawn(async move { + let mut swarm = swarm_clone.lock().await; + + while let Some(event) = swarm.next().await { + match event { + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message { + message, + propagation_source: peer_id, + .. + })) => { + // Extract neighbourhood_id from topic + let topic_str = message.topic.to_string(); + if let Some(neighbourhood_id) = topic_str.strip_prefix(TELEPRESENCE_TOPIC_PREFIX) { + if let Ok(msg) = serde_json::from_slice::(&message.data) { + match msg { + TelepresenceMessage::Signal(payload) => { + let callbacks = signal_callbacks_clone.lock().await; + if let Some(neighbourhood_callbacks) = callbacks.get(neighbourhood_id) { + for callback in neighbourhood_callbacks { + callback(payload.clone()); + } + } + }, + _ => {} // Other message types handled by request-response + } + } + } + }, + SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse(request_response::Event::Message { + message: request_response::Message::Request { request, channel, .. }, + peer: peer_id, + .. + })) => { + match request { + TelepresenceMessage::OnlineStatusRequest => { + let online_agents = online_agents_clone.lock().await; + let peer_to_did = peer_to_did_clone.lock().await; + + // TODO: Get neighbourhood_id from somewhere + let neighbourhood_id = "placeholder"; + + if let Some(agents) = online_agents.get(neighbourhood_id) { + if let Some(agent_did) = peer_to_did.get(&peer_id) { + if let Some(agent) = agents.get(agent_did) { + swarm.behaviour_mut().request_response.send_response(channel, TelepresenceMessage::OnlineStatusResponse(agent.status.clone())) + .expect("Failed to send response"); + } + } + } + }, + _ => {} + } + }, + SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse(request_response::Event::Message { + message: request_response::Message::Response { response, .. }, + peer: peer_id, + .. + })) => { + if let TelepresenceMessage::OnlineStatusResponse(status) = response { + // Update the agent's status in our local map + let mut online_agents = online_agents_clone.lock().await; + let peer_to_did = peer_to_did_clone.lock().await; + + // TODO: Get neighbourhood_id from somewhere + let neighbourhood_id = "placeholder"; + + if let Some(agent_did) = peer_to_did.get(&peer_id) { + let neighbourhood_agents = online_agents.entry(neighbourhood_id.to_string()) + .or_insert_with(HashMap::new); + + neighbourhood_agents.insert(agent_did.clone(), OnlineAgent { + did: agent_did.clone(), + status, + }); + } + } + }, + _ => {} + } + } + }); + + Ok(()) + } + + pub async fn set_online_status(&self, neighbourhood_id: &str, status: PerspectiveExpression) -> Result<()> { + let mut online_agents = self.online_agents.lock().await; + // TODO: Get agent DID from somewhere + let agent_did = "placeholder_did".to_string(); + + let neighbourhood_agents = online_agents.entry(neighbourhood_id.to_string()) + .or_insert_with(HashMap::new); + + neighbourhood_agents.insert(agent_did.clone(), OnlineAgent { + did: agent_did, + status, + }); + + // Subscribe to the neighbourhood's telepresence topic if not already subscribed + let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); + let mut swarm = self.swarm.lock().await; + swarm.behaviour_mut().gossipsub.subscribe(&topic)?; + + Ok(()) + } + + pub async fn get_online_agents(&self, neighbourhood_id: &str) -> Result> { + // First get local agents + let mut online_agents = self.online_agents.lock().await; + let neighbourhood_agents = online_agents.entry(neighbourhood_id.to_string()) + .or_insert_with(HashMap::new); + let mut agents = neighbourhood_agents.values().cloned().collect::>(); + + // Then request status from all others in the neighbourhood + let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); + let request = TelepresenceMessage::OnlineStatusRequest; + let data = serde_json::to_vec(&request)?; + + let mut swarm = self.swarm.lock().await; + swarm.behaviour_mut().gossipsub.publish(topic, data)?; + + // TODO: Wait for responses and add them to agents list + // This would require implementing a timeout and response handling + + Ok(agents) + } + + pub async fn send_signal(&self, neighbourhood_id: &str, remote_agent_did: &str, payload: PerspectiveExpression) -> Result<()> { + let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); + let message = TelepresenceMessage::Signal(payload); + let data = serde_json::to_vec(&message)?; + + let mut swarm = self.swarm.lock().await; + swarm.behaviour_mut().gossipsub.publish(topic, data)?; + + Ok(()) + } + + pub async fn send_broadcast(&self, neighbourhood_id: &str, payload: PerspectiveExpression) -> Result<()> { + let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); + let message = TelepresenceMessage::Signal(payload); + let data = serde_json::to_vec(&message)?; + + let mut swarm = self.swarm.lock().await; + swarm.behaviour_mut().gossipsub.publish(topic, data)?; + + Ok(()) + } + + pub async fn register_signal_callback(&self, neighbourhood_id: &str, callback: F) -> Result<()> + where + F: Fn(PerspectiveExpression) + Send + Sync + 'static, + { + let mut callbacks = self.signal_callbacks.lock().await; + let neighbourhood_callbacks = callbacks.entry(neighbourhood_id.to_string()) + .or_insert_with(Vec::new); + neighbourhood_callbacks.push(Box::new(callback)); + Ok(()) + } +} \ No newline at end of file diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index ee25b6561..ea3b4b61d 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -38,6 +38,7 @@ use tokio::time::{sleep, Instant}; use tokio::{join, time}; use uuid; use uuid::Uuid; +use crate::libp2p_service::Libp2pService; static MAX_COMMIT_BYTES: usize = 3_000_000; //3MiB static MAX_PENDING_DIFFS_COUNT: usize = 150; @@ -1490,11 +1491,13 @@ impl PerspectiveInstance { } pub async fn set_online_status(&self, status: PerspectiveExpression) -> Result<(), AnyError> { - let mut link_language_guard = self.link_language.lock().await; - if let Some(link_language) = link_language_guard.as_mut() { - link_language.set_online_status(status).await + let handle = self.persisted.lock().await.clone(); + if let Some(neighbourhood) = &handle.neighbourhood { + let service = get_libp2p_service().await?; + service.set_online_status(&neighbourhood.data.link_language, status).await?; + Ok(()) } else { - Err(self.no_link_language_error().await) + Err(anyhow!("Perspective is not part of a neighbourhood")) } } @@ -1503,11 +1506,13 @@ impl PerspectiveInstance { remote_agent_did: String, payload: PerspectiveExpression, ) -> Result<(), AnyError> { - let mut link_language_guard = self.link_language.lock().await; - if let Some(link_language) = link_language_guard.as_mut() { - link_language.send_signal(remote_agent_did, payload).await + let handle = self.persisted.lock().await.clone(); + if let Some(neighbourhood) = &handle.neighbourhood { + let service = get_libp2p_service().await?; + service.send_signal(&neighbourhood.data.link_language, &remote_agent_did, payload).await?; + Ok(()) } else { - Err(self.no_link_language_error().await) + Err(anyhow!("Perspective is not part of a neighbourhood")) } } @@ -1527,11 +1532,13 @@ impl PerspectiveInstance { }); } - let mut link_language_guard = self.link_language.lock().await; - if let Some(link_language) = link_language_guard.as_mut() { - link_language.send_broadcast(payload).await + let handle = self.persisted.lock().await.clone(); + if let Some(neighbourhood) = &handle.neighbourhood { + let service = get_libp2p_service().await?; + service.send_broadcast(&neighbourhood.data.link_language, payload).await?; + Ok(()) } else { - Err(self.no_link_language_error().await) + Err(anyhow!("Perspective is not part of a neighbourhood")) } } From 1f1190d4ead40b0b4abc451950a8ffb5c4ddc1d2 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 11:52:36 +0200 Subject: [PATCH 02/10] WIP: build Libp2pService --- rust-executor/src/lib.rs | 36 ++++----------- rust-executor/src/libp2p_service/mod.rs | 46 +++++++++++++++++-- .../src/perspectives/perspective_instance.rs | 9 ++-- rust-executor/src/types.rs | 41 +++++++++++++++++ 4 files changed, 95 insertions(+), 37 deletions(-) diff --git a/rust-executor/src/lib.rs b/rust-executor/src/lib.rs index 951649b13..6863d2db5 100644 --- a/rust-executor/src/lib.rs +++ b/rust-executor/src/lib.rs @@ -49,33 +49,6 @@ extern "C" fn handle_sigurg(_: libc::c_int) { //println!("Received SIGURG signal, but ignoring it."); } -lazy_static! { - static ref LIBP2P_SERVICE: Arc>> = Arc::new(Mutex::new(None)); -} - -pub async fn init_libp2p_service() -> Result<()> { - let bootstrap_nodes = vec![ - // TODO: Add the actual bootstrap node URL - "/ip4/127.0.0.1/tcp/4001".to_string(), - ]; - - let service = Libp2pService::new(bootstrap_nodes).await?; - service.start().await?; - - let mut global_service = LIBP2P_SERVICE.lock().await; - *global_service = Some(service); - - Ok(()) -} - -pub async fn get_libp2p_service() -> Result> { - let service = LIBP2P_SERVICE.lock().await; - match &*service { - Some(s) => Ok(Arc::new(s.clone())), - None => Err(anyhow!("Libp2p service not initialized")), - } -} - /// Runs the GraphQL server and the deno core runtime pub async fn run(mut config: Ad4mConfig) -> JoinHandle<()> { unsafe { @@ -118,6 +91,15 @@ pub async fn run(mut config: Ad4mConfig) -> JoinHandle<()> { .await .expect("Couldn't initialize AI service"); + info!("Initializing libp2p service..."); + let bootstrap_nodes = vec![ + // TODO: Add the actual bootstrap node URL + "/ip4/127.0.0.1/tcp/4001".to_string(), + ]; + Libp2pService::init_global_instance(bootstrap_nodes) + .await + .expect("Couldn't initialize libp2p service"); + info!("Initializing Agent service..."); AgentService::init_global_instance(config.app_data_path.clone().unwrap()); diff --git a/rust-executor/src/libp2p_service/mod.rs b/rust-executor/src/libp2p_service/mod.rs index 3dd676e66..0f51ee927 100644 --- a/rust-executor/src/libp2p_service/mod.rs +++ b/rust-executor/src/libp2p_service/mod.rs @@ -1,5 +1,6 @@ use std::{ hash::{Hash, Hasher}, + time::Duration, }; use libp2p::{ @@ -19,11 +20,11 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; use anyhow::{Result, anyhow}; -use crate::types::Expression; -use std::io; +use crate::types::PerspectiveExpression; -// Define PerspectiveExpression as an alias for Expression -pub type PerspectiveExpression = Expression; +lazy_static! { + static ref LIBP2P_SERVICE: Arc>> = Arc::new(Mutex::new(None)); +} // Topic for each neighbourhood's telepresence signals const TELEPRESENCE_TOPIC_PREFIX: &str = "/ad4m/telepresence/"; @@ -67,6 +68,7 @@ struct MyBehaviour { request_response: json::Behaviour, } +#[derive(Clone)] pub struct Libp2pService { swarm: Arc>>, online_agents: Arc>>>, // neighbourhood_id -> agent_did -> OnlineAgent @@ -306,4 +308,40 @@ impl Libp2pService { neighbourhood_callbacks.push(Box::new(callback)); Ok(()) } + + pub async fn init_global_instance(bootstrap_nodes: Vec) -> Result<()> { + let service = Libp2pService::new(bootstrap_nodes).await?; + service.start().await?; + + let mut global_service = LIBP2P_SERVICE.lock().await; + *global_service = Some(service); + + Ok(()) + } + + pub async fn global_instance() -> Result { + LIBP2P_SERVICE + .lock() + .await + .clone() + .ok_or(anyhow!("Libp2p service not initialized")) + } + + pub async fn with_global_instance(func: F) -> Result + where + F: FnOnce(&Libp2pService) -> R, + { + let global_instance_arc = LIBP2P_SERVICE.lock().await; + let service_ref = global_instance_arc.as_ref().ok_or(anyhow!("Libp2p service not initialized"))?; + Ok(func(service_ref)) + } + + pub async fn with_mutable_global_instance(func: F) -> Result + where + F: FnOnce(&mut Libp2pService) -> R, + { + let mut global_instance_arc = LIBP2P_SERVICE.lock().await; + let service_mut = global_instance_arc.as_mut().ok_or(anyhow!("Libp2p service not initialized"))?; + Ok(func(service_mut)) + } } \ No newline at end of file diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index ea3b4b61d..61c12cea8 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -1493,8 +1493,7 @@ impl PerspectiveInstance { pub async fn set_online_status(&self, status: PerspectiveExpression) -> Result<(), AnyError> { let handle = self.persisted.lock().await.clone(); if let Some(neighbourhood) = &handle.neighbourhood { - let service = get_libp2p_service().await?; - service.set_online_status(&neighbourhood.data.link_language, status).await?; + Libp2pService::global_instance().await?.set_online_status(&neighbourhood.data.link_language, status.into()).await?; Ok(()) } else { Err(anyhow!("Perspective is not part of a neighbourhood")) @@ -1508,8 +1507,7 @@ impl PerspectiveInstance { ) -> Result<(), AnyError> { let handle = self.persisted.lock().await.clone(); if let Some(neighbourhood) = &handle.neighbourhood { - let service = get_libp2p_service().await?; - service.send_signal(&neighbourhood.data.link_language, &remote_agent_did, payload).await?; + Libp2pService::global_instance().await?.send_signal(&neighbourhood.data.link_language, &remote_agent_did, payload.into()).await?; Ok(()) } else { Err(anyhow!("Perspective is not part of a neighbourhood")) @@ -1534,8 +1532,7 @@ impl PerspectiveInstance { let handle = self.persisted.lock().await.clone(); if let Some(neighbourhood) = &handle.neighbourhood { - let service = get_libp2p_service().await?; - service.send_broadcast(&neighbourhood.data.link_language, payload).await?; + Libp2pService::global_instance().await?.send_broadcast(&neighbourhood.data.link_language, payload.into()).await?; Ok(()) } else { Err(anyhow!("Perspective is not part of a neighbourhood")) diff --git a/rust-executor/src/types.rs b/rust-executor/src/types.rs index 89f8a96f7..498bd4fbd 100644 --- a/rust-executor/src/types.rs +++ b/rust-executor/src/types.rs @@ -242,6 +242,47 @@ impl From for Perspective { } } +impl From for Perspective { + fn from(input: crate::graphql::graphql_types::Perspective) -> Self { + Perspective { links: input.links.into_iter().map(LinkExpression::try_from).filter_map(Result::ok).collect() } + } +} + +#[derive(GraphQLObject, Serialize, Deserialize, Debug, Clone, PartialEq, Default)] +pub struct PerspectiveExpression { + pub author: String, + pub data: Perspective, + pub proof: ExpressionProof, + pub timestamp: String, +} + +impl From> for PerspectiveExpression { + fn from(expr: Expression) -> Self { + PerspectiveExpression { + author: expr.author, + data: expr.data, + proof: expr.proof, + timestamp: expr.timestamp, + } + } +} + +impl From for PerspectiveExpression { + fn from(input: crate::graphql::graphql_types::PerspectiveExpression) -> Self { + PerspectiveExpression { + author: input.author, + data: input.data.into(), + proof: ExpressionProof { + key: input.proof.key, + signature: input.proof.signature, + }, + timestamp: input.timestamp, + } + } +} + + + #[derive(GraphQLObject, Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct Neighbourhood { pub link_language: String, From cf0a5d645e1cc4ddbd0ab6e69c8ceb56cbbf40e1 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 11:56:01 +0200 Subject: [PATCH 03/10] cleanup --- rust-executor/src/lib.rs | 2 -- rust-executor/src/libp2p_service/mod.rs | 10 ++++------ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/rust-executor/src/lib.rs b/rust-executor/src/lib.rs index 6863d2db5..5d34594cf 100644 --- a/rust-executor/src/lib.rs +++ b/rust-executor/src/lib.rs @@ -42,8 +42,6 @@ pub use holochain_service::run_local_hc_services; use libc::{sigaction, sigemptyset, sighandler_t, SA_ONSTACK, SIGURG}; use std::ptr; use libp2p_service::Libp2pService; -use std::sync::Arc; -use tokio::sync::Mutex; extern "C" fn handle_sigurg(_: libc::c_int) { //println!("Received SIGURG signal, but ignoring it."); diff --git a/rust-executor/src/libp2p_service/mod.rs b/rust-executor/src/libp2p_service/mod.rs index 0f51ee927..ec44ea2ab 100644 --- a/rust-executor/src/libp2p_service/mod.rs +++ b/rust-executor/src/libp2p_service/mod.rs @@ -1,15 +1,13 @@ use std::{ hash::{Hash, Hasher}, - time::Duration, }; use libp2p::{ - core::upgrade, futures::StreamExt, swarm::{Swarm, SwarmEvent, NetworkBehaviour}, - Transport, PeerId, - gossipsub::{self, IdentTopic, MessageAuthenticity, Event as GossipsubEvent}, - request_response::{self, ProtocolSupport, json, Event as RequestResponseEvent, Message as RequestResponseMessage}, + PeerId, + gossipsub::{self, MessageAuthenticity}, + request_response::{self, ProtocolSupport, json}, noise, tcp, yamux, StreamProtocol, //NetworkBehaviour as _, @@ -156,7 +154,7 @@ impl Libp2pService { match event { SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message { message, - propagation_source: peer_id, + //propagation_source: peer_id, .. })) => { // Extract neighbourhood_id from topic From a9f0ad35c38e5e60641393f3344ce5db3757c2c3 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 11:57:53 +0200 Subject: [PATCH 04/10] fmt --- rust-executor/src/lib.rs | 4 +- rust-executor/src/libp2p_service/mod.rs | 211 ++++++++++++------ .../src/perspectives/perspective_instance.rs | 21 +- rust-executor/src/types.rs | 11 +- 4 files changed, 166 insertions(+), 81 deletions(-) diff --git a/rust-executor/src/lib.rs b/rust-executor/src/lib.rs index 5d34594cf..d52dc9ba0 100644 --- a/rust-executor/src/lib.rs +++ b/rust-executor/src/lib.rs @@ -18,13 +18,13 @@ mod dapp_server; mod db; pub mod init; pub mod languages; +pub mod libp2p_service; mod neighbourhoods; pub mod perspectives; mod pubsub; #[cfg(test)] mod test_utils; pub mod types; -pub mod libp2p_service; use std::{env, thread::JoinHandle}; @@ -40,8 +40,8 @@ use crate::{ pub use config::Ad4mConfig; pub use holochain_service::run_local_hc_services; use libc::{sigaction, sigemptyset, sighandler_t, SA_ONSTACK, SIGURG}; -use std::ptr; use libp2p_service::Libp2pService; +use std::ptr; extern "C" fn handle_sigurg(_: libc::c_int) { //println!("Received SIGURG signal, but ignoring it."); diff --git a/rust-executor/src/libp2p_service/mod.rs b/rust-executor/src/libp2p_service/mod.rs index ec44ea2ab..52c8097ed 100644 --- a/rust-executor/src/libp2p_service/mod.rs +++ b/rust-executor/src/libp2p_service/mod.rs @@ -1,24 +1,24 @@ -use std::{ - hash::{Hash, Hasher}, -}; +use std::hash::{Hash, Hasher}; use libp2p::{ futures::StreamExt, - swarm::{Swarm, SwarmEvent, NetworkBehaviour}, - PeerId, gossipsub::{self, MessageAuthenticity}, - request_response::{self, ProtocolSupport, json}, - noise, tcp, yamux, + noise, + request_response::{self, json, ProtocolSupport}, + swarm::{NetworkBehaviour, Swarm, SwarmEvent}, + tcp, + yamux, + PeerId, StreamProtocol, //NetworkBehaviour as _, }; //use libp2p::swarm::derive::NetworkBehaviour; +use crate::types::PerspectiveExpression; +use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; -use anyhow::{Result, anyhow}; -use crate::types::PerspectiveExpression; lazy_static! { static ref LIBP2P_SERVICE: Arc>> = Arc::new(Mutex::new(None)); @@ -71,7 +71,8 @@ pub struct Libp2pService { swarm: Arc>>, online_agents: Arc>>>, // neighbourhood_id -> agent_did -> OnlineAgent bootstrap_nodes: Vec, - signal_callbacks: Arc>>>>, + signal_callbacks: + Arc>>>>, peer_to_did: Arc>>, // Map peer IDs to agent DIDs } @@ -83,7 +84,7 @@ impl Libp2pService { .with_tcp( tcp::Config::default(), noise::Config::new, - yamux::Config::default + yamux::Config::default, )? .with_quic() .with_behaviour(|key| { @@ -130,7 +131,7 @@ impl Libp2pService { pub async fn start(&self) -> Result<()> { let mut swarm = self.swarm.lock().await; - + // Connect to bootstrap nodes for node in &self.bootstrap_nodes { if let Ok(addr) = node.parse::() { @@ -149,81 +150,108 @@ impl Libp2pService { tokio::spawn(async move { let mut swarm = swarm_clone.lock().await; - + while let Some(event) = swarm.next().await { match event { - SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message { - message, - //propagation_source: peer_id, - .. - })) => { + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub( + gossipsub::Event::Message { + message, + //propagation_source: peer_id, + .. + }, + )) => { // Extract neighbourhood_id from topic let topic_str = message.topic.to_string(); - if let Some(neighbourhood_id) = topic_str.strip_prefix(TELEPRESENCE_TOPIC_PREFIX) { - if let Ok(msg) = serde_json::from_slice::(&message.data) { + if let Some(neighbourhood_id) = + topic_str.strip_prefix(TELEPRESENCE_TOPIC_PREFIX) + { + if let Ok(msg) = + serde_json::from_slice::(&message.data) + { match msg { TelepresenceMessage::Signal(payload) => { let callbacks = signal_callbacks_clone.lock().await; - if let Some(neighbourhood_callbacks) = callbacks.get(neighbourhood_id) { + if let Some(neighbourhood_callbacks) = + callbacks.get(neighbourhood_id) + { for callback in neighbourhood_callbacks { callback(payload.clone()); } } - }, + } _ => {} // Other message types handled by request-response } } } - }, - SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse(request_response::Event::Message { - message: request_response::Message::Request { request, channel, .. }, - peer: peer_id, - .. - })) => { + } + SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse( + request_response::Event::Message { + message: + request_response::Message::Request { + request, channel, .. + }, + peer: peer_id, + .. + }, + )) => { match request { TelepresenceMessage::OnlineStatusRequest => { let online_agents = online_agents_clone.lock().await; let peer_to_did = peer_to_did_clone.lock().await; - + // TODO: Get neighbourhood_id from somewhere let neighbourhood_id = "placeholder"; - + if let Some(agents) = online_agents.get(neighbourhood_id) { if let Some(agent_did) = peer_to_did.get(&peer_id) { if let Some(agent) = agents.get(agent_did) { - swarm.behaviour_mut().request_response.send_response(channel, TelepresenceMessage::OnlineStatusResponse(agent.status.clone())) + swarm + .behaviour_mut() + .request_response + .send_response( + channel, + TelepresenceMessage::OnlineStatusResponse( + agent.status.clone(), + ), + ) .expect("Failed to send response"); } } } - }, + } _ => {} } - }, - SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse(request_response::Event::Message { - message: request_response::Message::Response { response, .. }, - peer: peer_id, - .. - })) => { + } + SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse( + request_response::Event::Message { + message: request_response::Message::Response { response, .. }, + peer: peer_id, + .. + }, + )) => { if let TelepresenceMessage::OnlineStatusResponse(status) = response { // Update the agent's status in our local map let mut online_agents = online_agents_clone.lock().await; let peer_to_did = peer_to_did_clone.lock().await; - + // TODO: Get neighbourhood_id from somewhere let neighbourhood_id = "placeholder"; - + if let Some(agent_did) = peer_to_did.get(&peer_id) { - let neighbourhood_agents = online_agents.entry(neighbourhood_id.to_string()) + let neighbourhood_agents = online_agents + .entry(neighbourhood_id.to_string()) .or_insert_with(HashMap::new); - - neighbourhood_agents.insert(agent_did.clone(), OnlineAgent { - did: agent_did.clone(), - status, - }); + + neighbourhood_agents.insert( + agent_did.clone(), + OnlineAgent { + did: agent_did.clone(), + status, + }, + ); } } - }, + } _ => {} } } @@ -232,21 +260,32 @@ impl Libp2pService { Ok(()) } - pub async fn set_online_status(&self, neighbourhood_id: &str, status: PerspectiveExpression) -> Result<()> { + pub async fn set_online_status( + &self, + neighbourhood_id: &str, + status: PerspectiveExpression, + ) -> Result<()> { let mut online_agents = self.online_agents.lock().await; // TODO: Get agent DID from somewhere let agent_did = "placeholder_did".to_string(); - - let neighbourhood_agents = online_agents.entry(neighbourhood_id.to_string()) + + let neighbourhood_agents = online_agents + .entry(neighbourhood_id.to_string()) .or_insert_with(HashMap::new); - - neighbourhood_agents.insert(agent_did.clone(), OnlineAgent { - did: agent_did, - status, - }); + + neighbourhood_agents.insert( + agent_did.clone(), + OnlineAgent { + did: agent_did, + status, + }, + ); // Subscribe to the neighbourhood's telepresence topic if not already subscribed - let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); + let topic = gossipsub::IdentTopic::new(format!( + "{}{}", + TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id + )); let mut swarm = self.swarm.lock().await; swarm.behaviour_mut().gossipsub.subscribe(&topic)?; @@ -256,15 +295,19 @@ impl Libp2pService { pub async fn get_online_agents(&self, neighbourhood_id: &str) -> Result> { // First get local agents let mut online_agents = self.online_agents.lock().await; - let neighbourhood_agents = online_agents.entry(neighbourhood_id.to_string()) + let neighbourhood_agents = online_agents + .entry(neighbourhood_id.to_string()) .or_insert_with(HashMap::new); let mut agents = neighbourhood_agents.values().cloned().collect::>(); // Then request status from all others in the neighbourhood - let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); + let topic = gossipsub::IdentTopic::new(format!( + "{}{}", + TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id + )); let request = TelepresenceMessage::OnlineStatusRequest; let data = serde_json::to_vec(&request)?; - + let mut swarm = self.swarm.lock().await; swarm.behaviour_mut().gossipsub.publish(topic, data)?; @@ -274,34 +317,54 @@ impl Libp2pService { Ok(agents) } - pub async fn send_signal(&self, neighbourhood_id: &str, remote_agent_did: &str, payload: PerspectiveExpression) -> Result<()> { - let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); + pub async fn send_signal( + &self, + neighbourhood_id: &str, + remote_agent_did: &str, + payload: PerspectiveExpression, + ) -> Result<()> { + let topic = gossipsub::IdentTopic::new(format!( + "{}{}", + TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id + )); let message = TelepresenceMessage::Signal(payload); let data = serde_json::to_vec(&message)?; - + let mut swarm = self.swarm.lock().await; swarm.behaviour_mut().gossipsub.publish(topic, data)?; Ok(()) } - pub async fn send_broadcast(&self, neighbourhood_id: &str, payload: PerspectiveExpression) -> Result<()> { - let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); + pub async fn send_broadcast( + &self, + neighbourhood_id: &str, + payload: PerspectiveExpression, + ) -> Result<()> { + let topic = gossipsub::IdentTopic::new(format!( + "{}{}", + TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id + )); let message = TelepresenceMessage::Signal(payload); let data = serde_json::to_vec(&message)?; - + let mut swarm = self.swarm.lock().await; swarm.behaviour_mut().gossipsub.publish(topic, data)?; Ok(()) } - pub async fn register_signal_callback(&self, neighbourhood_id: &str, callback: F) -> Result<()> + pub async fn register_signal_callback( + &self, + neighbourhood_id: &str, + callback: F, + ) -> Result<()> where F: Fn(PerspectiveExpression) + Send + Sync + 'static, { let mut callbacks = self.signal_callbacks.lock().await; - let neighbourhood_callbacks = callbacks.entry(neighbourhood_id.to_string()) + let neighbourhood_callbacks = callbacks + .entry(neighbourhood_id.to_string()) .or_insert_with(Vec::new); neighbourhood_callbacks.push(Box::new(callback)); Ok(()) @@ -310,10 +373,10 @@ impl Libp2pService { pub async fn init_global_instance(bootstrap_nodes: Vec) -> Result<()> { let service = Libp2pService::new(bootstrap_nodes).await?; service.start().await?; - + let mut global_service = LIBP2P_SERVICE.lock().await; *global_service = Some(service); - + Ok(()) } @@ -330,7 +393,9 @@ impl Libp2pService { F: FnOnce(&Libp2pService) -> R, { let global_instance_arc = LIBP2P_SERVICE.lock().await; - let service_ref = global_instance_arc.as_ref().ok_or(anyhow!("Libp2p service not initialized"))?; + let service_ref = global_instance_arc + .as_ref() + .ok_or(anyhow!("Libp2p service not initialized"))?; Ok(func(service_ref)) } @@ -339,7 +404,9 @@ impl Libp2pService { F: FnOnce(&mut Libp2pService) -> R, { let mut global_instance_arc = LIBP2P_SERVICE.lock().await; - let service_mut = global_instance_arc.as_mut().ok_or(anyhow!("Libp2p service not initialized"))?; + let service_mut = global_instance_arc + .as_mut() + .ok_or(anyhow!("Libp2p service not initialized"))?; Ok(func(service_mut)) } -} \ No newline at end of file +} diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 61c12cea8..0c522183a 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -12,6 +12,7 @@ use crate::graphql::graphql_types::{ }; use crate::languages::language::Language; use crate::languages::LanguageController; +use crate::libp2p_service::Libp2pService; use crate::perspectives::utils::{prolog_get_first_binding, prolog_value_to_json_string}; use crate::prolog_service::get_prolog_service; use crate::prolog_service::types::{QueryMatch, QueryResolution}; @@ -38,7 +39,6 @@ use tokio::time::{sleep, Instant}; use tokio::{join, time}; use uuid; use uuid::Uuid; -use crate::libp2p_service::Libp2pService; static MAX_COMMIT_BYTES: usize = 3_000_000; //3MiB static MAX_PENDING_DIFFS_COUNT: usize = 150; @@ -1493,7 +1493,10 @@ impl PerspectiveInstance { pub async fn set_online_status(&self, status: PerspectiveExpression) -> Result<(), AnyError> { let handle = self.persisted.lock().await.clone(); if let Some(neighbourhood) = &handle.neighbourhood { - Libp2pService::global_instance().await?.set_online_status(&neighbourhood.data.link_language, status.into()).await?; + Libp2pService::global_instance() + .await? + .set_online_status(&neighbourhood.data.link_language, status.into()) + .await?; Ok(()) } else { Err(anyhow!("Perspective is not part of a neighbourhood")) @@ -1507,7 +1510,14 @@ impl PerspectiveInstance { ) -> Result<(), AnyError> { let handle = self.persisted.lock().await.clone(); if let Some(neighbourhood) = &handle.neighbourhood { - Libp2pService::global_instance().await?.send_signal(&neighbourhood.data.link_language, &remote_agent_did, payload.into()).await?; + Libp2pService::global_instance() + .await? + .send_signal( + &neighbourhood.data.link_language, + &remote_agent_did, + payload.into(), + ) + .await?; Ok(()) } else { Err(anyhow!("Perspective is not part of a neighbourhood")) @@ -1532,7 +1542,10 @@ impl PerspectiveInstance { let handle = self.persisted.lock().await.clone(); if let Some(neighbourhood) = &handle.neighbourhood { - Libp2pService::global_instance().await?.send_broadcast(&neighbourhood.data.link_language, payload.into()).await?; + Libp2pService::global_instance() + .await? + .send_broadcast(&neighbourhood.data.link_language, payload.into()) + .await?; Ok(()) } else { Err(anyhow!("Perspective is not part of a neighbourhood")) diff --git a/rust-executor/src/types.rs b/rust-executor/src/types.rs index 498bd4fbd..89b4c2df8 100644 --- a/rust-executor/src/types.rs +++ b/rust-executor/src/types.rs @@ -244,7 +244,14 @@ impl From for Perspective { impl From for Perspective { fn from(input: crate::graphql::graphql_types::Perspective) -> Self { - Perspective { links: input.links.into_iter().map(LinkExpression::try_from).filter_map(Result::ok).collect() } + Perspective { + links: input + .links + .into_iter() + .map(LinkExpression::try_from) + .filter_map(Result::ok) + .collect(), + } } } @@ -281,8 +288,6 @@ impl From for PerspectiveE } } - - #[derive(GraphQLObject, Serialize, Deserialize, Debug, Clone, PartialEq)] pub struct Neighbourhood { pub link_language: String, From cd5b815448dcb8b20c79b8812151e370130b0261 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 12:19:54 +0200 Subject: [PATCH 05/10] Explicit topic subscription and mapping to NH ids --- rust-executor/src/libp2p_service/mod.rs | 57 ++++++++++--------- .../src/perspectives/perspective_instance.rs | 25 ++++++-- 2 files changed, 51 insertions(+), 31 deletions(-) diff --git a/rust-executor/src/libp2p_service/mod.rs b/rust-executor/src/libp2p_service/mod.rs index 52c8097ed..e3318361c 100644 --- a/rust-executor/src/libp2p_service/mod.rs +++ b/rust-executor/src/libp2p_service/mod.rs @@ -74,6 +74,7 @@ pub struct Libp2pService { signal_callbacks: Arc>>>>, peer_to_did: Arc>>, // Map peer IDs to agent DIDs + topic_to_neighbourhood: Arc>>, // Map topic hashes to neighbourhood IDs } impl Libp2pService { @@ -126,6 +127,7 @@ impl Libp2pService { bootstrap_nodes, signal_callbacks: Arc::new(Mutex::new(HashMap::new())), peer_to_did: Arc::new(Mutex::new(HashMap::new())), + topic_to_neighbourhood: Arc::new(Mutex::new(HashMap::new())), }) } @@ -147,6 +149,7 @@ impl Libp2pService { let online_agents_clone = self.online_agents.clone(); let signal_callbacks_clone = self.signal_callbacks.clone(); let peer_to_did_clone = self.peer_to_did.clone(); + let topic_to_neighbourhood_clone = self.topic_to_neighbourhood.clone(); tokio::spawn(async move { let mut swarm = swarm_clone.lock().await; @@ -160,20 +163,14 @@ impl Libp2pService { .. }, )) => { - // Extract neighbourhood_id from topic - let topic_str = message.topic.to_string(); - if let Some(neighbourhood_id) = - topic_str.strip_prefix(TELEPRESENCE_TOPIC_PREFIX) - { - if let Ok(msg) = - serde_json::from_slice::(&message.data) - { + let topic_hash = message.topic; + let mapping = topic_to_neighbourhood_clone.lock().await; + if let Some(neighbourhood_id) = mapping.get(&topic_hash) { + if let Ok(msg) = serde_json::from_slice::(&message.data) { match msg { TelepresenceMessage::Signal(payload) => { let callbacks = signal_callbacks_clone.lock().await; - if let Some(neighbourhood_callbacks) = - callbacks.get(neighbourhood_id) - { + if let Some(neighbourhood_callbacks) = callbacks.get(neighbourhood_id) { for callback in neighbourhood_callbacks { callback(payload.clone()); } @@ -260,17 +257,31 @@ impl Libp2pService { Ok(()) } + pub async fn subscribe_to_neighbourhood(&self, neighbourhood_id: String) -> Result<()> { + let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); + let topic_hash = topic.hash(); + + let mut swarm = self.swarm.lock().await; + swarm.behaviour_mut().gossipsub.subscribe(&topic)?; + + // Store the mapping + let mut mapping = self.topic_to_neighbourhood.lock().await; + mapping.insert(topic_hash, neighbourhood_id); + + Ok(()) + } + pub async fn set_online_status( &self, - neighbourhood_id: &str, + neighbourhood_id: String, status: PerspectiveExpression, - ) -> Result<()> { + ) -> Result<()> { let mut online_agents = self.online_agents.lock().await; // TODO: Get agent DID from somewhere let agent_did = "placeholder_did".to_string(); let neighbourhood_agents = online_agents - .entry(neighbourhood_id.to_string()) + .entry(neighbourhood_id) .or_insert_with(HashMap::new); neighbourhood_agents.insert( @@ -281,18 +292,10 @@ impl Libp2pService { }, ); - // Subscribe to the neighbourhood's telepresence topic if not already subscribed - let topic = gossipsub::IdentTopic::new(format!( - "{}{}", - TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id - )); - let mut swarm = self.swarm.lock().await; - swarm.behaviour_mut().gossipsub.subscribe(&topic)?; - Ok(()) } - pub async fn get_online_agents(&self, neighbourhood_id: &str) -> Result> { + pub async fn get_online_agents(&self, neighbourhood_id: String) -> Result> { // First get local agents let mut online_agents = self.online_agents.lock().await; let neighbourhood_agents = online_agents @@ -319,8 +322,8 @@ impl Libp2pService { pub async fn send_signal( &self, - neighbourhood_id: &str, - remote_agent_did: &str, + neighbourhood_id: String, + remote_agent_did: String, payload: PerspectiveExpression, ) -> Result<()> { let topic = gossipsub::IdentTopic::new(format!( @@ -338,7 +341,7 @@ impl Libp2pService { pub async fn send_broadcast( &self, - neighbourhood_id: &str, + neighbourhood_id: String, payload: PerspectiveExpression, ) -> Result<()> { let topic = gossipsub::IdentTopic::new(format!( @@ -356,7 +359,7 @@ impl Libp2pService { pub async fn register_signal_callback( &self, - neighbourhood_id: &str, + neighbourhood_id: String, callback: F, ) -> Result<()> where diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 0c522183a..140f74e08 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -207,6 +207,7 @@ impl PerspectiveInstance { async fn ensure_link_language(&self) { let mut interval = time::interval(Duration::from_secs(5)); + let mut libp2p_subscribed = false; while !*self.is_teardown.lock().await { if self.link_language.lock().await.is_none() && self.persisted.lock().await.neighbourhood.is_some() @@ -255,6 +256,22 @@ impl PerspectiveInstance { .await; } } + } else { + if let Some(neighbourhood) = self.persisted.lock().await.neighbourhood.clone() { + if !libp2p_subscribed { + match Libp2pService::global_instance() + .await + .expect("Failed to get Libp2pService global instance") + .subscribe_to_neighbourhood(neighbourhood.data.link_language.clone()) + .await + { + Ok(_) => libp2p_subscribed = true, + Err(e) => { + log::error!("Error subscribing to neighbourhood: {:?}", e); + } + } + } + } } interval.tick().await; } @@ -1495,7 +1512,7 @@ impl PerspectiveInstance { if let Some(neighbourhood) = &handle.neighbourhood { Libp2pService::global_instance() .await? - .set_online_status(&neighbourhood.data.link_language, status.into()) + .set_online_status(neighbourhood.data.link_language.clone(), status.into()) .await?; Ok(()) } else { @@ -1513,8 +1530,8 @@ impl PerspectiveInstance { Libp2pService::global_instance() .await? .send_signal( - &neighbourhood.data.link_language, - &remote_agent_did, + neighbourhood.data.link_language.clone(), + remote_agent_did, payload.into(), ) .await?; @@ -1544,7 +1561,7 @@ impl PerspectiveInstance { if let Some(neighbourhood) = &handle.neighbourhood { Libp2pService::global_instance() .await? - .send_broadcast(&neighbourhood.data.link_language, payload.into()) + .send_broadcast(neighbourhood.data.link_language.clone(), payload.into()) .await?; Ok(()) } else { From 0299a0e33cfe618eca6939c24831a5565414d4e9 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 12:39:09 +0200 Subject: [PATCH 06/10] Wire-up signal receive callback & add neeeded type conversions --- rust-executor/src/graphql/graphql_types.rs | 30 +++++++++++++++++-- .../src/perspectives/perspective_instance.rs | 26 +++++++++++----- rust-executor/src/types.rs | 16 ++++++++++ 3 files changed, 63 insertions(+), 9 deletions(-) diff --git a/rust-executor/src/graphql/graphql_types.rs b/rust-executor/src/graphql/graphql_types.rs index 0c94c09ce..db3c4a1b6 100644 --- a/rust-executor/src/graphql/graphql_types.rs +++ b/rust-executor/src/graphql/graphql_types.rs @@ -2,8 +2,7 @@ use crate::agent::capabilities::{AuthInfo, Capability}; use crate::agent::signatures::verify; use crate::js_core::JsCoreHandle; use crate::types::{ - AIPromptExamples, AITask, DecoratedExpressionProof, DecoratedLinkExpression, Expression, - ExpressionProof, Link, ModelType, Notification, TriggeredNotification, + AIPromptExamples, AITask, DecoratedExpressionProof, DecoratedLinkExpression, Expression, ExpressionProof, Link, LinkExpression, ModelType, Notification, TriggeredNotification }; use coasys_juniper::{ FieldError, FieldResult, GraphQLEnum, GraphQLInputObject, GraphQLObject, GraphQLScalar, @@ -375,6 +374,17 @@ impl Perspective { } } +impl From for Perspective { + fn from(perspective: crate::types::Perspective) -> Self { + let links = perspective + .links + .into_iter() + .map(|link: LinkExpression| DecoratedLinkExpression::from(link)) + .collect(); + Perspective { links } + } +} + impl From for Perspective { fn from(input: PerspectiveInput) -> Self { let links = input @@ -435,6 +445,22 @@ impl From> for PerspectiveExpression { } } +impl From for PerspectiveExpression { + fn from(expr: crate::types::PerspectiveExpression) -> Self { + PerspectiveExpression { + author: expr.author, + data: expr.data.into(), + proof: DecoratedExpressionProof { + key: expr.proof.key, + signature: expr.proof.signature, + valid: None, + invalid: None, + }, + timestamp: expr.timestamp, + } + } +} + impl PerspectiveExpression { pub fn verify_signatures(&mut self) { self.data.verify_link_signatures(); diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 140f74e08..499cb489b 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -259,16 +259,28 @@ impl PerspectiveInstance { } else { if let Some(neighbourhood) = self.persisted.lock().await.neighbourhood.clone() { if !libp2p_subscribed { - match Libp2pService::global_instance() + let subscribe_result = Libp2pService::global_instance() .await .expect("Failed to get Libp2pService global instance") .subscribe_to_neighbourhood(neighbourhood.data.link_language.clone()) - .await - { - Ok(_) => libp2p_subscribed = true, - Err(e) => { - log::error!("Error subscribing to neighbourhood: {:?}", e); - } + .await; + + if let Err(e) = subscribe_result { + log::error!("Error subscribing to neighbourhood: {:?}", e); + } else { + libp2p_subscribed = true; + let self_clone = self.clone(); + Libp2pService::global_instance() + .await + .expect("Failed to get Libp2pService global instance") + .register_signal_callback(neighbourhood.data.link_language.clone(), move |signal: crate::types::PerspectiveExpression| { + let self_clone = self_clone.clone(); + tokio::spawn(async move { + log::info!("Received libp2p signal from neighbourhood"); + self_clone.telepresence_signal_from_link_language(signal.into()).await; + }); + }) + .await; } } } diff --git a/rust-executor/src/types.rs b/rust-executor/src/types.rs index 89b4c2df8..bf42b5243 100644 --- a/rust-executor/src/types.rs +++ b/rust-executor/src/types.rs @@ -210,6 +210,22 @@ impl From<(LinkExpression, LinkStatus)> for DecoratedLinkExpression { } } +impl From for DecoratedLinkExpression { + fn from(expr: LinkExpression) -> Self { + let status = expr.status.clone(); + let mut expr: Expression = expr.into(); + expr.data = expr.data.normalize(); + let verified_expr: VerifiedExpression = expr.into(); + DecoratedLinkExpression { + author: verified_expr.author, + timestamp: verified_expr.timestamp, + data: verified_expr.data, + proof: verified_expr.proof, + status, + } + } +} + impl From for LinkExpression { fn from(decorated: DecoratedLinkExpression) -> Self { LinkExpression { From aaa71db2c7f7a74bb3275be66a369025ef07588a Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 12:49:42 +0200 Subject: [PATCH 07/10] =?UTF-8?q?register=5Fsignal=5Fcallback=20can?= =?UTF-8?q?=E2=80=99t=20return=20an=20error?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- rust-executor/src/libp2p_service/mod.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/rust-executor/src/libp2p_service/mod.rs b/rust-executor/src/libp2p_service/mod.rs index e3318361c..b30e1ad38 100644 --- a/rust-executor/src/libp2p_service/mod.rs +++ b/rust-executor/src/libp2p_service/mod.rs @@ -361,7 +361,7 @@ impl Libp2pService { &self, neighbourhood_id: String, callback: F, - ) -> Result<()> + ) where F: Fn(PerspectiveExpression) + Send + Sync + 'static, { @@ -370,7 +370,6 @@ impl Libp2pService { .entry(neighbourhood_id.to_string()) .or_insert_with(Vec::new); neighbourhood_callbacks.push(Box::new(callback)); - Ok(()) } pub async fn init_global_instance(bootstrap_nodes: Vec) -> Result<()> { From 62063c69f6de592858994bd03d7ea5986999a888 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 13:01:31 +0200 Subject: [PATCH 08/10] mDNS discovery --- rust-executor/Cargo.toml | 2 +- rust-executor/src/libp2p_service/mod.rs | 28 +++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/rust-executor/Cargo.toml b/rust-executor/Cargo.toml index de4a7b9ce..89106a4e4 100644 --- a/rust-executor/Cargo.toml +++ b/rust-executor/Cargo.toml @@ -117,7 +117,7 @@ libc = "0.2" chat-gpt-lib-rs = { version = "0.5.1", git = "https://github.com/coasys/chat-gpt-lib-rs" } anyhow = "1.0.95" -libp2p = { version = "0.55", features = ["tcp", "dns", "noise", "gossipsub", "kad", "request-response", "yamux", "tokio", "json", "macros", "identify", "relay", "quic", "ping"] } +libp2p = { version = "0.55", features = ["tcp", "dns", "noise", "gossipsub", "kad", "request-response", "yamux", "tokio", "json", "macros", "identify", "relay", "quic", "ping", "mdns"] } [dev-dependencies] maplit = "1.0.2" diff --git a/rust-executor/src/libp2p_service/mod.rs b/rust-executor/src/libp2p_service/mod.rs index b30e1ad38..77fbde111 100644 --- a/rust-executor/src/libp2p_service/mod.rs +++ b/rust-executor/src/libp2p_service/mod.rs @@ -8,6 +8,7 @@ use libp2p::{ swarm::{NetworkBehaviour, Swarm, SwarmEvent}, tcp, yamux, + mdns, PeerId, StreamProtocol, //NetworkBehaviour as _, @@ -45,6 +46,7 @@ pub struct OnlineAgent { pub enum MyBehaviourEvent { Gossipsub(gossipsub::Event), RequestResponse(request_response::Event), + Mdns(mdns::Event), } impl From for MyBehaviourEvent { @@ -59,11 +61,18 @@ impl From> for } } +impl From for MyBehaviourEvent { + fn from(event: mdns::Event) -> Self { + MyBehaviourEvent::Mdns(event) + } +} + #[derive(NetworkBehaviour)] #[behaviour(out_event = "MyBehaviourEvent")] struct MyBehaviour { gossipsub: gossipsub::Behaviour, request_response: json::Behaviour, + mdns: mdns::tokio::Behaviour, } #[derive(Clone)] @@ -114,9 +123,13 @@ impl Libp2pService { request_response::Config::default(), ); + // Set up mDNS + let mdns = mdns::tokio::Behaviour::new(mdns::Config::default(), PeerId::from_public_key(&key.public()))?; + Ok(MyBehaviour { gossipsub, request_response, + mdns, }) })? .build(); @@ -249,6 +262,21 @@ impl Libp2pService { } } } + SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => { + for (peer_id, multiaddr) in list { + log::info!("mDNS discovered peer {} at {}", peer_id, multiaddr); + if let Err(e) = swarm.dial(multiaddr) { + log::error!("Failed to dial mDNS peer: {}", e); + } else { + log::info!("Dialed mDNS peer: {}", peer_id); + } + } + } + SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => { + for (peer_id, multiaddr) in list { + log::info!("mDNS peer {} at {} expired", peer_id, multiaddr); + } + } _ => {} } } From ba3fcff08da2298d4fb732e08e048b17b1f64501 Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 16:58:46 +0200 Subject: [PATCH 09/10] check-point: signals working, online agents not yet --- rust-executor/src/libp2p_service/mod.rs | 336 ++++++++++-------- .../src/perspectives/perspective_instance.rs | 73 ++-- 2 files changed, 232 insertions(+), 177 deletions(-) diff --git a/rust-executor/src/libp2p_service/mod.rs b/rust-executor/src/libp2p_service/mod.rs index 77fbde111..ffd64f1df 100644 --- a/rust-executor/src/libp2p_service/mod.rs +++ b/rust-executor/src/libp2p_service/mod.rs @@ -14,7 +14,7 @@ use libp2p::{ //NetworkBehaviour as _, }; //use libp2p::swarm::derive::NetworkBehaviour; -use crate::types::PerspectiveExpression; +use crate::{agent, types::PerspectiveExpression, graphql::graphql_types::OnlineAgent}; use anyhow::{anyhow, Result}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -36,12 +36,6 @@ pub enum TelepresenceMessage { Signal(PerspectiveExpression), } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct OnlineAgent { - pub did: String, - pub status: PerspectiveExpression, -} - #[derive(Debug)] pub enum MyBehaviourEvent { Gossipsub(gossipsub::Event), @@ -84,6 +78,8 @@ pub struct Libp2pService { Arc>>>>, peer_to_did: Arc>>, // Map peer IDs to agent DIDs topic_to_neighbourhood: Arc>>, // Map topic hashes to neighbourhood IDs + my_online_status: Arc>>, // neighbourhood_id -> status + known_peers: Arc>>, // neighbourhood_id -> list of peer IDs } impl Libp2pService { @@ -101,14 +97,20 @@ impl Libp2pService { // Set up gossipsub let message_id_fn = |message: &gossipsub::Message| { let mut s = std::collections::hash_map::DefaultHasher::new(); + message.source.hash(&mut s); message.data.hash(&mut s); gossipsub::MessageId::from(s.finish().to_string()) }; let gossipsub_config = gossipsub::ConfigBuilder::default() - .heartbeat_interval(std::time::Duration::from_secs(1)) + .heartbeat_interval(std::time::Duration::from_millis(200)) .validation_mode(gossipsub::ValidationMode::Strict) .message_id_fn(message_id_fn) + // .mesh_outbound_min(1) + // .mesh_n_low(2) // Lower bound for mesh size + // .mesh_n(2) // Minimum number of peers in mesh + // .mesh_n_high(3) // Upper bound for mesh size + // .gossip_lazy(1) // Number of peers to gossip to .build() .map_err(anyhow::Error::msg)?; @@ -141,6 +143,8 @@ impl Libp2pService { signal_callbacks: Arc::new(Mutex::new(HashMap::new())), peer_to_did: Arc::new(Mutex::new(HashMap::new())), topic_to_neighbourhood: Arc::new(Mutex::new(HashMap::new())), + my_online_status: Arc::new(Mutex::new(HashMap::new())), + known_peers: Arc::new(Mutex::new(Vec::new())), }) } @@ -159,125 +163,146 @@ impl Libp2pService { // Start processing events let swarm_clone = self.swarm.clone(); - let online_agents_clone = self.online_agents.clone(); let signal_callbacks_clone = self.signal_callbacks.clone(); - let peer_to_did_clone = self.peer_to_did.clone(); + //let peer_to_did_clone = self.peer_to_did.clone(); let topic_to_neighbourhood_clone = self.topic_to_neighbourhood.clone(); + let my_online_status_clone = self.my_online_status.clone(); + let online_agents_clone = self.online_agents.clone(); + let self_clone = self.clone(); tokio::spawn(async move { - let mut swarm = swarm_clone.lock().await; - - while let Some(event) = swarm.next().await { - match event { - SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub( - gossipsub::Event::Message { - message, - //propagation_source: peer_id, - .. - }, - )) => { - let topic_hash = message.topic; - let mapping = topic_to_neighbourhood_clone.lock().await; - if let Some(neighbourhood_id) = mapping.get(&topic_hash) { - if let Ok(msg) = serde_json::from_slice::(&message.data) { - match msg { - TelepresenceMessage::Signal(payload) => { - let callbacks = signal_callbacks_clone.lock().await; - if let Some(neighbourhood_callbacks) = callbacks.get(neighbourhood_id) { - for callback in neighbourhood_callbacks { - callback(payload.clone()); + loop { + let mut swarm = swarm_clone.lock().await; + let sleep = tokio::time::sleep(tokio::time::Duration::from_millis(500)); + tokio::pin!(sleep); + + tokio::select! { + Some(event) = swarm.next() => { + match event { + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub( + gossipsub::Event::Message { + message, + .. + }, + )) => { + let topic_hash = message.topic; + let mapping = topic_to_neighbourhood_clone.lock().await; + if let Some(neighbourhood_id) = mapping.get(&topic_hash) { + if let Ok(msg) = serde_json::from_slice::(&message.data) { + match msg { + TelepresenceMessage::Signal(payload) => { + let callbacks = signal_callbacks_clone.lock().await; + if let Some(neighbourhood_callbacks) = callbacks.get(neighbourhood_id) { + for callback in neighbourhood_callbacks { + callback(payload.clone()); + } + } } + _ => {} // Other message types handled by request-response } } - _ => {} // Other message types handled by request-response } } - } - } - SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse( - request_response::Event::Message { - message: - request_response::Message::Request { - request, channel, .. + SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse( + request_response::Event::Message { + message: request_response::Message::Request { request, channel, .. }, + peer: peer_id, + .. }, - peer: peer_id, - .. - }, - )) => { - match request { - TelepresenceMessage::OnlineStatusRequest => { - let online_agents = online_agents_clone.lock().await; - let peer_to_did = peer_to_did_clone.lock().await; - - // TODO: Get neighbourhood_id from somewhere - let neighbourhood_id = "placeholder"; - - if let Some(agents) = online_agents.get(neighbourhood_id) { - if let Some(agent_did) = peer_to_did.get(&peer_id) { - if let Some(agent) = agents.get(agent_did) { - swarm - .behaviour_mut() - .request_response - .send_response( - channel, - TelepresenceMessage::OnlineStatusResponse( - agent.status.clone(), - ), - ) - .expect("Failed to send response"); + )) => { + match request { + TelepresenceMessage::OnlineStatusRequest => { + // Get the neighbourhood ID from the topic + let topic = swarm.behaviour().gossipsub.topics().next() + .expect("Should have at least one topic"); + let neighbourhood_id = topic_to_neighbourhood_clone.lock().await + .get(topic) + .cloned() + .expect("Topic should be mapped to a neighbourhood"); + + if let Some(status) = my_online_status_clone.lock().await.get(&neighbourhood_id) { + if let Err(e) = swarm.behaviour_mut().request_response.send_response( + channel, + TelepresenceMessage::OnlineStatusResponse(status.clone()), + ) { + log::error!("Failed to send online status response: {:?}", e); + } } } + _ => {} } } - _ => {} - } - } - SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse( - request_response::Event::Message { - message: request_response::Message::Response { response, .. }, - peer: peer_id, - .. - }, - )) => { - if let TelepresenceMessage::OnlineStatusResponse(status) = response { - // Update the agent's status in our local map - let mut online_agents = online_agents_clone.lock().await; - let peer_to_did = peer_to_did_clone.lock().await; - - // TODO: Get neighbourhood_id from somewhere - let neighbourhood_id = "placeholder"; - - if let Some(agent_did) = peer_to_did.get(&peer_id) { - let neighbourhood_agents = online_agents - .entry(neighbourhood_id.to_string()) - .or_insert_with(HashMap::new); - - neighbourhood_agents.insert( - agent_did.clone(), - OnlineAgent { - did: agent_did.clone(), - status, - }, - ); + SwarmEvent::Behaviour(MyBehaviourEvent::RequestResponse( + request_response::Event::Message { + message: request_response::Message::Response { response, .. }, + peer: peer_id, + .. + }, + )) => { + if let TelepresenceMessage::OnlineStatusResponse(status) = response { + log::info!("Received online status response from peer: {}", peer_id); + // Get the neighbourhood ID from the topic + let topic = swarm.behaviour().gossipsub.topics().next() + .expect("Should have at least one topic"); + log::info!("Topic: {}", topic); + let neighbourhood_id = topic_to_neighbourhood_clone.lock().await + .get(topic) + .cloned() + .expect("Topic should be mapped to a neighbourhood"); + log::info!("Neighbourhood ID: {}", neighbourhood_id); + let mut online_agents = online_agents_clone.lock().await; + log::info!("Online agents: {:?}", online_agents); + let neighbourhood_agents = online_agents.entry(neighbourhood_id).or_insert_with(HashMap::new); + log::info!("Neighbourhood agents: {:?}", neighbourhood_agents); + let agent_did = agent::did(); + + neighbourhood_agents.insert( + agent_did.clone(), + OnlineAgent { + did: agent_did, + status: status.into(), + }, + ); + log::info!("Inserted agent: {:?}", neighbourhood_agents); + } } - } - } - SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => { - for (peer_id, multiaddr) in list { - log::info!("mDNS discovered peer {} at {}", peer_id, multiaddr); - if let Err(e) = swarm.dial(multiaddr) { - log::error!("Failed to dial mDNS peer: {}", e); - } else { - log::info!("Dialed mDNS peer: {}", peer_id); + SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => { + for (peer_id, multiaddr) in list { + log::info!("mDNS discovered peer {} at {}", peer_id, multiaddr); + if let Err(e) = swarm.dial(multiaddr) { + log::error!("Failed to dial mDNS peer: {}", e); + } else { + log::info!("Dialed mDNS peer: {}", peer_id); + self_clone.known_peers.lock().await.push(peer_id); + } + } + } + SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => { + for (peer_id, multiaddr) in list { + log::info!("mDNS peer {} at {} expired", peer_id, multiaddr); + } + } + SwarmEvent::NewListenAddr { address, .. } => { + log::info!("Listening on: {}", address); + } + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::GossipsubNotSupported { peer_id })) => { + log::info!("Peer {} doesn't support gossip", peer_id); + } + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Subscribed { peer_id, topic })) => { + log::info!("Peer {} grafted to topic {}", peer_id, topic); + } + SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Unsubscribed { peer_id, topic })) => { + log::info!("Peer {} pruned from topic {}", peer_id, topic); } + _ => {} } } - SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Expired(list))) => { - for (peer_id, multiaddr) in list { - log::info!("mDNS peer {} at {} expired", peer_id, multiaddr); - } + _ = &mut sleep => { + // Timeout reached, release the lock + drop(swarm); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + continue; } - _ => {} } } }); @@ -286,16 +311,47 @@ impl Libp2pService { } pub async fn subscribe_to_neighbourhood(&self, neighbourhood_id: String) -> Result<()> { + log::info!("Subscribing to neighbourhood: {}", neighbourhood_id); let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); let topic_hash = topic.hash(); let mut swarm = self.swarm.lock().await; swarm.behaviour_mut().gossipsub.subscribe(&topic)?; + for peer_id in self.known_peers.lock().await.iter() { + swarm.behaviour_mut().gossipsub.add_explicit_peer(peer_id); + } // Store the mapping let mut mapping = self.topic_to_neighbourhood.lock().await; mapping.insert(topic_hash, neighbourhood_id); + + // Log subscription info + let peers = swarm.behaviour().gossipsub.all_peers().collect::>(); + log::info!("Subscribed to topic {} with {} peers", topic, peers.len()); + + // Publish a dummy message to trigger mesh re-evaluation + let dummy_data = b"mesh_eval"; + let mut result = swarm.behaviour_mut().gossipsub.publish(topic.clone(), dummy_data); + let mut retries = 0; + while result.is_err() && retries < 5 { + let error = result.err().unwrap(); + log::error!("Failed to publish dummy message: {}", error); + log::error!("Retrying..."); + // Wait a bit for the mesh to form + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + result = swarm.behaviour_mut().gossipsub.publish(topic.clone(), dummy_data); + retries += 1; + } + // Release the lock to allow mesh formation + drop(swarm); + + if result.is_err() { + log::error!("Failed to publish dummy message after 5 retries"); + } else { + log::info!("Published dummy message to trigger mesh re-evaluation"); + } + Ok(()) } @@ -303,47 +359,45 @@ impl Libp2pService { &self, neighbourhood_id: String, status: PerspectiveExpression, - ) -> Result<()> { - let mut online_agents = self.online_agents.lock().await; - // TODO: Get agent DID from somewhere - let agent_did = "placeholder_did".to_string(); - - let neighbourhood_agents = online_agents - .entry(neighbourhood_id) - .or_insert_with(HashMap::new); - - neighbourhood_agents.insert( - agent_did.clone(), - OnlineAgent { - did: agent_did, - status, - }, - ); - - Ok(()) + ) { + // Store our status for this neighbourhood + self.my_online_status.lock().await.insert(neighbourhood_id.clone(), status.clone()); } pub async fn get_online_agents(&self, neighbourhood_id: String) -> Result> { - // First get local agents - let mut online_agents = self.online_agents.lock().await; - let neighbourhood_agents = online_agents - .entry(neighbourhood_id.to_string()) - .or_insert_with(HashMap::new); - let mut agents = neighbourhood_agents.values().cloned().collect::>(); + log::info!("Getting online agents for neighbourhood: {}", neighbourhood_id); + + // Clear existing agents for this neighbourhood + { + let mut online_agents = self.online_agents.lock().await; + online_agents.remove(&neighbourhood_id); + } + log::info!("Removed existing agents for neighbourhood: {}", neighbourhood_id); - // Then request status from all others in the neighbourhood - let topic = gossipsub::IdentTopic::new(format!( - "{}{}", - TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id - )); + // Request status from all others in the neighbourhood + let topic = gossipsub::IdentTopic::new(format!("{}{}", TELEPRESENCE_TOPIC_PREFIX, neighbourhood_id)); let request = TelepresenceMessage::OnlineStatusRequest; let data = serde_json::to_vec(&request)?; - let mut swarm = self.swarm.lock().await; - swarm.behaviour_mut().gossipsub.publish(topic, data)?; + log::info!("Publishing online status request for neighbourhood: {}", neighbourhood_id); - // TODO: Wait for responses and add them to agents list - // This would require implementing a timeout and response handling + // Only lock swarm for publishing + { + let mut swarm = self.swarm.lock().await; + let peers = swarm.behaviour().gossipsub.all_peers().collect::>(); + log::info!("Current peers for topic {}: {:?}", topic, peers); + swarm.behaviour_mut().gossipsub.publish(topic, data)?; + } + + log::info!("Published online status request for neighbourhood: {}", neighbourhood_id); + // Wait for 3 seconds to collect responses + tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; + + // Return whatever agents we have in the map + let online_agents = self.online_agents.lock().await; + let agents = online_agents.get(&neighbourhood_id) + .map(|agents| agents.values().cloned().collect()) + .unwrap_or_default(); Ok(agents) } diff --git a/rust-executor/src/perspectives/perspective_instance.rs b/rust-executor/src/perspectives/perspective_instance.rs index 499cb489b..718b8af06 100644 --- a/rust-executor/src/perspectives/perspective_instance.rs +++ b/rust-executor/src/perspectives/perspective_instance.rs @@ -209,9 +209,11 @@ impl PerspectiveInstance { let mut interval = time::interval(Duration::from_secs(5)); let mut libp2p_subscribed = false; while !*self.is_teardown.lock().await { + log::info!("Ensuring link language"); if self.link_language.lock().await.is_none() && self.persisted.lock().await.neighbourhood.is_some() { + log::info!("Link language not installed, installing..."); let nh = self .persisted .lock() @@ -236,7 +238,6 @@ impl PerspectiveInstance { PerspectiveState::LinkLanguageInstalledButNotSynced, ) .await; - break; } Ok(None) => { log::debug!( @@ -256,32 +257,35 @@ impl PerspectiveInstance { .await; } } - } else { + } + + if !libp2p_subscribed { + log::info!("libp2p not subscribed"); if let Some(neighbourhood) = self.persisted.lock().await.neighbourhood.clone() { - if !libp2p_subscribed { - let subscribe_result = Libp2pService::global_instance() + log::info!("Subscribing to neighbourhood: {}", neighbourhood.data.link_language.clone()); + + let subscribe_result = Libp2pService::global_instance() + .await + .expect("Failed to get Libp2pService global instance") + .subscribe_to_neighbourhood(neighbourhood.data.link_language.clone()) + .await; + + if let Err(e) = subscribe_result { + log::error!("Error subscribing to neighbourhood: {:?}", e); + } else { + libp2p_subscribed = true; + let self_clone = self.clone(); + Libp2pService::global_instance() .await .expect("Failed to get Libp2pService global instance") - .subscribe_to_neighbourhood(neighbourhood.data.link_language.clone()) + .register_signal_callback(neighbourhood.data.link_language.clone(), move |signal: crate::types::PerspectiveExpression| { + let self_clone = self_clone.clone(); + tokio::spawn(async move { + log::info!("Received libp2p signal from neighbourhood"); + self_clone.telepresence_signal_from_link_language(signal.into()).await; + }); + }) .await; - - if let Err(e) = subscribe_result { - log::error!("Error subscribing to neighbourhood: {:?}", e); - } else { - libp2p_subscribed = true; - let self_clone = self.clone(); - Libp2pService::global_instance() - .await - .expect("Failed to get Libp2pService global instance") - .register_signal_callback(neighbourhood.data.link_language.clone(), move |signal: crate::types::PerspectiveExpression| { - let self_clone = self_clone.clone(); - tokio::spawn(async move { - log::info!("Received libp2p signal from neighbourhood"); - self_clone.telepresence_signal_from_link_language(signal.into()).await; - }); - }) - .await; - } } } } @@ -1503,17 +1507,13 @@ impl PerspectiveInstance { } pub async fn online_agents(&self) -> Result, AnyError> { - let mut link_language_guard = self.link_language.lock().await; - if let Some(link_language) = link_language_guard.as_mut() { - Ok(link_language - .get_online_agents() - .await? - .into_iter() - .map(|mut a| { - a.status.verify_signatures(); - a - }) - .collect()) + let handle = self.persisted.lock().await.clone(); + if let Some(neighbourhood) = &handle.neighbourhood { + Ok(Libp2pService::global_instance() + .await + .expect("Failed to get Libp2pService") + .get_online_agents(neighbourhood.data.link_language.clone()) + .await?) } else { Err(self.no_link_language_error().await) } @@ -1523,9 +1523,10 @@ impl PerspectiveInstance { let handle = self.persisted.lock().await.clone(); if let Some(neighbourhood) = &handle.neighbourhood { Libp2pService::global_instance() - .await? + .await + .expect("Failed to get Libp2pService") .set_online_status(neighbourhood.data.link_language.clone(), status.into()) - .await?; + .await; Ok(()) } else { Err(anyhow!("Perspective is not part of a neighbourhood")) From c38e59dd51b19da69fefc4f3aee14c10de36da9d Mon Sep 17 00:00:00 2001 From: Nicolas Luck Date: Mon, 14 Apr 2025 17:04:08 +0200 Subject: [PATCH 10/10] Set hard-wired: port and persistence node IP --- rust-executor/src/lib.rs | 5 +++-- rust-executor/src/libp2p_service/mod.rs | 5 ++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/rust-executor/src/lib.rs b/rust-executor/src/lib.rs index d52dc9ba0..4af2af2b4 100644 --- a/rust-executor/src/lib.rs +++ b/rust-executor/src/lib.rs @@ -47,6 +47,8 @@ extern "C" fn handle_sigurg(_: libc::c_int) { //println!("Received SIGURG signal, but ignoring it."); } +const PERSISTENCE_NODE: &str = "207.148.16.17"; + /// Runs the GraphQL server and the deno core runtime pub async fn run(mut config: Ad4mConfig) -> JoinHandle<()> { unsafe { @@ -91,8 +93,7 @@ pub async fn run(mut config: Ad4mConfig) -> JoinHandle<()> { info!("Initializing libp2p service..."); let bootstrap_nodes = vec![ - // TODO: Add the actual bootstrap node URL - "/ip4/127.0.0.1/tcp/4001".to_string(), + format!("/ip4/{}/tcp/{}", PERSISTENCE_NODE, libp2p_service::PORT), ]; Libp2pService::init_global_instance(bootstrap_nodes) .await diff --git a/rust-executor/src/libp2p_service/mod.rs b/rust-executor/src/libp2p_service/mod.rs index ffd64f1df..ac1fdea08 100644 --- a/rust-executor/src/libp2p_service/mod.rs +++ b/rust-executor/src/libp2p_service/mod.rs @@ -21,6 +21,8 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::sync::Mutex; +const PORT: u16 = 15415; + lazy_static! { static ref LIBP2P_SERVICE: Arc>> = Arc::new(Mutex::new(None)); } @@ -159,7 +161,8 @@ impl Libp2pService { } // Start listening on all interfaces - (*swarm).listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?; + let listen_addr = format!("/ip4/0.0.0.0/tcp/{}", PORT); + (*swarm).listen_on(listen_addr.parse()?)?; // Start processing events let swarm_clone = self.swarm.clone();