diff --git a/Cargo.lock b/Cargo.lock index 92b39c38a..be5d87e83 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -187,7 +187,7 @@ dependencies = [ "nom 7.1.3", "num-traits", "rusticata-macros", - "thiserror", + "thiserror 2.0.18", "time", ] @@ -199,7 +199,7 @@ checksum = "3109e49b1e4909e9db6515a30c633684d68cdeaa252f215214cb4fa1a5bfee2c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -211,7 +211,7 @@ checksum = "7b18050c2cd6fe86c3a76584ef5e0baf286d038cda203eb6223df2cc413565f7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -364,7 +364,7 @@ checksum = "c7c24de15d275a1ecfd47a380fb4d5ec9bfe0933f309ed5e705b775596a3574d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -381,7 +381,7 @@ checksum = "9035ad2d096bed7955a320ee7e2230574d28fd3c3a0f186cbea1ff3c7eed5dbb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -482,7 +482,7 @@ checksum = "604fde5e028fea851ce1d8570bbdc034bec850d157f7569d10f347d06808c05c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -589,7 +589,7 @@ dependencies = [ "serde_derive", "serde_json", "serde_urlencoded", - "thiserror", + "thiserror 2.0.18", "time", "tokio", "tokio-stream", @@ -655,7 +655,7 @@ checksum = "89385e82b5d1821d2219e0b095efa2cc1f246cbf99080f3be46a1a85c0d392d9" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -821,7 +821,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1116,7 +1116,7 @@ dependencies = [ "proc-macro2", "quote", "strsim", - "syn", + "syn 2.0.117", ] [[package]] @@ -1127,7 +1127,7 @@ checksum = "ac3984ec7bd6cfa798e62b4a642426a5be0e68f9401cfc2a01e3fa9ea2fcdb8d" dependencies = [ "darling_core", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1209,7 +1209,7 @@ checksum = "8034092389675178f570469e6c3b0465d3d30b4505c294a6550db47f3c17ad18" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1230,7 +1230,7 @@ checksum = "1e567bd82dcff979e4b03460c307b3cdc9e96fde3d73bed1496d2bc75d9dd62a" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1282,7 +1282,7 @@ checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1515,7 +1515,7 @@ version = "0.1.0" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -1587,7 +1587,7 @@ checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2109,6 +2109,7 @@ dependencies = [ "indicatif", "inventory", "lapin", + "machineid-rs", "mockall", "nix", "notify", @@ -2137,7 +2138,7 @@ dependencies = [ "tempfile", "testcontainers", "testcontainers-modules", - "thiserror", + "thiserror 2.0.18", "tokio", "tokio-tungstenite", "toml", @@ -2205,8 +2206,8 @@ dependencies = [ "serde", "serde_json", "serial_test", - "sysinfo", - "thiserror", + "sysinfo 0.38.4", + "thiserror 2.0.18", "tokio", "tokio-tungstenite", "tracing", @@ -2498,6 +2499,26 @@ dependencies = [ "pkg-config", ] +[[package]] +name = "machineid-rs" +version = "1.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35ceb4d434d69d7199abc3036541ba6ef86767a4356e3077d5a3419f85b70b14" +dependencies = [ + "hex", + "hmac", + "md-5", + "serde", + "serde_json", + "sha-1", + "sha2", + "sysinfo 0.29.11", + "uuid", + "whoami", + "winreg", + "wmi", +] + [[package]] name = "matchers" version = "0.2.0" @@ -2513,6 +2534,16 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "md-5" +version = "0.10.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d89e7ee0cfbedfc4da3340218492196241d89eefb6dab27de5df917a6d2e78cf" +dependencies = [ + "cfg-if", + "digest", +] + [[package]] name = "memchr" version = "2.8.0" @@ -2586,7 +2617,7 @@ dependencies = [ "cfg-if", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2606,7 +2637,7 @@ checksum = "4568f25ccbd45ab5d5603dc34318c1ec56b117531781260002151b8530a9f931" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -2842,7 +2873,7 @@ dependencies = [ "futures-sink", "js-sys", "pin-project-lite", - "thiserror", + "thiserror 2.0.18", "tracing", ] @@ -2872,7 +2903,7 @@ dependencies = [ "opentelemetry_sdk", "prost", "reqwest", - "thiserror", + "thiserror 2.0.18", "tokio", "tonic", "tracing", @@ -2913,7 +2944,7 @@ dependencies = [ "opentelemetry", "percent-encoding", "rand 0.9.2", - "thiserror", + "thiserror 2.0.18", "tokio", "tokio-stream", ] @@ -2943,7 +2974,7 @@ dependencies = [ "rc2", "sha1", "sha2", - "thiserror", + "thiserror 2.0.18", "x509-parser", ] @@ -2998,7 +3029,7 @@ dependencies = [ "regex", "regex-syntax", "structmeta", - "syn", + "syn 2.0.117", ] [[package]] @@ -3043,7 +3074,7 @@ checksum = "d9b20ed30f105399776b9c883e68e536ef602a16ae6f596d2c473591d6ad64c6" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3216,7 +3247,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" dependencies = [ "proc-macro2", - "syn", + "syn 2.0.117", ] [[package]] @@ -3263,7 +3294,7 @@ dependencies = [ "itertools 0.14.0", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3292,7 +3323,7 @@ checksum = "7347867d0a7e1208d93b46767be83e2b8f978c3dad35f775ac8d8847551d6fe1" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3309,7 +3340,7 @@ dependencies = [ "rustc-hash", "rustls", "socket2", - "thiserror", + "thiserror 2.0.18", "tokio", "tracing", "web-time", @@ -3330,7 +3361,7 @@ dependencies = [ "rustls", "rustls-pki-types", "slab", - "thiserror", + "thiserror 2.0.18", "tinyvec", "tracing", "web-time", @@ -3572,7 +3603,7 @@ checksum = "a4e608c6638b9c18977b00b475ac1f28d14e84b27d8d42f70e0bf1e3dec127ac" dependencies = [ "getrandom 0.2.17", "libredox", - "thiserror", + "thiserror 2.0.18", ] [[package]] @@ -3592,7 +3623,7 @@ checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3717,7 +3748,7 @@ checksum = "8100bb34c0a1d0f907143db3149e6b4eea3c33b9ee8b189720168e818303986f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -3740,7 +3771,7 @@ dependencies = [ "proc-macro2", "quote", "rust-embed-utils", - "syn", + "syn 2.0.117", "walkdir", ] @@ -3949,7 +3980,7 @@ dependencies = [ "proc-macro2", "quote", "serde_derive_internals", - "syn", + "syn 2.0.117", ] [[package]] @@ -4035,7 +4066,7 @@ checksum = "d540f220d3187173da220f885ab66608367b6574e925011a9353e4badda91d79" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4046,7 +4077,7 @@ checksum = "18d26a20a969b9e3fdf2fc2d9f21eda6c40e2de84c9408bb5d3b05d499aae711" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4081,7 +4112,7 @@ checksum = "175ee3e80ae9982737ca543e96133087cbd9a485eecc3bc4de9c1a37b47ea59c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4133,7 +4164,7 @@ dependencies = [ "darling", "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4187,7 +4218,18 @@ checksum = "0a7d91949b85b0d2fb687445e448b40d322b6b3e4af6b44a29b21d9a5f33e6d9" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", +] + +[[package]] +name = "sha-1" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5058ada175748e33390e40e872bd0fe59a19f265d0158daa551c5a88a76009c" +dependencies = [ + "cfg-if", + "cpufeatures 0.2.17", + "digest", ] [[package]] @@ -4317,7 +4359,7 @@ dependencies = [ "proc-macro2", "quote", "structmeta-derive", - "syn", + "syn 2.0.117", ] [[package]] @@ -4328,7 +4370,7 @@ checksum = "152a0b65a590ff6c3da95cabe2353ee04e6167c896b28e3b14478c2636c922fc" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4337,6 +4379,17 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.117" @@ -4365,7 +4418,22 @@ checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", +] + +[[package]] +name = "sysinfo" +version = "0.29.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd727fc423c2060f6c92d9534cef765c65a6ed3f428a03d7def74a8c4348e666" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", ] [[package]] @@ -4379,7 +4447,7 @@ dependencies = [ "ntapi", "objc2-core-foundation", "objc2-io-kit", - "windows", + "windows 0.62.2", ] [[package]] @@ -4459,7 +4527,7 @@ dependencies = [ "serde", "serde_json", "serde_with", - "thiserror", + "thiserror 2.0.18", "tokio", "tokio-stream", "tokio-util", @@ -4475,13 +4543,33 @@ dependencies = [ "testcontainers", ] +[[package]] +name = "thiserror" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6aaf5339b578ea85b50e080feb250a3e8ae8cfcdff9a461c9ec2904bc923f52" +dependencies = [ + "thiserror-impl 1.0.69", +] + [[package]] name = "thiserror" version = "2.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4288b5bcbc7920c07a1149a35cf9590a2aa808e0bc1eafaade0b80947865fbc4" dependencies = [ - "thiserror-impl", + "thiserror-impl 2.0.18", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fee6c4efc90059e10f81e6d42c60a18f76588c3d74cb83a0b242a2b6c7504c1" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] @@ -4492,7 +4580,7 @@ checksum = "ebc4ee7f67670e9b64d05fa4253e753e016c6c95ff35b89b7941d6b856dec1d5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4595,7 +4683,7 @@ checksum = "5c55a2eff8b69ce66c84f85e1da1c233edc36ceb85a2058d11b0d6a3c7e7569c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4819,7 +4907,7 @@ checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -4909,7 +4997,7 @@ dependencies = [ "log", "rand 0.9.2", "sha1", - "thiserror", + "thiserror 2.0.18", "utf-8", ] @@ -5092,6 +5180,12 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b" + [[package]] name = "wasm-bindgen" version = "0.2.114" @@ -5138,7 +5232,7 @@ dependencies = [ "bumpalo", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wasm-bindgen-shared", ] @@ -5227,6 +5321,17 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "whoami" +version = "1.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5d4a4db5077702ca3015d3d02d74974948aba2ad9e12ab7df718ee64ccd7e97d" +dependencies = [ + "libredox", + "wasite", + "web-sys", +] + [[package]] name = "winapi" version = "0.3.9" @@ -5258,6 +5363,17 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" +[[package]] +name = "windows" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e686886bc078bc1b0b600cac0147aadb815089b6e4da64016cbd754b6342700f" +dependencies = [ + "windows-implement 0.48.0", + "windows-interface 0.48.0", + "windows-targets 0.48.5", +] + [[package]] name = "windows" version = "0.62.2" @@ -5285,8 +5401,8 @@ version = "0.62.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b8e83a14d34d0623b51dce9581199302a221863196a1dde71a7663a4c2be9deb" dependencies = [ - "windows-implement", - "windows-interface", + "windows-implement 0.60.2", + "windows-interface 0.59.3", "windows-link", "windows-result", "windows-strings", @@ -5303,6 +5419,17 @@ dependencies = [ "windows-threading", ] +[[package]] +name = "windows-implement" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e2ee588991b9e7e6c8338edf3333fbe4da35dc72092643958ebb43f0ab2c49c" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "windows-implement" version = "0.60.2" @@ -5311,7 +5438,18 @@ checksum = "053e2e040ab57b9dc951b72c264860db7eb3b0200ba345b4e4c3b14f67855ddf" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", +] + +[[package]] +name = "windows-interface" +version = "0.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6fb8df20c9bcaa8ad6ab513f7b40104840c8867d5751126e4df3b08388d0cc7" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", ] [[package]] @@ -5322,7 +5460,7 @@ checksum = "3f316c4a2570ba26bbec722032c4099d8c8bc095efccdc15688708623367e358" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5395,6 +5533,21 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm 0.48.5", + "windows_aarch64_msvc 0.48.5", + "windows_i686_gnu 0.48.5", + "windows_i686_msvc 0.48.5", + "windows_x86_64_gnu 0.48.5", + "windows_x86_64_gnullvm 0.48.5", + "windows_x86_64_msvc 0.48.5", +] + [[package]] name = "windows-targets" version = "0.52.6" @@ -5437,6 +5590,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.52.6" @@ -5449,6 +5608,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a9d8416fa8b42f5c947f8482c43e7d89e73a173cead56d044f6a56104a6d1b53" +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" + [[package]] name = "windows_aarch64_msvc" version = "0.52.6" @@ -5461,6 +5626,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b9d782e804c2f632e395708e99a94275910eb9100b2114651e04744e9b125006" +[[package]] +name = "windows_i686_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" + [[package]] name = "windows_i686_gnu" version = "0.52.6" @@ -5485,6 +5656,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fa7359d10048f68ab8b09fa71c3daccfb0e9b559aed648a8f95469c27057180c" +[[package]] +name = "windows_i686_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" + [[package]] name = "windows_i686_msvc" version = "0.52.6" @@ -5497,6 +5674,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e7ac75179f18232fe9c285163565a57ef8d3c89254a30685b57d83a38d326c2" +[[package]] +name = "windows_x86_64_gnu" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + [[package]] name = "windows_x86_64_gnu" version = "0.52.6" @@ -5509,6 +5692,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c3842cdd74a865a8066ab39c8a7a473c0778a3f29370b5fd6b4b9aa7df4a499" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" + [[package]] name = "windows_x86_64_gnullvm" version = "0.52.6" @@ -5521,6 +5710,12 @@ version = "0.53.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ffa179e2d07eee8ad8f57493436566c7cc30ac536a3379fdf008f47f6bb7ae1" +[[package]] +name = "windows_x86_64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + [[package]] name = "windows_x86_64_msvc" version = "0.52.6" @@ -5557,6 +5752,16 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a90e88e4667264a994d34e6d1ab2d26d398dcdca8b7f52bec8668957517fc7d8" +[[package]] +name = "winreg" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76a1a57ff50e9b408431e8f97d5456f2807f8eb2a2cd79b06068fc87f8ecf189" +dependencies = [ + "cfg-if", + "winapi", +] + [[package]] name = "wiremock" version = "0.6.5" @@ -5610,7 +5815,7 @@ dependencies = [ "heck", "indexmap 2.13.0", "prettyplease", - "syn", + "syn 2.0.117", "wasm-metadata", "wit-bindgen-core", "wit-component", @@ -5626,7 +5831,7 @@ dependencies = [ "prettyplease", "proc-macro2", "quote", - "syn", + "syn 2.0.117", "wit-bindgen-core", "wit-bindgen-rust", ] @@ -5668,6 +5873,20 @@ dependencies = [ "wasmparser", ] +[[package]] +name = "wmi" +version = "0.12.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "daffb44abb7d2e87a1233aa17fdbde0d55b890b32a23a1f908895b87fa6f1a00" +dependencies = [ + "chrono", + "futures", + "log", + "serde", + "thiserror 1.0.69", + "windows 0.48.0", +] + [[package]] name = "writeable" version = "0.6.2" @@ -5698,7 +5917,7 @@ dependencies = [ "nom 7.1.3", "oid-registry", "rusticata-macros", - "thiserror", + "thiserror 2.0.18", "time", ] @@ -5746,7 +5965,7 @@ checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -5767,7 +5986,7 @@ checksum = "0e8bc7269b54418e7aeeef514aa68f8690b8c0489a06b0136e5f57c4c5ccab89" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5787,7 +6006,7 @@ checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", "synstructure", ] @@ -5808,7 +6027,7 @@ checksum = "85a5b4158499876c763cb03bc4e49185d3cccbabb15b33c627f7884f43db852e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5841,7 +6060,7 @@ checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.117", ] [[package]] @@ -5866,7 +6085,7 @@ dependencies = [ "memchr", "pbkdf2", "sha1", - "thiserror", + "thiserror 2.0.18", "time", "xz2", "zeroize", diff --git a/engine/Cargo.toml b/engine/Cargo.toml index 4877a9a7a..e59fab540 100644 --- a/engine/Cargo.toml +++ b/engine/Cargo.toml @@ -134,6 +134,7 @@ nix = { version = "0.30.1", features = ["signal", "process"] } winapi = { version = "0.3.9", features = ["minwindef", "wincon", "winbase", "consoleapi"] } dirs = "6" hostname = "0.4" +machineid-rs = "1.2.4" sha2 = "0.10" iana-time-zone = "0.1" ring = "0.17.14" diff --git a/engine/Dockerfile b/engine/Dockerfile index 36815bc47..8d0a92eee 100644 --- a/engine/Dockerfile +++ b/engine/Dockerfile @@ -3,7 +3,7 @@ FROM gcr.io/distroless/cc-debian12:nonroot ARG TARGETARCH COPY iii-${TARGETARCH} /app/iii -ENV III_CONTAINER=docker +ENV III_EXECUTION_CONTEXT=docker ENV III_ENV=development EXPOSE 49134 3111 3112 9464 diff --git a/engine/Dockerfile.debug b/engine/Dockerfile.debug index 894ff5121..3d189dccc 100644 --- a/engine/Dockerfile.debug +++ b/engine/Dockerfile.debug @@ -22,6 +22,9 @@ WORKDIR /app COPY --from=builder --chown=iii:iii /build/target/release/iii /app/iii USER iii +ENV III_EXECUTION_CONTEXT=docker +ENV III_ENV=debug + HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \ CMD curl -sf http://localhost:3111/health || /app/iii --version diff --git a/engine/docker-compose.prod.yml b/engine/docker-compose.prod.yml index 46ff5c746..b19202874 100644 --- a/engine/docker-compose.prod.yml +++ b/engine/docker-compose.prod.yml @@ -19,7 +19,7 @@ services: volumes: - ./config.prod.yaml:/app/config.yaml:ro environment: - - III_HOST_USER_ID=${III_HOST_USER_ID:-} + - III_EXECUTION_CONTEXT=docker healthcheck: test: ["CMD-SHELL", "nc -z 127.0.0.1 3111 || exit 1"] interval: 10s diff --git a/engine/docker-compose.yml b/engine/docker-compose.yml index 8b79f426a..5f59c4cbd 100644 --- a/engine/docker-compose.yml +++ b/engine/docker-compose.yml @@ -12,7 +12,7 @@ services: - ./config.yaml:/app/config.yaml:ro environment: - RUST_LOG=info - - III_HOST_USER_ID=${III_HOST_USER_ID:-} + - III_EXECUTION_CONTEXT=docker depends_on: redis: condition: service_healthy diff --git a/engine/install.sh b/engine/install.sh index 5075ed916..ada9cd61f 100755 --- a/engine/install.sh +++ b/engine/install.sh @@ -4,24 +4,45 @@ set -eu REPO="${REPO:-iii-hq/iii}" BIN_NAME="${BIN_NAME:-iii}" -AMPLITUDE_ENDPOINT="https://api2.amplitude.com/2/httpapi" -AMPLITUDE_API_KEY="${III_INSTALL_AMPLITUDE_API_KEY:-a7182ac460dde671c8f2e1318b517228}" +telemetry_emitter="" +install_event_prefix="" +from_version="" +release_version="" + +# Returns a JSON-encoded string (with surrounding quotes). +# Uses jq when available; falls back to awk for escaping. +json_str() { + if command -v jq >/dev/null 2>&1; then + printf '%s' "$1" | jq -Rs '.' + return + fi + printf '%s' "$1" | awk ' + BEGIN { ORS=""; printf "\"" } + { + gsub(/\\/, "\\\\") + gsub(/"/, "\\\"") + gsub(/\t/, "\\t") + gsub(/\r/, "\\r") + if (NR > 1) printf "\\n" + printf "%s", $0 + } + END { printf "\"" } + ' +} err() { _stage="$1"; shift echo "error: $*" >&2 - if [ -n "${install_event_prefix:-}" ] && [ -n "${install_id:-}" ] && [ -n "${telemetry_id:-}" ]; then - _err_msg=$(echo "$*" | tr '"' "'") + if [ -n "${install_event_prefix:-}" ]; then if [ "$install_event_prefix" = "upgrade" ]; then - iii_send_event "upgrade_failed" \ - "\"install_id\":\"${install_id}\",\"from_version\":\"${from_version:-}\",\"install_method\":\"sh\",\"target_binary\":\"${BIN_NAME}\",\"error_stage\":\"${_stage}\",\"error_message\":\"${_err_msg}\"" \ - "$telemetry_id" "$install_id" + payload=$(printf '{"from_version":%s,"to_version":%s,"install_method":"sh","target_binary":%s,"error_stage":%s,"error_message":%s}' \ + "$(json_str "${from_version:-}")" "$(json_str "$release_version")" "$(json_str "$BIN_NAME")" "$(json_str "$_stage")" "$(json_str "$*")") + iii_emit_event "upgrade_failed" "$payload" else - iii_send_event "install_failed" \ - "\"install_id\":\"${install_id}\",\"install_method\":\"sh\",\"target_binary\":\"${BIN_NAME}\",\"error_stage\":\"${_stage}\",\"error_message\":\"${_err_msg}\"" \ - "$telemetry_id" "$install_id" + payload=$(printf '{"install_method":"sh","target_binary":%s,"error_stage":%s,"error_message":%s}' \ + "$(json_str "$BIN_NAME")" "$(json_str "$_stage")" "$(json_str "$*")") + iii_emit_event "install_failed" "$payload" fi - wait fi exit 1 } @@ -30,163 +51,17 @@ err() { # Telemetry helpers # --------------------------------------------------------------------------- -iii_telemetry_enabled() { - case "${III_TELEMETRY_ENABLED:-}" in - false|0) return 1 ;; - esac - for ci_var in CI GITHUB_ACTIONS GITLAB_CI CIRCLECI JENKINS_URL TRAVIS BUILDKITE TF_BUILD CODEBUILD_BUILD_ID BITBUCKET_BUILD_NUMBER DRONE TEAMCITY_VERSION; do - if [ -n "$(eval "echo \${${ci_var}:-}")" ]; then - return 1 - fi - done - return 0 -} - -iii_gen_uuid() { - if command -v uuidgen >/dev/null 2>&1; then - uuidgen | tr '[:upper:]' '[:lower:]' - elif [ -r /proc/sys/kernel/random/uuid ]; then - cat /proc/sys/kernel/random/uuid - else - od -x /dev/urandom 2>/dev/null | head -1 | awk '{OFS="-"; print $2$3,$4,$5,$6,$7$8$9}' | head -c 36 || echo "00000000-0000-0000-0000-000000000000" - fi -} - -iii_toml_path() { - echo "${HOME}/.iii/telemetry.toml" -} - -iii_read_toml_key() { - _toml_section="$1" - _toml_key="$2" - _toml_file=$(iii_toml_path) - if [ ! -f "$_toml_file" ]; then - echo "" - return - fi - _in_section=0 - while IFS= read -r _line || [ -n "$_line" ]; do - _line=$(printf '%s' "$_line" | sed 's/^[[:space:]]*//;s/[[:space:]]*$//') - case "$_line" in - "[$_toml_section]") _in_section=1 ;; - "["*"]") _in_section=0 ;; - *) - if [ "$_in_section" = "1" ]; then - case "$_line" in - "$_toml_key ="*|"$_toml_key= "*|"$_toml_key=") - _val=$(printf '%s' "$_line" | cut -d'=' -f2- | sed 's/^[[:space:]]*//;s/^"//;s/"$//') - echo "$_val" - return - ;; - esac - fi - ;; - esac - done < "$_toml_file" - echo "" -} - -iii_set_toml_key() { - _toml_section="$1" - _toml_key="$2" - _toml_value="$3" - _toml_file=$(iii_toml_path) - mkdir -p "$(dirname "$_toml_file")" - _tmp_file="${_toml_file}.tmp" - _written=0 - _in_target=0 - _key_written=0 - : > "$_tmp_file" - if [ -f "$_toml_file" ]; then - while IFS= read -r _line || [ -n "$_line" ]; do - _trimmed=$(printf '%s' "$_line" | sed 's/^[[:space:]]*//;s/[[:space:]]*$//') - case "$_trimmed" in - "[$_toml_section]") - printf '%s\n' "$_trimmed" >> "$_tmp_file" - _in_target=1 - ;; - "["*"]") - if [ "$_in_target" = "1" ] && [ "$_key_written" = "0" ]; then - printf '%s = "%s"\n' "$_toml_key" "$_toml_value" >> "$_tmp_file" - _key_written=1 - fi - _in_target=0 - printf '%s\n' "$_trimmed" >> "$_tmp_file" - ;; - "$_toml_key ="*|"$_toml_key= "*|"$_toml_key=") - if [ "$_in_target" = "1" ]; then - printf '%s = "%s"\n' "$_toml_key" "$_toml_value" >> "$_tmp_file" - _key_written=1 - else - printf '%s\n' "$_line" >> "$_tmp_file" - fi - ;; - "") - printf '\n' >> "$_tmp_file" - ;; - *) - printf '%s\n' "$_line" >> "$_tmp_file" - ;; - esac - done < "$_toml_file" - fi - if [ "$_key_written" = "0" ]; then - if [ "$_in_target" = "1" ]; then - printf '%s = "%s"\n' "$_toml_key" "$_toml_value" >> "$_tmp_file" - else - printf '\n[%s]\n%s = "%s"\n' "$_toml_section" "$_toml_key" "$_toml_value" >> "$_tmp_file" - fi - fi - mv "$_tmp_file" "$_toml_file" -} - -iii_get_or_create_telemetry_id() { - _existing_id=$(iii_read_toml_key "identity" "id") - if [ -n "$_existing_id" ]; then - echo "$_existing_id" - return - fi - - _legacy_path="${HOME}/.iii/telemetry_id" - if [ -f "$_legacy_path" ]; then - _legacy_id=$(cat "$_legacy_path" 2>/dev/null | tr -d '[:space:]') - if [ -n "$_legacy_id" ]; then - iii_set_toml_key "identity" "id" "$_legacy_id" - echo "$_legacy_id" - return - fi - fi - - mkdir -p "${HOME}/.iii" - _new_id="auto-$(iii_gen_uuid)" - iii_set_toml_key "identity" "id" "$_new_id" - echo "$_new_id" -} - -iii_send_event() { +iii_emit_event() { _event_type="$1" _event_props="$2" - _telemetry_id="$3" - _install_id="$4" - - if [ -z "$AMPLITUDE_API_KEY" ]; then - return 0 - fi - - if ! iii_telemetry_enabled; then + if [ -z "${telemetry_emitter:-}" ] || [ ! -x "$telemetry_emitter" ]; then return 0 fi - - _os=$(uname -s 2>/dev/null | tr '[:upper:]' '[:lower:]' || echo "unknown") - _arch=$(uname -m 2>/dev/null || echo "unknown") - _ts=$(date +%s 2>/dev/null || echo "0") - _ts_ms=$(( _ts * 1000 )) - - _payload="{\"api_key\":\"${AMPLITUDE_API_KEY}\",\"events\":[{\"device_id\":\"${_telemetry_id}\",\"user_id\":\"${_telemetry_id}\",\"event_type\":\"${_event_type}\",\"event_properties\":{${_event_props}},\"platform\":\"install-script\",\"os_name\":\"${_os}\",\"app_version\":\"script\",\"time\":${_ts_ms},\"insert_id\":\"$(iii_gen_uuid)\",\"ip\":\"\$remote\"}]}" - - curl -s -o /dev/null -X POST "$AMPLITUDE_ENDPOINT" \ - -H "Content-Type: application/json" \ - --data-raw "$_payload" & + "$telemetry_emitter" \ + --install-only-generate-ids \ + --install-event-type "$_event_type" \ + --install-event-properties "$_event_props" \ + >/dev/null 2>&1 || true } iii_detect_from_version() { @@ -200,20 +75,6 @@ iii_detect_from_version() { fi } -iii_export_host_user_id() { - _huid=$(iii_read_toml_key "identity" "id") - if [ -z "$_huid" ]; then - return 0 - fi - _export_line="export III_HOST_USER_ID=\"${_huid}\"" - for _profile in "${HOME}/.bashrc" "${HOME}/.zshrc" "${HOME}/.profile"; do - if [ -f "$_profile" ] && ! grep -qF "III_HOST_USER_ID" "$_profile" 2>/dev/null; then - printf '\n# iii host correlation\n%s\n' "$_export_line" >> "$_profile" - break - fi - done -} - # --- Argument parsing --- engine_version="${VERSION:-}" @@ -264,9 +125,6 @@ if ! command -v curl >/dev/null 2>&1; then err "dependency" "curl is required" fi -install_id=$(iii_gen_uuid) -telemetry_id=$(iii_get_or_create_telemetry_id) - if [ -n "${TARGET:-}" ]; then target="$TARGET" else @@ -335,6 +193,7 @@ if [ -n "$VERSION" ]; then echo "installing version: $VERSION" _ver="${VERSION#iii/}" _ver="${_ver#v}" + release_version="$_ver" _tag="iii/v${_ver}" api_url="https://api.github.com/repos/$REPO/releases/tags/${_tag}" json=$(github_api "$api_url" 2>/dev/null) || { @@ -409,6 +268,18 @@ else fi fi +if [ -z "$release_version" ]; then + if command -v jq >/dev/null 2>&1; then + release_version=$(printf '%s' "$json" | jq -r '.tag_name' | sed -E 's#^(iii/)?v##') + else + release_version=$(printf '%s' "$json" \ + | grep -oE '"tag_name"[[:space:]]*:[[:space:]]*"[^"]+"' \ + | head -n 1 \ + | sed -E 's/.*"([^"]+)".*/\1/' \ + | sed -E 's#^(iii/)?v##') + fi +fi + if command -v jq >/dev/null 2>&1; then asset_url=$(printf '%s' "$json" \ | jq -r --arg bn "$BIN_NAME" --arg target "$target" \ @@ -446,14 +317,12 @@ fi from_version=$(iii_detect_from_version "$bin_dir/$BIN_NAME") if [ -n "$from_version" ]; then install_event_prefix="upgrade" - iii_send_event "upgrade_started" \ - "\"install_id\":\"${install_id}\",\"from_version\":\"${from_version}\",\"install_method\":\"sh\",\"target_binary\":\"${BIN_NAME}\"" \ - "$telemetry_id" "$install_id" + # Use the existing binary for telemetry until the new one is extracted + if [ -x "$bin_dir/$BIN_NAME" ]; then + telemetry_emitter="$bin_dir/$BIN_NAME" + fi else install_event_prefix="install" - iii_send_event "install_started" \ - "\"install_id\":\"${install_id}\",\"install_method\":\"sh\",\"target_binary\":\"${BIN_NAME}\",\"os\":\"$(uname -s 2>/dev/null | tr '[:upper:]' '[:lower:]' || echo unknown)\",\"arch\":\"$(uname -m 2>/dev/null || echo unknown)\"" \ - "$telemetry_id" "$install_id" fi mkdir -p "$bin_dir" @@ -464,17 +333,20 @@ cleanup() { } trap cleanup EXIT INT TERM -curl -fsSL -L "$asset_url" -o "$tmpdir/$asset_name" +curl -fsSL -L "$asset_url" -o "$tmpdir/$asset_name" \ + || err "download" "failed to download $asset_url" case "$asset_name" in *.tar.gz|*.tgz) - tar -xzf "$tmpdir/$asset_name" -C "$tmpdir" + tar -xzf "$tmpdir/$asset_name" -C "$tmpdir" \ + || err "extract" "failed to extract $asset_name" ;; *.zip) if ! command -v unzip >/dev/null 2>&1; then err "extract" "unzip is required to extract $asset_name" fi - unzip -q "$tmpdir/$asset_name" -d "$tmpdir" + unzip -q "$tmpdir/$asset_name" -d "$tmpdir" \ + || err "extract" "failed to extract $asset_name" ;; *) ;; @@ -490,12 +362,24 @@ if [ -z "${bin_file:-}" ] || [ ! -f "$bin_file" ]; then err "binary_lookup" "binary not found in downloaded asset" fi +telemetry_emitter="$bin_file" +if [ "$install_event_prefix" = "upgrade" ]; then + payload=$(printf '{"from_version":%s,"to_version":%s,"install_method":"sh","target_binary":%s}' \ + "$(json_str "$from_version")" "$(json_str "$release_version")" "$(json_str "$BIN_NAME")") + iii_emit_event "upgrade_started" "$payload" +else + payload=$(printf '{"install_method":"sh","target_binary":%s,"os":%s,"arch":%s}' \ + "$(json_str "$BIN_NAME")" "$(json_str "$(uname -s 2>/dev/null | tr '[:upper:]' '[:lower:]' || echo unknown)")" "$(json_str "$(uname -m 2>/dev/null || echo unknown)")") + iii_emit_event "install_started" "$payload" +fi + installed_version="" if command -v install >/dev/null 2>&1; then - install -m 755 "$bin_file" "$bin_dir/$BIN_NAME" + install -m 755 "$bin_file" "$bin_dir/$BIN_NAME" \ + || err "install" "failed to install binary to $bin_dir/$BIN_NAME" else - cp "$bin_file" "$bin_dir/$BIN_NAME" - chmod 755 "$bin_dir/$BIN_NAME" + { cp "$bin_file" "$bin_dir/$BIN_NAME" && chmod 755 "$bin_dir/$BIN_NAME"; } \ + || err "install" "failed to copy binary to $bin_dir/$BIN_NAME" fi installed_version=$("$bin_dir/$BIN_NAME" --version 2>/dev/null | awk '{print $NF}' || echo "") @@ -503,16 +387,18 @@ installed_version=$("$bin_dir/$BIN_NAME" --version 2>/dev/null | awk '{print $NF printf 'installed %s to %s\n' "$BIN_NAME" "$bin_dir/$BIN_NAME" if [ "$install_event_prefix" = "upgrade" ]; then - iii_send_event "upgrade_succeeded" \ - "\"install_id\":\"${install_id}\",\"from_version\":\"${from_version}\",\"to_version\":\"${installed_version}\",\"install_method\":\"sh\",\"target_binary\":\"${BIN_NAME}\"" \ - "$telemetry_id" "$install_id" + payload=$(printf '{"from_version":%s,"to_version":%s,"install_method":"sh","target_binary":%s}' \ + "$(json_str "$from_version")" "$(json_str "$installed_version")" "$(json_str "$BIN_NAME")") + iii_emit_event "upgrade_succeeded" "$payload" else - iii_send_event "install_succeeded" \ - "\"install_id\":\"${install_id}\",\"installed_version\":\"${installed_version}\",\"install_method\":\"sh\",\"target_binary\":\"${BIN_NAME}\"" \ - "$telemetry_id" "$install_id" + payload=$(printf '{"installed_version":%s,"install_method":"sh","target_binary":%s}' \ + "$(json_str "$installed_version")" "$(json_str "$BIN_NAME")") + iii_emit_event "install_succeeded" "$payload" fi -iii_export_host_user_id +# Best-effort: have the binary initialize its telemetry IDs. +# Older binaries won't have this flag — silently skip. +"$bin_dir/$BIN_NAME" --install-only-generate-ids >/dev/null 2>&1 || true case ":$PATH:" in *":$bin_dir:"*) diff --git a/engine/src/cli/registry.rs b/engine/src/cli/registry.rs index 1810f5b3b..7898f8e5d 100644 --- a/engine/src/cli/registry.rs +++ b/engine/src/cli/registry.rs @@ -120,7 +120,7 @@ pub static REGISTRY: &[BinarySpec] = &[ cli_command: "cloud", binary_subcommand: None, }], - tag_prefix: None, + tag_prefix: Some("cli"), }, ]; diff --git a/engine/src/cli/telemetry.rs b/engine/src/cli/telemetry.rs index 14e5609a2..3744bd67e 100644 --- a/engine/src/cli/telemetry.rs +++ b/engine/src/cli/telemetry.rs @@ -4,63 +4,11 @@ // This software is patent protected. We welcome discussions - reach out at support@motia.dev // See LICENSE and PATENTS files for details. -use std::collections::BTreeMap; - use serde::Serialize; -use sha2::{Digest, Sha256}; - -const AMPLITUDE_ENDPOINT: &str = "https://api2.amplitude.com/2/httpapi"; - -// --------------------------------------------------------------------------- -// ~/.iii/telemetry.toml helpers (shared format with engine) -// --------------------------------------------------------------------------- - -type TomlSections = BTreeMap>; - -fn telemetry_toml_path() -> std::path::PathBuf { - dirs::home_dir() - .unwrap_or_else(std::env::temp_dir) - .join(".iii") - .join("telemetry.toml") -} - -fn read_toml_key(section: &str, key: &str) -> Option { - let contents = std::fs::read_to_string(telemetry_toml_path()).ok()?; - let sections: TomlSections = toml::from_str(&contents).ok()?; - sections - .get(section)? - .get(key) - .filter(|v| !v.is_empty()) - .cloned() -} -fn set_toml_key(section: &str, key: &str, value: &str) { - let path = telemetry_toml_path(); - let contents = std::fs::read_to_string(&path).unwrap_or_default(); - let mut sections: TomlSections = toml::from_str(&contents).unwrap_or_default(); - sections - .entry(section.to_string()) - .or_default() - .insert(key.to_string(), value.to_string()); - let serialized = match toml::to_string(§ions) { - Ok(s) => s, - Err(_) => return, - }; - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent).ok(); - } - let tmp = path.with_extension("tmp"); - if std::fs::write(&tmp, &serialized).is_ok() { - #[cfg(unix)] - { - use std::os::unix::fs::PermissionsExt; - let perms = std::fs::Permissions::from_mode(0o600); - std::fs::set_permissions(&tmp, perms).ok(); - } - std::fs::rename(&tmp, &path).ok(); - } -} +use iii::modules::telemetry::environment; +const AMPLITUDE_ENDPOINT: &str = "https://api2.amplitude.com/2/httpapi"; const API_KEY: &str = "a7182ac460dde671c8f2e1318b517228"; #[derive(Serialize)] @@ -87,77 +35,6 @@ struct AmplitudePayload<'a> { events: Vec, } -fn detect_machine_id() -> String { - let hostname = std::env::var("HOSTNAME").unwrap_or_else(|_| "unknown".to_string()); - let mut hasher = Sha256::new(); - hasher.update(hostname.as_bytes()); - let result = hasher.finalize(); - result[..8].iter().map(|b| format!("{:02x}", b)).collect() -} - -fn detect_is_container() -> bool { - if std::env::var("III_CONTAINER").is_ok() { - return true; - } - if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() { - return true; - } - std::path::Path::new("/.dockerenv").exists() -} - -fn detect_install_method() -> &'static str { - if let Ok(exe) = std::env::current_exe() { - let path = exe.to_string_lossy(); - if path.contains("homebrew") || path.contains("Cellar") || path.contains("linuxbrew") { - return "brew"; - } - if path.contains("chocolatey") || path.contains("choco") { - return "chocolatey"; - } - if path.contains(".local/bin") { - return "sh"; - } - } - "manual" -} - -fn build_user_properties() -> serde_json::Value { - serde_json::json!({ - "environment.os": std::env::consts::OS, - "environment.arch": std::env::consts::ARCH, - "environment.cpu_cores": std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1), - "environment.timezone": std::env::var("TZ").unwrap_or_else(|_| "Unknown".to_string()), - "environment.machine_id": detect_machine_id(), - "environment.is_container": detect_is_container(), - "env": std::env::var("III_ENV").unwrap_or_else(|_| "unknown".to_string()), - "install_method": detect_install_method(), - "cli_version": env!("CARGO_PKG_VERSION"), - "host_user_id": std::env::var("III_HOST_USER_ID").ok(), - }) -} - -fn get_or_create_telemetry_id() -> String { - if let Some(id) = read_toml_key("identity", "id") { - return id; - } - - let legacy_path = dirs::home_dir() - .unwrap_or_else(std::env::temp_dir) - .join(".iii") - .join("telemetry_id"); - if let Ok(id) = std::fs::read_to_string(&legacy_path) { - let id = id.trim().to_string(); - if !id.is_empty() { - set_toml_key("identity", "id", &id); - return id; - } - } - - let id = format!("auto-{}", uuid::Uuid::new_v4()); - set_toml_key("identity", "id", &id); - id -} - fn is_telemetry_disabled() -> bool { if let Ok(val) = std::env::var("III_TELEMETRY_ENABLED") && (val == "false" || val == "0") @@ -165,44 +42,44 @@ fn is_telemetry_disabled() -> bool { return true; } - if std::env::var("III_TELEMETRY_DEV").ok().as_deref() == Some("true") { - return true; - } - - const CI_VARS: &[&str] = &[ - "CI", - "GITHUB_ACTIONS", - "GITLAB_CI", - "CIRCLECI", - "JENKINS_URL", - "TRAVIS", - "BUILDKITE", - "TF_BUILD", - "CODEBUILD_BUILD_ID", - "BITBUCKET_BUILD_NUMBER", - "DRONE", - "TEAMCITY_VERSION", - ]; - if CI_VARS.iter().any(|v| std::env::var(v).is_ok()) { - return true; - } + environment::is_ci_environment() || environment::is_dev_optout() +} - false +fn build_user_properties(install_method_override: Option<&str>) -> serde_json::Value { + let env_info = environment::EnvironmentInfo::collect(); + let install_method = match install_method_override { + Some(m) => m, + None => environment::detect_install_method(), + }; + serde_json::json!({ + "environment.os": env_info.os, + "environment.arch": env_info.arch, + "environment.cpu_cores": env_info.cpu_cores, + "environment.timezone": env_info.timezone, + "environment.machine_id": env_info.machine_id, + "iii_execution_context": env_info.iii_execution_context, + "env": environment::detect_env(), + "install_method": install_method, + "cli_version": env!("CARGO_PKG_VERSION"), + }) } -fn build_event(event_type: &str, properties: serde_json::Value) -> Option { +fn build_event( + event_type: &str, + properties: serde_json::Value, + install_method_override: Option<&str>, +) -> Option { if is_telemetry_disabled() { return None; } - let telemetry_id = get_or_create_telemetry_id(); + let device_id = environment::get_or_create_device_id(); Some(AmplitudeEvent { - device_id: telemetry_id.clone(), - // user_id: currently telemetry_id, will become iii cloud user ID when accounts ship - user_id: Some(telemetry_id), + device_id, + user_id: None, event_type: event_type.to_string(), event_properties: properties, - user_properties: Some(build_user_properties()), + user_properties: Some(build_user_properties(install_method_override)), platform: "iii".to_string(), os_name: std::env::consts::OS.to_string(), app_version: env!("CARGO_PKG_VERSION").to_string(), @@ -212,30 +89,46 @@ fn build_event(event_type: &str, properties: serde_json::Value) -> Option, + + /// Install lifecycle event properties as JSON. + #[arg( + long, + hide = true, + global = true, + requires = "install_only_generate_ids" + )] + install_event_properties: Option, } #[derive(Subcommand, Debug)] @@ -172,6 +194,23 @@ async fn main() -> anyhow::Result<()> { return Ok(()); } + if cli_args.install_only_generate_ids { + let _ = iii::modules::telemetry::environment::get_or_create_device_id(); + let _ = iii::modules::telemetry::environment::resolve_execution_context(); + + if let Some(event_type) = cli_args.install_event_type.as_deref() { + let properties = if let Some(raw) = cli_args.install_event_properties.as_deref() { + serde_json::from_str(raw).map_err(|e| { + anyhow::anyhow!("invalid --install-event-properties JSON '{}': {}", raw, e) + })? + } else { + serde_json::json!({}) + }; + cli::telemetry::send_install_lifecycle_event(event_type, properties).await; + } + return Ok(()); + } + match &cli_args.command { Some(Commands::Trigger(args)) => cli_trigger::run_trigger(args).await, Some(Commands::Console { args }) => { @@ -499,6 +538,31 @@ mod tests { } } + #[test] + fn hidden_install_only_generate_ids_parses() { + let cli = Cli::try_parse_from(["iii", "--install-only-generate-ids"]) + .expect("should parse hidden install-only flag"); + assert!(cli.install_only_generate_ids); + } + + #[test] + fn hidden_install_event_fields_parse() { + let cli = Cli::try_parse_from([ + "iii", + "--install-only-generate-ids", + "--install-event-type", + "install_succeeded", + "--install-event-properties", + r#"{"target_binary":"iii"}"#, + ]) + .expect("should parse hidden install event flags"); + assert_eq!(cli.install_event_type.as_deref(), Some("install_succeeded")); + assert_eq!( + cli.install_event_properties.as_deref(), + Some(r#"{"target_binary":"iii"}"#) + ); + } + #[test] fn update_iii_cli_target_is_accepted() { // Users with old iii-cli may type "iii update iii-cli" — this must diff --git a/engine/src/modules/telemetry/environment.rs b/engine/src/modules/telemetry/environment.rs index 7290c9bf2..d38384f22 100644 --- a/engine/src/modules/telemetry/environment.rs +++ b/engine/src/modules/telemetry/environment.rs @@ -4,11 +4,36 @@ // This software is patent protected. We welcome discussions - reach out at support@motia.dev // See LICENSE and PATENTS files for details. -use std::collections::BTreeMap; - +use machineid_rs::{Encryption, HWIDComponent, IdBuilder}; use sha2::{Digest, Sha256}; -type TomlSections = BTreeMap>; +const TELEMETRY_SCHEMA_VERSION: u8 = 2; +const DEVICE_ID_SALT: &str = "iii-machine-id"; +const EXECUTION_CONTEXT_ENV: &str = "III_EXECUTION_CONTEXT"; +const EXECUTION_CONTEXT_YAML_DEFAULT: &str = "${III_EXECUTION_CONTEXT:user}"; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] +struct TelemetryYaml { + version: Option, + #[serde(default)] + identity: IdentitySection, + #[serde(default)] + state: StateSection, + #[serde(default)] + iii_execution_context: Option, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] +struct IdentitySection { + #[serde(default)] + device_id: Option, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, Default)] +struct StateSection { + #[serde(default)] + first_run_sent: Option, +} fn iii_dir() -> std::path::PathBuf { dirs::home_dir() @@ -17,7 +42,7 @@ fn iii_dir() -> std::path::PathBuf { } pub fn telemetry_config_path() -> std::path::PathBuf { - iii_dir().join("telemetry.toml") + iii_dir().join("telemetry.yaml") } fn write_atomic(path: &std::path::Path, content: &str) { @@ -36,96 +61,337 @@ fn write_atomic(path: &std::path::Path, content: &str) { } } +fn normalize_execution_context(value: &str) -> String { + match value.trim().to_lowercase().as_str() { + "kubernetes" | "k8s" => "kubernetes".to_string(), + "docker" => "docker".to_string(), + "container" => "container".to_string(), + "ci" | "cicd" => "ci".to_string(), + "user" => "user".to_string(), + "" => "user".to_string(), + _ => "unknown".to_string(), + } +} + +fn parse_yaml_env_syntax(raw: &str) -> Option<(String, Option)> { + if !(raw.starts_with("${") && raw.ends_with('}')) { + return None; + } + + let inner = &raw[2..raw.len() - 1]; + let mut parts = inner.splitn(2, ':'); + let var = parts.next()?.trim(); + if var.is_empty() { + return None; + } + let default = parts.next().map(|s| s.to_string()); + Some((var.to_string(), default)) +} + +fn expand_yaml_env_syntax(raw: &str) -> String { + if let Some((var, default)) = parse_yaml_env_syntax(raw) { + match std::env::var(&var) { + Ok(val) if !val.is_empty() => val, + _ => default.unwrap_or_default(), + } + } else { + raw.to_string() + } +} + +fn machine_id_from_machineid_rs() -> Option { + let mut builder = IdBuilder::new(Encryption::SHA256); + builder.add_component(HWIDComponent::SystemID); + builder + .build(DEVICE_ID_SALT) + .ok() + .filter(|id| !id.trim().is_empty()) +} + +fn is_container_environment() -> bool { + if detect_container_runtime() != "none" { + return true; + } + let ctx = std::env::var(EXECUTION_CONTEXT_ENV).unwrap_or_default(); + matches!( + normalize_execution_context(&ctx).as_str(), + "docker" | "container" | "kubernetes" + ) +} + +fn container_hostname() -> String { + std::env::var("HOSTNAME") + .ok() + .filter(|h| !h.is_empty()) + .or_else(|| { + std::fs::read_to_string("/etc/hostname") + .ok() + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + }) + .unwrap_or_else(|| "unknown".to_string()) +} + +fn find_project_root() -> Option { + std::env::var("III_PROJECT_ROOT") + .ok() + .filter(|s| !s.is_empty()) + .map(std::path::PathBuf::from) + .or_else(|| { + let mut dir = std::env::current_dir().ok()?; + loop { + if dir.join(".iii").join("project.ini").exists() { + return Some(dir); + } + if !dir.pop() { + break; + } + } + None + }) +} + +pub fn find_project_ini_device_id() -> Option { + let root = find_project_root()?; + let contents = std::fs::read_to_string(root.join(".iii").join("project.ini")).ok()?; + contents.lines().find_map(|line| { + line.trim() + .strip_prefix("device_id=") + .map(|v| v.trim().to_string()) + .filter(|v| !v.is_empty()) + }) +} + +fn write_device_id_to_project_ini(device_id: &str) { + let Some(root) = find_project_root() else { + return; + }; + let path = root.join(".iii").join("project.ini"); + let contents = match std::fs::read_to_string(&path) { + Ok(c) => c, + Err(_) => return, + }; + + // Already has a device_id line — don't overwrite + if contents.lines().any(|l| l.trim().starts_with("device_id=")) { + return; + } + + let new_contents = format!("{}device_id={}\n", contents, device_id); + write_atomic(&path, &new_contents); +} + +fn salted_sha256(input: &str) -> String { + let mut hasher = Sha256::new(); + hasher.update(input); + hasher.update(DEVICE_ID_SALT); + format!("{:x}", hasher.finalize()) +} + +fn generate_container_device_id() -> String { + let hostname = container_hostname(); + + if let Some(host_device_id) = find_project_ini_device_id() { + return salted_sha256(&format!("{host_device_id}-{hostname}")); + } + + let base = match std::env::var("III_HOST_USER_ID") + .ok() + .filter(|s| !s.is_empty()) + { + Some(host_id) => format!("{host_id}-{hostname}"), + None => hostname, + }; + + format!("docker-{}", salted_sha256(&base)) +} + +fn generate_new_device_id() -> String { + if is_container_environment() { + return generate_container_device_id(); + } + if let Some(machine_id) = machine_id_from_machineid_rs() { + return machine_id; + } + format!("fallback-{}", uuid::Uuid::new_v4()) +} + +fn build_fresh_v2_yaml() -> TelemetryYaml { + TelemetryYaml { + version: Some(TELEMETRY_SCHEMA_VERSION), + identity: IdentitySection { + device_id: Some(generate_new_device_id()), + }, + state: StateSection::default(), + iii_execution_context: Some(EXECUTION_CONTEXT_YAML_DEFAULT.to_string()), + } +} + +fn read_telemetry_yaml(path: &std::path::Path) -> Option { + let contents = std::fs::read_to_string(path).ok()?; + serde_yaml::from_str(&contents).ok() +} + +fn write_telemetry_yaml(state: &TelemetryYaml) { + if let Ok(serialized) = serde_yaml::to_string(state) { + write_atomic(&telemetry_config_path(), &serialized); + } +} + +fn load_or_migrate_v2_state() -> TelemetryYaml { + let path = telemetry_config_path(); + let parsed = read_telemetry_yaml(&path); + + if let Some(mut state) = parsed + && state.version == Some(TELEMETRY_SCHEMA_VERSION) + { + let mut changed = false; + if state.identity.device_id.is_none() { + state.identity.device_id = Some(generate_new_device_id()); + changed = true; + } + if state.iii_execution_context.is_none() { + state.iii_execution_context = Some(EXECUTION_CONTEXT_YAML_DEFAULT.to_string()); + changed = true; + } + if changed { + write_telemetry_yaml(&state); + } + return state; + } + + let fresh = build_fresh_v2_yaml(); + write_telemetry_yaml(&fresh); + fresh +} + +pub fn get_or_create_device_id() -> String { + let mut state = load_or_migrate_v2_state(); + if let Some(existing) = state.identity.device_id.clone() + && !existing.is_empty() + { + if !is_container_environment() { + write_device_id_to_project_ini(&existing); + } + return existing; + } + + let id = generate_new_device_id(); + state.identity.device_id = Some(id.clone()); + write_telemetry_yaml(&state); + if !is_container_environment() { + write_device_id_to_project_ini(&id); + } + id +} + +pub fn resolve_execution_context() -> String { + if is_ci_environment() { + return "ci".to_string(); + } + + if let Ok(env_ctx) = std::env::var(EXECUTION_CONTEXT_ENV) + && !env_ctx.is_empty() + { + return normalize_execution_context(&env_ctx); + } + + let runtime = detect_container_runtime(); + if runtime != "none" { + return runtime; + } + + let mut state = load_or_migrate_v2_state(); + if state.iii_execution_context.is_none() { + state.iii_execution_context = Some(EXECUTION_CONTEXT_YAML_DEFAULT.to_string()); + write_telemetry_yaml(&state); + } + + let raw = state + .iii_execution_context + .clone() + .unwrap_or_else(|| EXECUTION_CONTEXT_YAML_DEFAULT.to_string()); + normalize_execution_context(&expand_yaml_env_syntax(&raw)) +} + pub fn read_config_key(section: &str, key: &str) -> Option { - let contents = std::fs::read_to_string(telemetry_config_path()).ok()?; - let sections: TomlSections = toml::from_str(&contents).ok()?; - sections - .get(section)? - .get(key) - .filter(|v| !v.is_empty()) - .cloned() + let state = load_or_migrate_v2_state(); + match (section, key) { + ("identity", "device_id") | ("identity", "id") => { + state.identity.device_id.filter(|v| !v.is_empty()) + } + ("state", "first_run_sent") => state.state.first_run_sent.map(|v| v.to_string()), + ("telemetry", "iii_execution_context") => state.iii_execution_context, + _ => None, + } } pub fn set_config_key(section: &str, key: &str, value: &str) { - let path = telemetry_config_path(); - let contents = std::fs::read_to_string(&path).unwrap_or_default(); - let mut sections: TomlSections = toml::from_str(&contents).unwrap_or_default(); - sections - .entry(section.to_string()) - .or_default() - .insert(key.to_string(), value.to_string()); - if let Ok(serialized) = toml::to_string(§ions) { - write_atomic(&path, &serialized); + let mut state = load_or_migrate_v2_state(); + match (section, key) { + ("identity", "device_id") | ("identity", "id") => { + state.identity.device_id = Some(value.to_string()); + } + ("state", "first_run_sent") => { + state.state.first_run_sent = Some(value.eq_ignore_ascii_case("true") || value == "1"); + } + ("telemetry", "iii_execution_context") => { + state.iii_execution_context = Some(value.to_string()); + } + _ => {} } + state.version = Some(TELEMETRY_SCHEMA_VERSION); + write_telemetry_yaml(&state); } #[derive(Debug, Clone, serde::Serialize)] pub struct EnvironmentInfo { pub machine_id: String, - pub is_container: bool, - pub container_runtime: String, + pub iii_execution_context: String, pub timezone: String, pub cpu_cores: usize, pub os: String, pub arch: String, + pub host_user_id: Option, } impl EnvironmentInfo { pub fn collect() -> Self { - let container_runtime = detect_container_runtime(); - let is_container = container_runtime != "none"; Self { - machine_id: hashed_hostname(), - is_container, - container_runtime, + machine_id: get_or_create_device_id(), + iii_execution_context: resolve_execution_context(), timezone: detect_timezone(), cpu_cores: std::thread::available_parallelism() .map(|p| p.get()) .unwrap_or(1), os: std::env::consts::OS.to_string(), arch: std::env::consts::ARCH.to_string(), + host_user_id: std::env::var("III_HOST_USER_ID") + .ok() + .filter(|s| !s.is_empty()) + .or_else(|| find_project_ini_device_id()), } } pub fn to_json(&self) -> serde_json::Value { - serde_json::json!({ + let mut obj = serde_json::json!({ "machine_id": self.machine_id, - "is_container": self.is_container, - "container_runtime": self.container_runtime, + "iii_execution_context": self.iii_execution_context, "timezone": self.timezone, "cpu_cores": self.cpu_cores, "os": self.os, "arch": self.arch, - }) + }); + if let Some(ref id) = self.host_user_id { + obj["host_user_id"] = serde_json::json!(id); + } + obj } } -fn hashed_hostname() -> String { - let raw = hostname::get() - .ok() - .and_then(|h| h.into_string().ok()) - .unwrap_or_else(|| "unknown".to_string()); - - let mut hasher = Sha256::new(); - hasher.update(raw.as_bytes()); - let result = hasher.finalize(); - hex::encode(&result[..16]) -} - -/// Detect container runtime. Returns "docker", "kubernetes", or "none". -/// Priority: III_CONTAINER env var (authoritative) > KUBERNETES_SERVICE_HOST > -/// /.dockerenv / cgroup heuristics > "none". +/// Detect whether the process runs inside a container. +/// Returns `"kubernetes"`, `"docker"`, `"container"`, or `"none"`. pub fn detect_container_runtime() -> String { - if let Ok(val) = std::env::var("III_CONTAINER") { - let lower = val.to_lowercase(); - if lower == "docker" { - return "docker".to_string(); - } - if lower == "kubernetes" || lower == "k8s" { - return "kubernetes".to_string(); - } - } - if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() { return "kubernetes".to_string(); } @@ -142,7 +408,7 @@ pub fn detect_container_runtime() -> String { return "kubernetes".to_string(); } if lower.contains("docker") || lower.contains("containerd") { - return "docker".to_string(); + return "container".to_string(); } } } @@ -184,10 +450,6 @@ pub fn is_dev_optout() -> bool { return true; } - if read_config_key("preferences", "dev_optout").as_deref() == Some("true") { - return true; - } - let base_dir = dirs::home_dir().unwrap_or_else(std::env::temp_dir); base_dir.join(".iii").join("telemetry_dev_optout").exists() } @@ -239,13 +501,6 @@ pub fn detect_env() -> String { .unwrap_or_else(|| "unknown".to_string()) } -/// Read the host user ID from `III_HOST_USER_ID` (Docker correlation). -pub fn detect_host_user_id() -> Option { - std::env::var("III_HOST_USER_ID") - .ok() - .filter(|s| !s.is_empty()) -} - #[cfg(test)] mod tests { use super::*; @@ -253,237 +508,196 @@ mod tests { use std::env; // ========================================================================= - // telemetry.toml helpers + // telemetry.yaml + migration // ========================================================================= #[test] - fn test_toml_roundtrip() { - let mut sections: TomlSections = BTreeMap::new(); - sections - .entry("identity".to_string()) - .or_default() - .insert("id".to_string(), "abc-123".to_string()); - sections - .entry("state".to_string()) - .or_default() - .insert("first_run_sent".to_string(), "true".to_string()); - - let serialized = toml::to_string(§ions).unwrap(); - let parsed: TomlSections = toml::from_str(&serialized).unwrap(); - assert_eq!(parsed["identity"]["id"], "abc-123"); - assert_eq!(parsed["state"]["first_run_sent"], "true"); + #[serial] + fn test_telemetry_config_path_is_yaml() { + assert!( + telemetry_config_path() + .to_string_lossy() + .ends_with("telemetry.yaml") + ); } #[test] - fn test_set_and_read_config_key_new_file() { + #[serial] + fn test_load_or_migrate_writes_v2_yaml_with_device_id() { let dir = tempfile::tempdir().unwrap(); - let toml_path = dir.path().join(".iii").join("telemetry.toml"); - - let key = read_key_from(&toml_path, "identity", "id"); - assert!(key.is_none()); - - write_key_to(&toml_path, "identity", "id", "test-uuid"); - let key = read_key_from(&toml_path, "identity", "id"); - assert_eq!(key.as_deref(), Some("test-uuid")); + unsafe { + env::set_var("HOME", dir.path()); + } + let state = load_or_migrate_v2_state(); + assert_eq!(state.version, Some(2)); + assert!(state.identity.device_id.is_some()); + assert!(telemetry_config_path().exists()); + unsafe { + env::remove_var("HOME"); + } } #[test] - fn test_set_config_key_preserves_other_sections() { + #[serial] + fn test_v1_yaml_resets_to_fresh_v2_state() { let dir = tempfile::tempdir().unwrap(); - let toml_path = dir.path().join(".iii").join("telemetry.toml"); - - write_key_to(&toml_path, "identity", "id", "my-id"); - write_key_to(&toml_path, "state", "first_run_sent", "true"); + unsafe { + env::set_var("HOME", dir.path()); + } + let path = telemetry_config_path(); + std::fs::create_dir_all(path.parent().unwrap()).unwrap(); + std::fs::write( + &path, + r#"version: 1 +identity: + device_id: "legacy-id" +state: + first_run_sent: true +"#, + ) + .unwrap(); - let id = read_key_from(&toml_path, "identity", "id"); - let state = read_key_from(&toml_path, "state", "first_run_sent"); - assert_eq!(id.as_deref(), Some("my-id")); - assert_eq!(state.as_deref(), Some("true")); + let state = load_or_migrate_v2_state(); + assert_eq!(state.version, Some(2)); + assert_ne!(state.identity.device_id.as_deref(), Some("legacy-id")); + assert_eq!(state.state.first_run_sent, None); + unsafe { + env::remove_var("HOME"); + } } #[test] - fn test_set_config_key_updates_existing_key() { + #[serial] + fn test_set_and_read_config_key_for_state() { let dir = tempfile::tempdir().unwrap(); - let toml_path = dir.path().join(".iii").join("telemetry.toml"); - - write_key_to(&toml_path, "identity", "id", "old-id"); - write_key_to(&toml_path, "identity", "id", "new-id"); - - let id = read_key_from(&toml_path, "identity", "id"); - assert_eq!(id.as_deref(), Some("new-id")); - } - - fn read_key_from(path: &std::path::Path, section: &str, key: &str) -> Option { - let contents = std::fs::read_to_string(path).ok()?; - let sections: TomlSections = toml::from_str(&contents).ok()?; - sections - .get(section)? - .get(key) - .filter(|v| !v.is_empty()) - .cloned() - } - - fn write_key_to(path: &std::path::Path, section: &str, key: &str, value: &str) { - let contents = std::fs::read_to_string(path).unwrap_or_default(); - let mut sections: TomlSections = toml::from_str(&contents).unwrap_or_default(); - sections - .entry(section.to_string()) - .or_default() - .insert(key.to_string(), value.to_string()); - let serialized = toml::to_string(§ions).unwrap(); - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent).ok(); + unsafe { + env::set_var("HOME", dir.path()); } - std::fs::write(path, serialized).ok(); - } - - // ========================================================================= - // EnvironmentInfo - // ========================================================================= - - #[test] - fn test_environment_info_collect_returns_valid_fields() { - let info = EnvironmentInfo::collect(); - assert!( - !info.machine_id.is_empty(), - "machine_id should not be empty" - ); - assert!(info.cpu_cores >= 1, "cpu_cores should be at least 1"); - assert!(!info.os.is_empty(), "os should not be empty"); - assert!(!info.arch.is_empty(), "arch should not be empty"); - assert!(!info.timezone.is_empty(), "timezone should not be empty"); - assert!( - !info.container_runtime.is_empty(), - "container_runtime should not be empty" + set_config_key("state", "first_run_sent", "true"); + assert_eq!( + read_config_key("state", "first_run_sent").as_deref(), + Some("true") ); + unsafe { + env::remove_var("HOME"); + } } #[test] - fn test_environment_info_os_and_arch_match_consts() { - let info = EnvironmentInfo::collect(); - assert_eq!(info.os, std::env::consts::OS); - assert_eq!(info.arch, std::env::consts::ARCH); - } - - #[test] - fn test_environment_info_to_json_has_all_keys() { - let info = EnvironmentInfo::collect(); - let json = info.to_json(); - - assert!(json.get("machine_id").is_some()); - assert!(json.get("is_container").is_some()); - assert!(json.get("container_runtime").is_some()); - assert!(json.get("timezone").is_some()); - assert!(json.get("cpu_cores").is_some()); - assert!(json.get("os").is_some()); - assert!(json.get("arch").is_some()); - } - - #[test] - fn test_environment_info_to_json_types() { - let info = EnvironmentInfo::collect(); - let json = info.to_json(); - - assert!(json["machine_id"].is_string()); - assert!(json["is_container"].is_boolean()); - assert!(json["container_runtime"].is_string()); - assert!(json["timezone"].is_string()); - assert!(json["cpu_cores"].is_number()); - assert!(json["os"].is_string()); - assert!(json["arch"].is_string()); + #[serial] + fn test_get_or_create_device_id_is_stable() { + let dir = tempfile::tempdir().unwrap(); + unsafe { + env::set_var("HOME", dir.path()); + } + let d1 = get_or_create_device_id(); + let d2 = get_or_create_device_id(); + assert!(!d1.is_empty()); + assert_eq!(d1, d2); + unsafe { + env::remove_var("HOME"); + } } #[test] - fn test_environment_info_clone() { - let info = EnvironmentInfo::collect(); - let cloned = info.clone(); - assert_eq!(info.machine_id, cloned.machine_id); - assert_eq!(info.os, cloned.os); - assert_eq!(info.arch, cloned.arch); - assert_eq!(info.cpu_cores, cloned.cpu_cores); - assert_eq!(info.is_container, cloned.is_container); - assert_eq!(info.container_runtime, cloned.container_runtime); - assert_eq!(info.timezone, cloned.timezone); - } + #[serial] + fn test_resolve_execution_context_prefers_explicit_env() { + const CI_VARS: &[&str] = &[ + "CI", + "GITHUB_ACTIONS", + "GITLAB_CI", + "CIRCLECI", + "JENKINS_URL", + "TRAVIS", + "BUILDKITE", + "TF_BUILD", + "CODEBUILD_BUILD_ID", + "BITBUCKET_BUILD_NUMBER", + "DRONE", + "TEAMCITY_VERSION", + ]; - #[test] - fn test_environment_info_debug_format() { - let info = EnvironmentInfo::collect(); - let debug_str = format!("{:?}", info); - assert!(debug_str.contains("EnvironmentInfo")); - assert!(debug_str.contains("machine_id")); - assert!(debug_str.contains("os")); - } + let saved: Vec<(&str, Option)> = + CI_VARS.iter().map(|v| (*v, env::var(v).ok())).collect(); - #[test] - fn test_is_container_consistent_with_runtime() { - let info = EnvironmentInfo::collect(); - if info.container_runtime == "none" { - assert!(!info.is_container); - } else { - assert!(info.is_container); + unsafe { + for var in CI_VARS { + env::remove_var(var); + } + env::set_var(EXECUTION_CONTEXT_ENV, "docker"); } - } - // ========================================================================= - // hashed_hostname - // ========================================================================= + assert_eq!(resolve_execution_context(), "docker"); - #[test] - fn test_hashed_hostname_is_deterministic() { - let h1 = hashed_hostname(); - let h2 = hashed_hostname(); - assert_eq!( - h1, h2, - "hashed_hostname should return same value on repeated calls" - ); + unsafe { + env::remove_var(EXECUTION_CONTEXT_ENV); + for (var, val) in &saved { + match val { + Some(v) => env::set_var(var, v), + None => env::remove_var(var), + } + } + } } #[test] - fn test_hashed_hostname_is_hex_and_32_chars() { - let h = hashed_hostname(); - assert_eq!(h.len(), 32, "hashed hostname should be 32 hex characters"); - assert!( - h.chars().all(|c| c.is_ascii_hexdigit()), - "hashed hostname should only contain hex characters" - ); + #[serial] + fn test_resolve_execution_context_detects_ci() { + unsafe { + env::remove_var(EXECUTION_CONTEXT_ENV); + env::set_var("CI", "true"); + } + assert_eq!(resolve_execution_context(), "ci"); + unsafe { + env::remove_var("CI"); + } } - // ========================================================================= - // detect_container_runtime - // ========================================================================= - #[test] #[serial] - fn test_detect_container_runtime_env_docker() { + fn test_resolve_execution_context_ci_overrides_env_var() { unsafe { - env::set_var("III_CONTAINER", "docker"); - env::remove_var("KUBERNETES_SERVICE_HOST"); + env::set_var(EXECUTION_CONTEXT_ENV, "docker"); + env::set_var("CI", "true"); } - assert_eq!(detect_container_runtime(), "docker"); + assert_eq!(resolve_execution_context(), "ci"); unsafe { - env::remove_var("III_CONTAINER"); + env::remove_var(EXECUTION_CONTEXT_ENV); + env::remove_var("CI"); } } + // ========================================================================= + // EnvironmentInfo + // ========================================================================= + #[test] #[serial] - fn test_detect_container_runtime_env_kubernetes() { + fn test_environment_info_collect_returns_valid_fields() { + let dir = tempfile::tempdir().unwrap(); unsafe { - env::set_var("III_CONTAINER", "kubernetes"); - env::remove_var("KUBERNETES_SERVICE_HOST"); + env::set_var("HOME", dir.path()); } - assert_eq!(detect_container_runtime(), "kubernetes"); + let info = EnvironmentInfo::collect(); + assert!(!info.machine_id.is_empty()); + assert!(!info.iii_execution_context.is_empty()); + assert!(info.cpu_cores >= 1); + assert!(!info.os.is_empty()); + assert!(!info.arch.is_empty()); + assert!(!info.timezone.is_empty()); unsafe { - env::remove_var("III_CONTAINER"); + env::remove_var("HOME"); } } + // ========================================================================= + // detect_container_runtime + // ========================================================================= + #[test] #[serial] - fn test_detect_container_runtime_kubernetes_service_host() { + fn test_detect_container_runtime_kubernetes_env() { unsafe { - env::remove_var("III_CONTAINER"); env::set_var("KUBERNETES_SERVICE_HOST", "10.96.0.1"); } assert_eq!(detect_container_runtime(), "kubernetes"); @@ -496,12 +710,14 @@ mod tests { #[serial] fn test_detect_container_runtime_none_on_host() { unsafe { - env::remove_var("III_CONTAINER"); env::remove_var("KUBERNETES_SERVICE_HOST"); } let runtime = detect_container_runtime(); assert!( - runtime == "none" || runtime == "docker" || runtime == "kubernetes", + runtime == "none" + || runtime == "docker" + || runtime == "container" + || runtime == "kubernetes", "unexpected runtime: {runtime}" ); } @@ -543,43 +759,6 @@ mod tests { } } - // ========================================================================= - // detect_host_user_id - // ========================================================================= - - #[test] - #[serial] - fn test_detect_host_user_id_none_when_unset() { - unsafe { - env::remove_var("III_HOST_USER_ID"); - } - assert_eq!(detect_host_user_id(), None); - } - - #[test] - #[serial] - fn test_detect_host_user_id_returns_value() { - unsafe { - env::set_var("III_HOST_USER_ID", "some-uuid"); - } - assert_eq!(detect_host_user_id(), Some("some-uuid".to_string())); - unsafe { - env::remove_var("III_HOST_USER_ID"); - } - } - - #[test] - #[serial] - fn test_detect_host_user_id_none_when_empty() { - unsafe { - env::set_var("III_HOST_USER_ID", ""); - } - assert_eq!(detect_host_user_id(), None); - unsafe { - env::remove_var("III_HOST_USER_ID"); - } - } - // ========================================================================= // detect_install_method // ========================================================================= @@ -823,15 +1002,6 @@ mod tests { } } - // ========================================================================= - // detect_container (legacy alias via collect) - // ========================================================================= - - #[test] - fn test_detect_container_on_host() { - let _result = EnvironmentInfo::collect().is_container; - } - // ========================================================================= // detect_timezone // ========================================================================= diff --git a/engine/src/modules/telemetry/mod.rs b/engine/src/modules/telemetry/mod.rs index a05a45e46..c6e1f962a 100644 --- a/engine/src/modules/telemetry/mod.rs +++ b/engine/src/modules/telemetry/mod.rs @@ -22,7 +22,6 @@ use crate::modules::module::Module; use crate::workers::WorkerTelemetryMeta; use self::amplitude::{AmplitudeClient, AmplitudeEvent}; -use self::collector::collector; use self::environment::EnvironmentInfo; const API_KEY: &str = "a7182ac460dde671c8f2e1318b517228"; @@ -132,35 +131,8 @@ fn resolve_project_context(sdk_telemetry: Option<&WorkerTelemetryMeta>) -> Proje } } -fn get_or_create_install_id() -> String { - static INSTALL_ID: std::sync::OnceLock = std::sync::OnceLock::new(); - INSTALL_ID - .get_or_init(|| { - if let Some(id) = environment::read_config_key("identity", "id") { - return id; - } - - let base_dir = dirs::home_dir().unwrap_or_else(|| { - tracing::warn!( - "Failed to resolve home directory, falling back to temp dir for telemetry_id" - ); - std::env::temp_dir() - }); - - let legacy_path = base_dir.join(".iii").join("telemetry_id"); - if let Ok(id) = std::fs::read_to_string(&legacy_path) { - let id = id.trim().to_string(); - if !id.is_empty() { - environment::set_config_key("identity", "id", &id); - return id; - } - } - - let id = format!("auto-{}", uuid::Uuid::new_v4()); - environment::set_config_key("identity", "id", &id); - id - }) - .clone() +fn get_or_create_device_id() -> String { + environment::get_or_create_device_id() } fn check_and_mark_first_run() -> bool { @@ -168,17 +140,6 @@ fn check_and_mark_first_run() -> bool { return false; } - let legacy_path = dirs::home_dir() - .unwrap_or_else(std::env::temp_dir) - .join(".iii") - .join("state.ini"); - if let Ok(contents) = std::fs::read_to_string(&legacy_path) - && contents.contains("first_run_sent=true") - { - environment::set_config_key("state", "first_run_sent", "true"); - return false; - } - environment::set_config_key("state", "first_run_sent", "true"); true } @@ -231,6 +192,188 @@ struct FunctionTriggerData { functions_non_iii_builtin_count: usize, } +struct EngineSnapshot { + ft: FunctionTriggerData, + wd: WorkerData, + project: ProjectContext, +} + +fn collect_engine_snapshot(engine: &Engine) -> EngineSnapshot { + let ft = collect_functions_and_triggers(engine); + let wd = collect_worker_data(engine); + let project = resolve_project_context(wd.sdk_telemetry.as_ref()); + EngineSnapshot { ft, wd, project } +} + +fn build_base_properties(snap: &EngineSnapshot) -> serde_json::Map { + let mut m = serde_json::Map::new(); + m.insert( + "project_id".into(), + serde_json::json!(snap.project.project_id), + ); + m.insert( + "project_name".into(), + serde_json::json!(snap.project.project_name), + ); + m.insert( + "version".into(), + serde_json::json!(env!("CARGO_PKG_VERSION")), + ); + m.insert( + "function_count".into(), + serde_json::json!(snap.ft.function_count), + ); + m.insert( + "trigger_count".into(), + serde_json::json!(snap.ft.trigger_count), + ); + m.insert("functions".into(), serde_json::json!(snap.ft.functions)); + m.insert( + "trigger_types".into(), + serde_json::json!(snap.ft.trigger_types), + ); + m.insert( + "functions_iii_builtin_count".into(), + serde_json::json!(snap.ft.functions_iii_builtin_count), + ); + m.insert( + "functions_non_iii_builtin_count".into(), + serde_json::json!(snap.ft.functions_non_iii_builtin_count), + ); + m.insert("client_type".into(), serde_json::json!(snap.wd.client_type)); + m.insert( + "sdk_languages".into(), + serde_json::json!(snap.wd.sdk_languages), + ); + m.insert( + "worker_count_total".into(), + serde_json::json!(snap.wd.worker_count_total), + ); + for (fw, count) in &snap.wd.worker_count_by_framework { + m.insert(format!("worker_count_{fw}"), serde_json::json!(count)); + } + m.insert("workers".into(), serde_json::json!(snap.wd.workers)); + m +} + +// TODO: Re-enable delta metrics reporting once more important dashboards are ready. +// +// struct DeltaAccumulator { +// invocations_total: u64, +// invocations_success: u64, +// invocations_error: u64, +// api_requests: u64, +// queue_emits: u64, +// queue_consumes: u64, +// pubsub_publishes: u64, +// pubsub_subscribes: u64, +// cron_executions: u64, +// } +// +// impl DeltaAccumulator { +// fn new() -> Self { +// Self { +// invocations_total: 0, +// invocations_success: 0, +// invocations_error: 0, +// api_requests: 0, +// queue_emits: 0, +// queue_consumes: 0, +// pubsub_publishes: 0, +// pubsub_subscribes: 0, +// cron_executions: 0, +// } +// } +// +// fn snapshot(&mut self) -> DeltaSnapshot { +// use std::sync::atomic::Ordering; +// let acc = crate::modules::observability::metrics::get_metrics_accumulator(); +// let col = collector(); +// +// let cur = DeltaAccumulator { +// invocations_total: acc.invocations_total.load(Ordering::Relaxed), +// invocations_success: acc.invocations_success.load(Ordering::Relaxed), +// invocations_error: acc.invocations_error.load(Ordering::Relaxed), +// api_requests: col.api_requests.load(Ordering::Relaxed), +// queue_emits: col.queue_emits.load(Ordering::Relaxed), +// queue_consumes: col.queue_consumes.load(Ordering::Relaxed), +// pubsub_publishes: col.pubsub_publishes.load(Ordering::Relaxed), +// pubsub_subscribes: col.pubsub_subscribes.load(Ordering::Relaxed), +// cron_executions: col.cron_executions.load(Ordering::Relaxed), +// }; +// +// let deltas = DeltaSnapshot { +// invocations_total: cur.invocations_total.saturating_sub(self.invocations_total), +// invocations_success: cur +// .invocations_success +// .saturating_sub(self.invocations_success), +// invocations_error: cur.invocations_error.saturating_sub(self.invocations_error), +// api_requests: cur.api_requests.saturating_sub(self.api_requests), +// queue_emits: cur.queue_emits.saturating_sub(self.queue_emits), +// queue_consumes: cur.queue_consumes.saturating_sub(self.queue_consumes), +// pubsub_publishes: cur.pubsub_publishes.saturating_sub(self.pubsub_publishes), +// pubsub_subscribes: cur.pubsub_subscribes.saturating_sub(self.pubsub_subscribes), +// cron_executions: cur.cron_executions.saturating_sub(self.cron_executions), +// }; +// +// *self = cur; +// deltas +// } +// } +// +// struct DeltaSnapshot { +// invocations_total: u64, +// invocations_success: u64, +// invocations_error: u64, +// api_requests: u64, +// queue_emits: u64, +// queue_consumes: u64, +// pubsub_publishes: u64, +// pubsub_subscribes: u64, +// cron_executions: u64, +// } +// +// impl DeltaSnapshot { +// fn insert_into(&self, m: &mut serde_json::Map) { +// m.insert( +// "delta_invocations_total".into(), +// serde_json::json!(self.invocations_total), +// ); +// m.insert( +// "delta_invocations_success".into(), +// serde_json::json!(self.invocations_success), +// ); +// m.insert( +// "delta_invocations_error".into(), +// serde_json::json!(self.invocations_error), +// ); +// m.insert( +// "delta_api_requests".into(), +// serde_json::json!(self.api_requests), +// ); +// m.insert( +// "delta_queue_emits".into(), +// serde_json::json!(self.queue_emits), +// ); +// m.insert( +// "delta_queue_consumes".into(), +// serde_json::json!(self.queue_consumes), +// ); +// m.insert( +// "delta_pubsub_publishes".into(), +// serde_json::json!(self.pubsub_publishes), +// ); +// m.insert( +// "delta_pubsub_subscribes".into(), +// serde_json::json!(self.pubsub_subscribes), +// ); +// m.insert( +// "delta_cron_executions".into(), +// serde_json::json!(self.cron_executions), +// ); +// } +// } + fn collect_functions_and_triggers(engine: &Engine) -> FunctionTriggerData { let mut functions_iii_builtin_count = 0usize; let mut functions_non_iii_builtin_count = 0usize; @@ -273,8 +416,7 @@ fn collect_functions_and_triggers(engine: &Engine) -> FunctionTriggerData { struct WorkerData { worker_count_total: usize, - worker_count_motia: usize, - worker_count_non_iii_sdk_framework: usize, + worker_count_by_framework: HashMap, worker_count_by_language: HashMap, workers: Vec, sdk_languages: Vec, @@ -284,9 +426,9 @@ struct WorkerData { fn collect_worker_data(engine: &Engine) -> WorkerData { let mut runtime_counts: HashMap = HashMap::new(); + let mut framework_counts: HashMap = HashMap::new(); let mut best_telemetry: Option<(uuid::Uuid, WorkerTelemetryMeta)> = None; let mut worker_count_total = 0usize; - let mut worker_count_motia = 0usize; let mut workers: Vec = Vec::new(); for entry in engine.worker_registry.workers.iter() { @@ -305,17 +447,11 @@ fn collect_worker_data(engine: &Engine) -> WorkerData { .and_then(|t| t.framework.clone()) .unwrap_or_default(); - if framework.to_lowercase().contains("motia") - || framework == "iii-js" - || framework == "iii-py" - { - worker_count_motia += 1; - } - - if framework.is_empty() { - workers.push(runtime); - } else { + if !framework.is_empty() { + *framework_counts.entry(framework.clone()).or_insert(0) += 1; workers.push(format!("{}:{}", runtime, framework)); + } else { + workers.push(runtime); } if let Some(telemetry) = worker.telemetry.as_ref() @@ -340,19 +476,16 @@ fn collect_worker_data(engine: &Engine) -> WorkerData { let sdk_languages: Vec = runtime_counts .keys() .map(|r| match r.as_str() { - "node" => "iii-js".to_string(), + "node" => "iii-node".to_string(), "python" => "iii-py".to_string(), "rust" => "iii-rust".to_string(), other => other.to_string(), }) .collect(); - let worker_count_non_iii_sdk_framework = worker_count_total.saturating_sub(worker_count_motia); - WorkerData { worker_count_total, - worker_count_motia, - worker_count_non_iii_sdk_framework, + worker_count_by_framework: framework_counts, worker_count_by_language: runtime_counts, workers, sdk_languages, @@ -364,7 +497,7 @@ fn collect_worker_data(engine: &Engine) -> WorkerData { /// Cloneable context for building telemetry events inside spawned tasks. #[derive(Clone)] struct TelemetryContext { - install_id: String, + device_id: String, env_info: EnvironmentInfo, } @@ -382,16 +515,21 @@ impl TelemetryContext { "environment.cpu_cores": env.cpu_cores, "environment.timezone": env.timezone, "environment.machine_id": env.machine_id, - "environment.is_container": env.is_container, - "environment.container_runtime": env.container_runtime, + "iii_execution_context": env.iii_execution_context, "env": environment::detect_env(), "install_method": environment::detect_install_method(), "iii_version": env!("CARGO_PKG_VERSION"), }); - if let Some(host_user_id) = environment::detect_host_user_id() { - props["host_user_id"] = serde_json::Value::String(host_user_id); + let host_user_id = std::env::var("III_HOST_USER_ID") + .ok() + .filter(|s| !s.is_empty()) + .or_else(environment::find_project_ini_device_id); + + if let Some(id) = host_user_id { + props["host_user_id"] = serde_json::Value::String(id); } + if let Some(project_id) = project.project_id { props["project_id"] = serde_json::Value::String(project_id); } @@ -412,9 +550,8 @@ impl TelemetryContext { .and_then(|t| t.language.clone()) .or_else(environment::detect_language); AmplitudeEvent { - device_id: self.install_id.clone(), - // user_id: currently telemetry_id, will become iii cloud user ID when accounts ship - user_id: Some(self.install_id.clone()), + device_id: self.device_id.clone(), + user_id: None, event_type: event_type.to_string(), event_properties: properties, user_properties: Some(self.build_user_properties(sdk_telemetry)), @@ -507,7 +644,7 @@ impl Module for TelemetryModule { return Ok(Box::new(DisabledTelemetryModule)); } - let install_id = get_or_create_install_id(); + let device_id = get_or_create_device_id(); let env_info = EnvironmentInfo::collect(); tracing::info!("Anonymous telemetry enabled. Set III_TELEMETRY_ENABLED=false to disable."); @@ -521,7 +658,7 @@ impl Module for TelemetryModule { .map(|key| Arc::new(AmplitudeClient::new(key.to_owned()))); let ctx = TelemetryContext { - install_id: install_id.clone(), + device_id: device_id.clone(), env_info, }; @@ -553,14 +690,10 @@ impl Module for TelemetryModule { let engine_for_started = Arc::clone(&self.engine); let client_for_started = Arc::clone(self.active_client()); let ctx_for_started = self.ctx.clone(); - let start_time_boot = start_time; - let interval_secs_boot = interval_secs; tokio::spawn(async move { - tokio::time::sleep(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(std::time::Duration::from_secs(120)).await; - let ft = collect_functions_and_triggers(&engine_for_started); - let wd = collect_worker_data(&engine_for_started); - let project = resolve_project_context(wd.sdk_telemetry.as_ref()); + let snap = collect_engine_snapshot(&engine_for_started); if check_and_mark_first_run() { let first_run_event = ctx_for_started.build_event( @@ -571,48 +704,32 @@ impl Module for TelemetryModule { "arch": std::env::consts::ARCH, "install_method": environment::detect_install_method(), }), - wd.sdk_telemetry.as_ref(), + snap.wd.sdk_telemetry.as_ref(), ); let _ = client_for_started.send_event(first_run_event).await; } - let uptime_secs = start_time_boot.elapsed().as_secs(); + let mut props = build_base_properties(&snap); + props.insert("session_start".into(), serde_json::json!(true)); + props.insert( + "worker_count_by_language".into(), + serde_json::json!(snap.wd.worker_count_by_language), + ); + props.insert("period_secs".into(), serde_json::json!(interval_secs)); + props.insert( + "uptime_secs".into(), + serde_json::json!(start_time.elapsed().as_secs()), + ); + // TODO: Re-enable delta metrics once more important dashboards are ready. + // let d = DeltaAccumulator::new().snapshot(); + // props.insert("is_active".into(), serde_json::json!(d.invocations_total > 0)); + // d.insert_into(&mut props); + let boot_heartbeat = ctx_for_started.build_event( "heartbeat", - serde_json::json!({ - "session_start": true, - "project_id": project.project_id, - "project_name": project.project_name, - "version": env!("CARGO_PKG_VERSION"), - "function_count": ft.function_count, - "trigger_count": ft.trigger_count, - "functions": ft.functions, - "trigger_types": ft.trigger_types, - "functions_iii_builtin_count": ft.functions_iii_builtin_count, - "functions_non_iii_builtin_count": ft.functions_non_iii_builtin_count, - "client_type": wd.client_type, - "sdk_languages": wd.sdk_languages, - "worker_count_total": wd.worker_count_total, - "worker_count_motia": wd.worker_count_motia, - "worker_count_non_iii_sdk_framework": wd.worker_count_non_iii_sdk_framework, - "worker_count_by_language": wd.worker_count_by_language, - "workers": wd.workers, - "delta_invocations_total": 0u64, - "delta_invocations_success": 0u64, - "delta_invocations_error": 0u64, - "delta_api_requests": 0u64, - "delta_queue_emits": 0u64, - "delta_queue_consumes": 0u64, - "delta_pubsub_publishes": 0u64, - "delta_pubsub_subscribes": 0u64, - "delta_cron_executions": 0u64, - "period_secs": interval_secs_boot, - "uptime_secs": uptime_secs, - "is_active": false, - }), - wd.sdk_telemetry.as_ref(), + serde_json::Value::Object(props), + snap.wd.sdk_telemetry.as_ref(), ); - let _ = client_for_started.send_event(boot_heartbeat).await; }); @@ -622,57 +739,23 @@ impl Module for TelemetryModule { interval.tick().await; - let acc = crate::modules::observability::metrics::get_metrics_accumulator(); - - let mut prev_invocations_total: u64 = 0; - let mut prev_invocations_success: u64 = 0; - let mut prev_invocations_error: u64 = 0; - let mut prev_api_requests: u64 = 0; - let mut prev_queue_emits: u64 = 0; - let mut prev_queue_consumes: u64 = 0; - let mut prev_pubsub_publishes: u64 = 0; - let mut prev_pubsub_subscribes: u64 = 0; - let mut prev_cron_executions: u64 = 0; + // TODO: Re-enable delta metrics once downstream dashboards are ready. + // let mut deltas = DeltaAccumulator::new(); loop { tokio::select! { result = shutdown_rx.changed() => { if result.is_err() || *shutdown_rx.borrow() { - use std::sync::atomic::Ordering; - let uptime_secs = start_time.elapsed().as_secs(); - let invocations_total = acc.invocations_total.load(Ordering::Relaxed); - let invocations_success = acc.invocations_success.load(Ordering::Relaxed); - let invocations_error = acc.invocations_error.load(Ordering::Relaxed); + let snap = collect_engine_snapshot(&engine); - let wd = collect_worker_data(&engine); - let ft = collect_functions_and_triggers(&engine); - let project = resolve_project_context(wd.sdk_telemetry.as_ref()); + let mut props = build_base_properties(&snap); + props.insert("uptime_secs".into(), serde_json::json!(start_time.elapsed().as_secs())); let event = ctx.build_event( "engine_stopped", - serde_json::json!({ - "project_id": project.project_id, - "project_name": project.project_name, - "version": env!("CARGO_PKG_VERSION"), - "uptime_secs": uptime_secs, - "invocations_total": invocations_total, - "invocations_success": invocations_success, - "invocations_error": invocations_error, - "function_count": ft.function_count, - "trigger_count": ft.trigger_count, - "functions": ft.functions, - "trigger_types": ft.trigger_types, - "functions_iii_builtin_count": ft.functions_iii_builtin_count, - "functions_non_iii_builtin_count": ft.functions_non_iii_builtin_count, - "client_type": wd.client_type, - "sdk_languages": wd.sdk_languages, - "worker_count_total": wd.worker_count_total, - "worker_count_motia": wd.worker_count_motia, - "worker_count_non_iii_sdk_framework": wd.worker_count_non_iii_sdk_framework, - "workers": wd.workers, - }), - wd.sdk_telemetry.as_ref(), + serde_json::Value::Object(props), + snap.wd.sdk_telemetry.as_ref(), ); let _ = tokio::time::timeout( @@ -685,80 +768,21 @@ impl Module for TelemetryModule { } } _ = interval.tick() => { - use std::sync::atomic::Ordering; - - let invocations_total = acc.invocations_total.load(Ordering::Relaxed); - let invocations_success = acc.invocations_success.load(Ordering::Relaxed); - let invocations_error = acc.invocations_error.load(Ordering::Relaxed); - let api_requests = collector().api_requests.load(Ordering::Relaxed); - let queue_emits = collector().queue_emits.load(Ordering::Relaxed); - let queue_consumes = collector().queue_consumes.load(Ordering::Relaxed); - let pubsub_publishes = collector().pubsub_publishes.load(Ordering::Relaxed); - let pubsub_subscribes = collector().pubsub_subscribes.load(Ordering::Relaxed); - let cron_executions = collector().cron_executions.load(Ordering::Relaxed); - - let delta_invocations_total = invocations_total.saturating_sub(prev_invocations_total); - let delta_invocations_success = invocations_success.saturating_sub(prev_invocations_success); - let delta_invocations_error = invocations_error.saturating_sub(prev_invocations_error); - let delta_api_requests = api_requests.saturating_sub(prev_api_requests); - let delta_queue_emits = queue_emits.saturating_sub(prev_queue_emits); - let delta_queue_consumes = queue_consumes.saturating_sub(prev_queue_consumes); - let delta_pubsub_publishes = pubsub_publishes.saturating_sub(prev_pubsub_publishes); - let delta_pubsub_subscribes = pubsub_subscribes.saturating_sub(prev_pubsub_subscribes); - let delta_cron_executions = cron_executions.saturating_sub(prev_cron_executions); - - prev_invocations_total = invocations_total; - prev_invocations_success = invocations_success; - prev_invocations_error = invocations_error; - prev_api_requests = api_requests; - prev_queue_emits = queue_emits; - prev_queue_consumes = queue_consumes; - prev_pubsub_publishes = pubsub_publishes; - prev_pubsub_subscribes = pubsub_subscribes; - prev_cron_executions = cron_executions; - - let is_active = delta_invocations_total > 0; - let uptime_secs = start_time.elapsed().as_secs(); - - let ft = collect_functions_and_triggers(&engine); - let wd = collect_worker_data(&engine); - let project = resolve_project_context(wd.sdk_telemetry.as_ref()); - - let properties = serde_json::json!({ - "session_start": false, - "delta_invocations_total": delta_invocations_total, - "delta_invocations_success": delta_invocations_success, - "delta_invocations_error": delta_invocations_error, - "delta_api_requests": delta_api_requests, - "delta_queue_emits": delta_queue_emits, - "delta_queue_consumes": delta_queue_consumes, - "delta_pubsub_publishes": delta_pubsub_publishes, - "delta_pubsub_subscribes": delta_pubsub_subscribes, - "delta_cron_executions": delta_cron_executions, - "function_count": ft.function_count, - "trigger_count": ft.trigger_count, - "functions": ft.functions, - "trigger_types": ft.trigger_types, - "functions_iii_builtin_count": ft.functions_iii_builtin_count, - "functions_non_iii_builtin_count": ft.functions_non_iii_builtin_count, - "worker_count_total": wd.worker_count_total, - "worker_count_motia": wd.worker_count_motia, - "worker_count_non_iii_sdk_framework": wd.worker_count_non_iii_sdk_framework, - "worker_count_by_language": wd.worker_count_by_language, - "workers": wd.workers, - "sdk_languages": wd.sdk_languages, - "client_type": wd.client_type, - "project_id": project.project_id, - "project_name": project.project_name, - "period_secs": interval_secs, - "uptime_secs": uptime_secs, - "is_active": is_active, - }); + // let d = deltas.snapshot(); + let snap = collect_engine_snapshot(&engine); + + let mut props = build_base_properties(&snap); + props.insert("session_start".into(), serde_json::json!(false)); + props.insert("worker_count_by_language".into(), serde_json::json!(snap.wd.worker_count_by_language)); + props.insert("period_secs".into(), serde_json::json!(interval_secs)); + props.insert("uptime_secs".into(), serde_json::json!(start_time.elapsed().as_secs())); + // props.insert("is_active".into(), serde_json::json!(d.invocations_total > 0)); + // d.insert_into(&mut props); let event = ctx.build_event( "heartbeat", - properties, - wd.sdk_telemetry.as_ref(), + serde_json::Value::Object(props), + snap.wd.sdk_telemetry.as_ref(), ); let _ = client.send_event(event).await; @@ -898,12 +922,12 @@ mod tests { fn make_env_info() -> EnvironmentInfo { EnvironmentInfo { machine_id: "machine-1".to_string(), - is_container: false, - container_runtime: "none".to_string(), + iii_execution_context: "user".to_string(), timezone: "UTC".to_string(), cpu_cores: 4, os: "linux".to_string(), arch: "x86_64".to_string(), + host_user_id: None, } } @@ -922,7 +946,7 @@ mod tests { client: Arc::new(AmplitudeClient::new(String::new())), sdk_client: sdk_client.then(|| Arc::new(AmplitudeClient::new(String::new()))), ctx: TelemetryContext { - install_id: "test-install-id".to_string(), + device_id: "test-install-id".to_string(), env_info: make_env_info(), }, start_time: Instant::now(), @@ -1238,20 +1262,19 @@ mod tests { unsafe { env::remove_var("III_PROJECT_ID"); env::remove_var("III_PROJECT_ROOT"); - env::remove_var("III_HOST_USER_ID"); env::remove_var("III_ENV"); } let ctx = TelemetryContext { - install_id: "id-1".to_string(), + device_id: "id-1".to_string(), env_info: EnvironmentInfo { machine_id: "test-machine".to_string(), - is_container: false, - container_runtime: "none".to_string(), + iii_execution_context: "user".to_string(), timezone: "UTC".to_string(), cpu_cores: 4, os: "linux".to_string(), arch: "x86_64".to_string(), + host_user_id: None, }, }; @@ -1262,8 +1285,7 @@ mod tests { assert_eq!(props["environment.cpu_cores"], 4); assert_eq!(props["environment.timezone"], "UTC"); assert_eq!(props["environment.machine_id"], "test-machine"); - assert_eq!(props["environment.is_container"], false); - assert_eq!(props["environment.container_runtime"], "none"); + assert_eq!(props["iii_execution_context"], "user"); assert_eq!(props["iii_version"], env!("CARGO_PKG_VERSION")); assert!(props.get("env").is_some()); assert!(props.get("install_method").is_some()); @@ -1286,7 +1308,7 @@ mod tests { } let ctx = TelemetryContext { - install_id: "id-1".to_string(), + device_id: "id-1".to_string(), env_info: make_env_info(), }; @@ -1303,7 +1325,7 @@ mod tests { } let ctx = TelemetryContext { - install_id: "id-1".to_string(), + device_id: "id-1".to_string(), env_info: make_env_info(), }; @@ -1324,7 +1346,7 @@ mod tests { } let ctx = TelemetryContext { - install_id: "id-1".to_string(), + device_id: "id-1".to_string(), env_info: make_env_info(), }; @@ -1338,46 +1360,6 @@ mod tests { assert_eq!(props["project_name"], "my-project"); } - #[test] - #[serial] - fn test_build_user_properties_host_user_id_included() { - unsafe { - env::set_var("III_HOST_USER_ID", "host-uuid-123"); - env::remove_var("III_PROJECT_ID"); - env::remove_var("III_PROJECT_ROOT"); - } - - let ctx = TelemetryContext { - install_id: "id-1".to_string(), - env_info: make_env_info(), - }; - - let props = ctx.build_user_properties(None); - assert_eq!(props["host_user_id"], "host-uuid-123"); - - unsafe { - env::remove_var("III_HOST_USER_ID"); - } - } - - #[test] - #[serial] - fn test_build_user_properties_host_user_id_absent_when_unset() { - unsafe { - env::remove_var("III_HOST_USER_ID"); - env::remove_var("III_PROJECT_ID"); - env::remove_var("III_PROJECT_ROOT"); - } - - let ctx = TelemetryContext { - install_id: "id-1".to_string(), - env_info: make_env_info(), - }; - - let props = ctx.build_user_properties(None); - assert!(props.get("host_user_id").is_none()); - } - // ========================================================================= // AmplitudeEvent serialization (via TelemetryContext::build_event) // ========================================================================= @@ -1385,22 +1367,22 @@ mod tests { #[test] fn test_build_event_basic_fields() { let ctx = TelemetryContext { - install_id: "test-install-id".to_string(), + device_id: "test-install-id".to_string(), env_info: EnvironmentInfo { machine_id: "abc123".to_string(), - is_container: false, - container_runtime: "none".to_string(), + iii_execution_context: "user".to_string(), timezone: "UTC".to_string(), cpu_cores: 4, os: "linux".to_string(), arch: "x86_64".to_string(), + host_user_id: None, }, }; let event = ctx.build_event("test_event", serde_json::json!({"key": "value"}), None); assert_eq!(event.device_id, "test-install-id"); - assert_eq!(event.user_id, Some("test-install-id".to_string())); + assert_eq!(event.user_id, None); assert_eq!(event.event_type, "test_event"); assert_eq!(event.event_properties["key"], "value"); assert_eq!(event.platform, "iii-engine"); @@ -1413,15 +1395,15 @@ mod tests { #[test] fn test_build_event_with_sdk_telemetry_language() { let ctx = TelemetryContext { - install_id: "id-1".to_string(), + device_id: "id-1".to_string(), env_info: EnvironmentInfo { machine_id: "m1".to_string(), - is_container: false, - container_runtime: "none".to_string(), + iii_execution_context: "user".to_string(), timezone: "UTC".to_string(), cpu_cores: 2, os: "macos".to_string(), arch: "aarch64".to_string(), + host_user_id: None, }, }; @@ -1438,7 +1420,7 @@ mod tests { #[test] fn test_build_event_insert_id_is_unique() { let ctx = TelemetryContext { - install_id: "id-1".to_string(), + device_id: "id-1".to_string(), env_info: make_env_info(), }; @@ -1453,7 +1435,7 @@ mod tests { #[test] fn test_build_event_app_version_matches_cargo_pkg() { let ctx = TelemetryContext { - install_id: "id-test".to_string(), + device_id: "id-test".to_string(), env_info: make_env_info(), }; @@ -1464,7 +1446,7 @@ mod tests { #[test] fn test_build_event_country_is_none() { let ctx = TelemetryContext { - install_id: "id-test".to_string(), + device_id: "id-test".to_string(), env_info: make_env_info(), }; @@ -1475,7 +1457,7 @@ mod tests { #[test] fn test_build_event_user_properties_is_some() { let ctx = TelemetryContext { - install_id: "id-test".to_string(), + device_id: "id-test".to_string(), env_info: make_env_info(), }; @@ -1492,7 +1474,7 @@ mod tests { } let ctx = TelemetryContext { - install_id: "id-test".to_string(), + device_id: "id-test".to_string(), env_info: make_env_info(), }; @@ -1509,7 +1491,7 @@ mod tests { } let ctx = TelemetryContext { - install_id: "id-test".to_string(), + device_id: "id-test".to_string(), env_info: make_env_info(), }; @@ -1524,7 +1506,7 @@ mod tests { #[test] fn test_build_event_timestamp_is_recent() { let ctx = TelemetryContext { - install_id: "id-test".to_string(), + device_id: "id-test".to_string(), env_info: make_env_info(), }; @@ -1540,25 +1522,24 @@ mod tests { #[test] fn test_telemetry_context_clone() { let ctx = TelemetryContext { - install_id: "clone-test-id".to_string(), + device_id: "clone-test-id".to_string(), env_info: EnvironmentInfo { machine_id: "m1".to_string(), - is_container: true, - container_runtime: "docker".to_string(), + iii_execution_context: "docker".to_string(), timezone: "America/Chicago".to_string(), cpu_cores: 16, os: "linux".to_string(), arch: "x86_64".to_string(), + host_user_id: None, }, }; let cloned = ctx.clone(); - assert_eq!(cloned.install_id, ctx.install_id); + assert_eq!(cloned.device_id, ctx.device_id); assert_eq!(cloned.env_info.machine_id, ctx.env_info.machine_id); - assert_eq!(cloned.env_info.is_container, ctx.env_info.is_container); assert_eq!( - cloned.env_info.container_runtime, - ctx.env_info.container_runtime + cloned.env_info.iii_execution_context, + ctx.env_info.iii_execution_context ); assert_eq!(cloned.env_info.timezone, ctx.env_info.timezone); assert_eq!(cloned.env_info.cpu_cores, ctx.env_info.cpu_cores); @@ -1669,8 +1650,7 @@ mod tests { let wd = collect_worker_data(&engine); assert_eq!(wd.worker_count_total, 0); - assert_eq!(wd.worker_count_motia, 0); - assert_eq!(wd.worker_count_non_iii_sdk_framework, 0); + assert!(wd.worker_count_by_framework.is_empty()); assert!(wd.sdk_telemetry.is_none()); assert!(wd.sdk_languages.is_empty()); } @@ -1685,7 +1665,7 @@ mod tests { worker1.telemetry = Some(WorkerTelemetryMeta { language: Some("typescript".to_string()), project_name: Some("proj-a".to_string()), - framework: Some("iii-js".to_string()), + framework: Some("iii-node".to_string()), }); let w1_id = worker1.id; engine.worker_registry.workers.insert(w1_id, worker1); @@ -1700,14 +1680,13 @@ mod tests { let wd = collect_worker_data(&engine); assert_eq!(wd.worker_count_total, 2); - assert_eq!(wd.worker_count_motia, 1); - assert_eq!(wd.worker_count_non_iii_sdk_framework, 1); + assert_eq!(wd.worker_count_by_framework.get("iii-node"), Some(&1)); assert!(wd.sdk_telemetry.is_some()); let telem = wd.sdk_telemetry.unwrap(); assert_eq!(telem.language, Some("typescript".to_string())); assert_eq!(telem.project_name, Some("proj-a".to_string())); - assert_eq!(telem.framework, Some("iii-js".to_string())); + assert_eq!(telem.framework, Some("iii-node".to_string())); } #[test] @@ -1793,8 +1772,7 @@ mod tests { let wd = collect_worker_data(&engine); assert_eq!(wd.worker_count_total, 1); - assert_eq!(wd.worker_count_motia, 1); - assert_eq!(wd.worker_count_non_iii_sdk_framework, 0); + assert_eq!(wd.worker_count_by_framework.get("motia"), Some(&1)); } #[test] @@ -2000,20 +1978,20 @@ mod tests { } // ========================================================================= - // get_or_create_install_id + // get_or_create_device_id // ========================================================================= #[test] - fn test_get_or_create_install_id_returns_nonempty_string() { - let id = get_or_create_install_id(); + fn test_get_or_create_device_id_returns_nonempty_string() { + let id = get_or_create_device_id(); assert!(!id.is_empty()); } #[test] - fn test_get_or_create_install_id_is_stable() { - let id1 = get_or_create_install_id(); - let id2 = get_or_create_install_id(); - assert_eq!(id1, id2, "install_id should be stable across calls"); + fn test_get_or_create_device_id_is_stable() { + let id1 = get_or_create_device_id(); + let id2 = get_or_create_device_id(); + assert_eq!(id1, id2, "device_id should be stable across calls"); } // ========================================================================= @@ -2124,7 +2102,7 @@ mod tests { worker.telemetry = Some(WorkerTelemetryMeta { language: Some("typescript".to_string()), project_name: Some("telemetry-spec".to_string()), - framework: Some("iii-js".to_string()), + framework: Some("iii-node".to_string()), }); engine.worker_registry.register_worker(worker); @@ -2150,9 +2128,9 @@ mod tests { .await .expect("start background tasks"); - tokio::time::sleep(Duration::from_millis(2200)).await; + tokio::time::sleep(Duration::from_millis(200)).await; shutdown_tx.send(true).expect("signal shutdown"); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(200)).await; module.destroy().await.expect("destroy telemetry module"); reset_telemetry_globals(); diff --git a/sdk/packages/node/iii/src/iii.ts b/sdk/packages/node/iii/src/iii.ts index 162427e9a..f4df59968 100644 --- a/sdk/packages/node/iii/src/iii.ts +++ b/sdk/packages/node/iii/src/iii.ts @@ -555,7 +555,7 @@ class Sdk implements ISdk { telemetry: { language, project_name: telemetryOpts?.project_name, - framework: telemetryOpts?.framework, + framework: telemetryOpts?.framework?.trim() || 'iii-node', amplitude_api_key: telemetryOpts?.amplitude_api_key, }, }, diff --git a/sdk/packages/python/iii/src/iii/iii.py b/sdk/packages/python/iii/src/iii/iii.py index 2b0709ecc..4bfc77edf 100644 --- a/sdk/packages/python/iii/src/iii/iii.py +++ b/sdk/packages/python/iii/src/iii/iii.py @@ -17,7 +17,13 @@ from .channels import ChannelReader, ChannelWriter from .format_utils import extract_request_format, extract_response_format -from .iii_constants import DEFAULT_RECONNECTION_CONFIG, MAX_QUEUE_SIZE, FunctionRef, IIIConnectionState, InitOptions +from .iii_constants import ( + DEFAULT_RECONNECTION_CONFIG, + MAX_QUEUE_SIZE, + FunctionRef, + IIIConnectionState, + InitOptions, +) from .iii_types import ( FunctionInfo, HttpInvocationConfig, @@ -44,7 +50,14 @@ UnregisterTriggerTypeMessage, WorkerInfo, ) -from .stream import IStream, StreamDeleteInput, StreamGetInput, StreamListGroupsInput, StreamListInput, StreamSetInput +from .stream import ( + IStream, + StreamDeleteInput, + StreamGetInput, + StreamListGroupsInput, + StreamListInput, + StreamSetInput, +) from .telemetry_types import OtelConfig from .triggers import Trigger, TriggerConfig, TriggerHandler, TriggerTypeRef from .types import Channel, RemoteFunctionData, RemoteTriggerTypeData, is_channel_ref @@ -61,6 +74,7 @@ def _resolve_format(fmt: Any) -> Any | None: return None if isinstance(fmt, type): from .format_utils import python_type_to_format + return python_type_to_format(fmt) return fmt @@ -100,10 +114,14 @@ def __init__(self, address: str, options: InitOptions | None = None) -> None: self._reconnect_task: asyncio.Task[None] | None = None self._running = False self._receiver_task: asyncio.Task[None] | None = None - self._functions_available_callbacks: set[Callable[[list[FunctionInfo]], None]] = set() + self._functions_available_callbacks: set[ + Callable[[list[FunctionInfo]], None] + ] = set() self._functions_available_trigger: Trigger | None = None self._functions_available_function_id: str | None = None - self._reconnection_config = self._options.reconnection_config or DEFAULT_RECONNECTION_CONFIG + self._reconnection_config = ( + self._options.reconnection_config or DEFAULT_RECONNECTION_CONFIG + ) self._reconnect_attempt = 0 self._connection_state: IIIConnectionState = "disconnected" self._worker_id: str | None = None @@ -138,7 +156,9 @@ def _wait_until_connected(self) -> None: raise ConnectionError(f"Connection to {self._address} failed") self._connected_event.wait(timeout=30) if cast(IIIConnectionState, self._connection_state) == "failed": - raise ConnectionError(f"Connection to {self._address} failed after max retries") + raise ConnectionError( + f"Connection to {self._address} failed after max retries" + ) def shutdown(self) -> None: """Gracefully shut down the client, releasing all resources. @@ -250,18 +270,27 @@ def _schedule_reconnect(self) -> None: async def _reconnect_loop(self) -> None: config = self._reconnection_config while self._running and not self._ws: - if config.max_retries != -1 and self._reconnect_attempt >= config.max_retries: + if ( + config.max_retries != -1 + and self._reconnect_attempt >= config.max_retries + ): self._set_connection_state("failed") - log.error(f"Max reconnection retries ({config.max_retries}) reached, giving up") + log.error( + f"Max reconnection retries ({config.max_retries}) reached, giving up" + ) return - exponential_delay = config.initial_delay_ms * (config.backoff_multiplier**self._reconnect_attempt) + exponential_delay = config.initial_delay_ms * ( + config.backoff_multiplier**self._reconnect_attempt + ) capped_delay = min(exponential_delay, config.max_delay_ms) jitter = capped_delay * config.jitter_factor * (2 * random.random() - 1) delay_ms = max(0, capped_delay + jitter) self._set_connection_state("reconnecting") - log.debug(f"Reconnecting in {delay_ms:.0f}ms (attempt {self._reconnect_attempt + 1})") + log.debug( + f"Reconnecting in {delay_ms:.0f}ms (attempt {self._reconnect_attempt + 1})" + ) await asyncio.sleep(delay_ms / 1000.0) self._reconnect_attempt += 1 @@ -433,7 +462,9 @@ async def _invoke_with_otel_context( carrier["traceparent"] = traceparent if baggage: carrier["baggage"] = baggage - parent_ctx = propagate.extract(carrier) if carrier else otel_context.get_current() + parent_ctx = ( + propagate.extract(carrier) if carrier else otel_context.get_current() + ) tracer = trace.get_tracer("iii-python-sdk") with tracer.start_as_current_span( f"call {handler.__name__}", @@ -454,7 +485,11 @@ async def _invoke_with_otel_context( def _resolve_channels(self, data: Any) -> Any: if is_channel_ref(data): ref = StreamChannelRef(**data) - return ChannelReader(self._address, ref) if ref.direction == "read" else ChannelWriter(self._address, ref) + return ( + ChannelReader(self._address, ref) + if ref.direction == "read" + else ChannelWriter(self._address, ref) + ) if isinstance(data, dict): return {k: self._resolve_channels(v) for k, v in data.items()} if isinstance(data, list): @@ -499,14 +534,20 @@ async def _handle_invoke( InvocationResultMessage( invocation_id=invocation_id, function_id=path, - error={"code": "invocation_failed", "message": str(e), "stacktrace": traceback.format_exc()}, + error={ + "code": "invocation_failed", + "message": str(e), + "stacktrace": traceback.format_exc(), + }, ) ) return if not invocation_id: task = asyncio.create_task( - self._invoke_with_otel_context(func.handler, resolved_data, traceparent, baggage) + self._invoke_with_otel_context( + func.handler, resolved_data, traceparent, baggage + ) ) task.add_done_callback(self._log_task_exception) return @@ -533,7 +574,11 @@ async def _handle_invoke( InvocationResultMessage( invocation_id=invocation_id, function_id=path, - error={"code": "invocation_failed", "message": str(original), "stacktrace": traceback.format_exc()}, + error={ + "code": "invocation_failed", + "message": str(original), + "stacktrace": traceback.format_exc(), + }, traceparent=te.traceparent, ) ) @@ -543,13 +588,19 @@ async def _handle_invoke( InvocationResultMessage( invocation_id=invocation_id, function_id=path, - error={"code": "invocation_failed", "message": str(e), "stacktrace": traceback.format_exc()}, + error={ + "code": "invocation_failed", + "message": str(e), + "stacktrace": traceback.format_exc(), + }, ) ) async def _handle_trigger_registration(self, data: dict[str, Any]) -> None: trigger_type_id = data.get("trigger_type") - handler_data = self._trigger_types.get(trigger_type_id) if trigger_type_id else None + handler_data = ( + self._trigger_types.get(trigger_type_id) if trigger_type_id else None + ) trigger_id = data.get("id", "") function_id = data.get("function_id", "") @@ -573,7 +624,12 @@ async def _handle_trigger_registration(self, data: dict[str, Any]) -> None: await self._send(result_base) except Exception as e: log.exception(f"Error registering trigger {trigger_id}") - await self._send({**result_base, "error": {"code": "trigger_registration_failed", "message": str(e)}}) + await self._send( + { + **result_base, + "error": {"code": "trigger_registration_failed", "message": str(e)}, + } + ) # Connection state management @@ -641,7 +697,11 @@ def register_trigger_type( if isinstance(trigger_type.trigger_request_format, type) else None ) - request_cls = trigger_type.call_request_format if isinstance(trigger_type.call_request_format, type) else None + request_cls = ( + trigger_type.call_request_format + if isinstance(trigger_type.call_request_format, type) + else None + ) msg = RegisterTriggerTypeMessage( id=trigger_type.id, @@ -649,7 +709,9 @@ def register_trigger_type( trigger_request_format=_resolve_format(trigger_type.trigger_request_format), call_request_format=_resolve_format(trigger_type.call_request_format), ) - self._trigger_types[trigger_type.id] = RemoteTriggerTypeData(message=msg, handler=handler) + self._trigger_types[trigger_type.id] = RemoteTriggerTypeData( + message=msg, handler=handler + ) self._send_if_connected(msg) return TriggerTypeRef( @@ -659,7 +721,9 @@ def register_trigger_type( request_cls=request_cls, ) - def unregister_trigger_type(self, trigger_type: "RegisterTriggerTypeInput | dict[str, Any]") -> None: + def unregister_trigger_type( + self, trigger_type: "RegisterTriggerTypeInput | dict[str, Any]" + ) -> None: """Unregister a previously registered trigger type. Args: @@ -676,7 +740,9 @@ def unregister_trigger_type(self, trigger_type: "RegisterTriggerTypeInput | dict self._trigger_types.pop(type_id, None) self._send_if_connected(UnregisterTriggerTypeMessage(id=type_id)) - def register_trigger(self, trigger: RegisterTriggerInput | dict[str, Any]) -> Trigger: + def register_trigger( + self, trigger: RegisterTriggerInput | dict[str, Any] + ) -> Trigger: """Bind a trigger configuration to a registered function. Args: @@ -715,7 +781,9 @@ def register_trigger(self, trigger: RegisterTriggerInput | dict[str, Any]) -> Tr def unregister() -> None: self._triggers.pop(trigger_id, None) - self._send_if_connected(UnregisterTriggerMessage(id=trigger_id, trigger_type=msg.trigger_type)) + self._send_if_connected( + UnregisterTriggerMessage(id=trigger_id, trigger_type=msg.trigger_type) + ) return Trigger(unregister) @@ -788,7 +856,9 @@ def register_function( """ if isinstance(func_or_id, str): # Simplified API: auto-extract formats from handler type hints - handler_for_extraction = handler_or_invocation if callable(handler_or_invocation) else None + handler_for_extraction = ( + handler_or_invocation if callable(handler_or_invocation) else None + ) if request_format is None and handler_for_extraction is not None: request_format = extract_request_format(handler_for_extraction) if response_format is None and handler_for_extraction is not None: @@ -824,7 +894,9 @@ def register_function( else: if not callable(handler_or_invocation): actual_type = type(handler_or_invocation).__name__ - raise TypeError(f"handler_or_invocation must be callable or HttpInvocationConfig, got {actual_type}") + raise TypeError( + f"handler_or_invocation must be callable or HttpInvocationConfig, got {actual_type}" + ) handler = handler_or_invocation msg = RegisterFunctionMessage( id=func.id, @@ -973,7 +1045,9 @@ async def trigger_async(self, request: "dict[str, Any] | TriggerRequest") -> Any self._pending[invocation_id] = future - enqueue_action: TriggerActionEnqueue | None = action if isinstance(action, TriggerActionEnqueue) else None + enqueue_action: TriggerActionEnqueue | None = ( + action if isinstance(action, TriggerActionEnqueue) else None + ) await self._send( InvokeFunctionMessage( @@ -990,7 +1064,9 @@ async def trigger_async(self, request: "dict[str, Any] | TriggerRequest") -> Any return await asyncio.wait_for(future, timeout=timeout_secs) except asyncio.TimeoutError: self._pending.pop(invocation_id, None) - raise TimeoutError(f"Invocation of '{function_id}' timed out after {timeout_ms}ms") + raise TimeoutError( + f"Invocation of '{function_id}' timed out after {timeout_ms}ms" + ) def list_functions(self) -> list[FunctionInfo]: """List all functions registered with the engine across all workers. @@ -1014,7 +1090,9 @@ async def list_functions_async(self) -> list[FunctionInfo]: >>> for fn in await iii.list_functions_async(): ... print(fn.function_id, fn.description) """ - result = await self.trigger_async({"function_id": "engine::functions::list", "payload": {}}) + result = await self.trigger_async( + {"function_id": "engine::functions::list", "payload": {}} + ) functions_data = result.get("functions", []) return [FunctionInfo(**f) for f in functions_data] @@ -1040,7 +1118,9 @@ async def list_workers_async(self) -> list[WorkerInfo]: >>> for w in await iii.list_workers_async(): ... print(w.name, w.worker_id) """ - result = await self.trigger_async({"function_id": "engine::workers::list", "payload": {}}) + result = await self.trigger_async( + {"function_id": "engine::workers::list", "payload": {}} + ) workers_data = result.get("workers", []) return [WorkerInfo(**w) for w in workers_data] @@ -1060,7 +1140,9 @@ def list_triggers(self, include_internal: bool = False) -> list[TriggerInfo]: """ return self._run_on_loop(self.list_triggers_async(include_internal)) - async def list_triggers_async(self, include_internal: bool = False) -> list[TriggerInfo]: + async def list_triggers_async( + self, include_internal: bool = False + ) -> list[TriggerInfo]: """List all triggers registered with the engine. Args: @@ -1083,7 +1165,9 @@ async def list_triggers_async(self, include_internal: bool = False) -> list[Trig triggers_data = result.get("triggers", []) return [TriggerInfo(**t) for t in triggers_data] - def list_trigger_types(self, include_internal: bool = False) -> list[TriggerTypeInfo]: + def list_trigger_types( + self, include_internal: bool = False + ) -> list[TriggerTypeInfo]: """List all trigger types registered with the engine. Args: @@ -1101,7 +1185,9 @@ def list_trigger_types(self, include_internal: bool = False) -> list[TriggerType """ return self._run_on_loop(self.list_trigger_types_async(include_internal)) - async def list_trigger_types_async(self, include_internal: bool = False) -> list[TriggerTypeInfo]: + async def list_trigger_types_async( + self, include_internal: bool = False + ) -> list[TriggerTypeInfo]: """List all trigger types registered with the engine. Args: @@ -1193,14 +1279,18 @@ def _get_worker_metadata(self) -> dict[str, Any]: telemetry_opts = self._options.telemetry language = ( - (telemetry_opts.language if telemetry_opts else None) or os.environ.get("LANG", "").split(".")[0] or None + (telemetry_opts.language if telemetry_opts else None) + or os.environ.get("LANG", "").split(".")[0] + or None ) telemetry: dict[str, Any] = { "language": language, "project_name": telemetry_opts.project_name if telemetry_opts else None, - "framework": telemetry_opts.framework if telemetry_opts else None, - "amplitude_api_key": telemetry_opts.amplitude_api_key if telemetry_opts else None, + "framework": (telemetry_opts.framework if telemetry_opts else None) or "iii-py", + "amplitude_api_key": ( + telemetry_opts.amplitude_api_key if telemetry_opts else None + ), } return { @@ -1222,7 +1312,9 @@ def _register_worker_metadata(self) -> None: ) asyncio.run_coroutine_threadsafe(self._send(msg), self._loop) - def on_functions_available(self, callback: Callable[[list[FunctionInfo]], None]) -> Callable[[], None]: + def on_functions_available( + self, callback: Callable[[list[FunctionInfo]], None] + ) -> Callable[[], None]: """Subscribe to function-availability events from the engine. The callback fires whenever the set of available functions changes @@ -1249,7 +1341,9 @@ def on_functions_available(self, callback: Callable[[list[FunctionInfo]], None]) if not self._functions_available_trigger: if not self._functions_available_function_id: - self._functions_available_function_id = f"iii.on_functions_available.{uuid.uuid4()}" + self._functions_available_function_id = ( + f"iii.on_functions_available.{uuid.uuid4()}" + ) function_id = self._functions_available_function_id if function_id not in self._functions: @@ -1263,12 +1357,19 @@ async def handler(data: dict[str, Any]) -> None: self.register_function({"id": function_id}, handler) self._functions_available_trigger = self.register_trigger( - {"type": "engine::functions-available", "function_id": function_id, "config": {}} + { + "type": "engine::functions-available", + "function_id": function_id, + "config": {}, + } ) def unsubscribe() -> None: self._functions_available_callbacks.discard(callback) - if len(self._functions_available_callbacks) == 0 and self._functions_available_trigger: + if ( + len(self._functions_available_callbacks) == 0 + and self._functions_available_trigger + ): self._functions_available_trigger.unregister() self._functions_available_trigger = None @@ -1316,14 +1417,18 @@ async def list_handler(data: Any) -> list[Any]: return await stream.list(input_data) async def list_groups_handler(data: Any) -> list[str]: - input_data = StreamListGroupsInput(**data) if isinstance(data, dict) else data + input_data = ( + StreamListGroupsInput(**data) if isinstance(data, dict) else data + ) return await stream.list_groups(input_data) self.register_function({"id": f"stream::get({stream_name})"}, get_handler) self.register_function({"id": f"stream::set({stream_name})"}, set_handler) self.register_function({"id": f"stream::delete({stream_name})"}, delete_handler) self.register_function({"id": f"stream::list({stream_name})"}, list_handler) - self.register_function({"id": f"stream::list_groups({stream_name})"}, list_groups_handler) + self.register_function( + {"id": f"stream::list_groups({stream_name})"}, list_groups_handler + ) class TriggerAction: diff --git a/sdk/packages/rust/iii/src/iii.rs b/sdk/packages/rust/iii/src/iii.rs index ee1731b82..95355b35e 100644 --- a/sdk/packages/rust/iii/src/iii.rs +++ b/sdk/packages/rust/iii/src/iii.rs @@ -1377,7 +1377,16 @@ impl III { /// Register this worker's metadata with the engine (called automatically on connect) fn register_worker_metadata(&self) { - if let Some(metadata) = self.inner.worker_metadata.lock_or_recover().clone() { + if let Some(mut metadata) = self.inner.worker_metadata.lock_or_recover().clone() { + let fw = metadata + .telemetry + .as_ref() + .and_then(|t| t.framework.as_deref()) + .unwrap_or(""); + if fw.is_empty() { + let telem = metadata.telemetry.get_or_insert_with(Default::default); + telem.framework = Some("iii-rust".to_string()); + } if let Ok(value) = serde_json::to_value(metadata) { let _ = self.send_message(Message::InvokeFunction { invocation_id: None,