diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index 29ea860..9c551a2 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -99,6 +99,13 @@ jobs: sleep 1; done + - name: Create artifacts directory + run: mkdir artifacts + + ##### + # Test without local disk cache + ##### + - name: Run active storage container run: make run @@ -108,9 +115,6 @@ jobs: sleep 1; done - - name: Create artifacts directory - run: mkdir artifacts - - name: Run compliance test suite run: pytest -s > artifacts/pytest.log @@ -118,6 +122,38 @@ jobs: run: docker logs reductionist > artifacts/reductionist.log if: always() + - name: Stop active storage container + run: make stop + if: always() + + ##### + # Test with local disk cache + ##### + + - name: Run active storage container with local disk cache + run: make run-with-cache + + - name: Wait for active storage server to start + run: | + until curl -if http://localhost:8080/.well-known/reductionist-schema; do + sleep 1; + done + + - name: Run compliance test suite + run: pytest -s > artifacts/pytest-with-cache.log + + - name: Get active storage logs + run: docker logs reductionist > artifacts/reductionist-with-cache.log + if: always() + + - name: Stop active storage container + run: make stop + if: always() + + ##### + # Clean up steps + ##### + - name: Upload artifacts uses: actions/upload-artifact@v4 with: @@ -129,9 +165,6 @@ jobs: run: scripts/minio-stop if: always() - - name: Stop active storage container - run: make stop - if: always() deployment-test: runs-on: ubuntu-latest steps: @@ -183,6 +216,7 @@ jobs: sudo ip a sudo ip r if: failure() + dependency-review: runs-on: ubuntu-latest if: github.event_name == 'pull_request' diff --git a/Cargo.lock b/Cargo.lock index 6316ec0..28654d8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,17 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" +[[package]] +name = "ahash" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" +dependencies = [ + "getrandom 0.2.15", + "once_cell", + "version_check", +] + [[package]] name = "ahash" version = "0.8.11" @@ -117,6 +128,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "23b62fc65de8e4e7f52534fb52b0f3ed04746ae267519eef2a83941e8085068b" +[[package]] +name = "arrayvec" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" + [[package]] name = "async-trait" version = "0.1.82" @@ -189,7 +206,7 @@ version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e518950d4ac43508c8bfc2fe4e24b0752d99eab80134461d5e162dcda0214b55" dependencies = [ - "ahash", + "ahash 0.8.11", "aws-credential-types", "aws-runtime", "aws-sigv4", @@ -544,6 +561,18 @@ version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" +[[package]] +name = "bitvec" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bc2832c24239b0141d5674bb9174f9d68a8b5b3f2753311927c172ca46f7e9c" +dependencies = [ + "funty", + "radium", + "tap", + "wyz", +] + [[package]] name = "blake2b_simd" version = 
"0.5.11" @@ -551,7 +580,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "afa748e348ad3be8263be728124b24a24f268266f6f5d58af9d75f6a40b5c587" dependencies = [ "arrayref", - "arrayvec", + "arrayvec 0.5.2", "constant_time_eq", ] @@ -564,12 +593,68 @@ dependencies = [ "generic-array", ] +[[package]] +name = "borsh" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5327f6c99920069d1fe374aa743be1af0031dea9f250852cdf1ae6a0861ee24" +dependencies = [ + "borsh-derive", + "cfg_aliases", +] + +[[package]] +name = "borsh-derive" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10aedd8f1a81a8aafbfde924b0e3061cd6fedd6f6bbcfc6a76e6fd426d7bfe26" +dependencies = [ + "once_cell", + "proc-macro-crate", + "proc-macro2", + "quote", + "syn 2.0.77", +] + [[package]] name = "bumpalo" version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +[[package]] +name = "byte-unit" +version = "5.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1cd29c3c585209b0cbc7309bfe3ed7efd8c84c21b7af29c8bfae908f8777174" +dependencies = [ + "rust_decimal", + "serde", + "utf8-width", +] + +[[package]] +name = "bytecheck" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23cdc57ce23ac53c931e88a43d06d070a6fd142f2617be5855eb75efc9beb1c2" +dependencies = [ + "bytecheck_derive", + "ptr_meta", + "simdutf8", +] + +[[package]] +name = "bytecheck_derive" +version = "0.6.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3db406d29fbcd95542e92559bed4d8ad92636d1ca8b3b72ede10b4bcc010e659" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "byteorder" version = "1.5.0" @@ -578,9 +663,12 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" [[package]] name = "bytes" -version = "1.7.1" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8318a53db07bb3f8dca91a600466bdb3f2eaadeedfdbcf02e1accbad9271ba50" +checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" +dependencies = [ + "serde", +] [[package]] name = "bytes-utils" @@ -613,6 +701,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "ciborium" version = "0.2.2" @@ -950,12 +1044,12 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.9" +version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "534c5cf6194dfab3db3242765c03bbe257cf92f22b38f6bc0c58d59108a820ba" +checksum = "33d852cb9b869c2a9b3df2f71a3074817f01e1844f839a144f5fcef059a4eb5d" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -1010,6 +1104,12 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "funty" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" + [[package]] name = 
"futures" version = "0.3.30" @@ -1127,6 +1227,18 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "getrandom" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73fea8450eea4bac3940448fb7ae50d91f034f941199fcd9d909a5a07aa455f0" +dependencies = [ + "cfg-if", + "libc", + "r-efi", + "wasi 0.14.2+wasi-0.2.4", +] + [[package]] name = "gimli" version = "0.31.0" @@ -1178,6 +1290,9 @@ name = "hashbrown" version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" +dependencies = [ + "ahash 0.7.8", +] [[package]] name = "hashbrown" @@ -1185,7 +1300,7 @@ version = "0.14.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" dependencies = [ - "ahash", + "ahash 0.8.11", "allocator-api2", ] @@ -1459,9 +1574,9 @@ checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" [[package]] name = "libc" -version = "0.2.158" +version = "0.2.171" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8adc4bb1803a324070e64a98ae98f38934d91957a99cfb3a43dcbc01bc56439" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" [[package]] name = "linux-raw-sys" @@ -1469,6 +1584,12 @@ version = "0.4.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78b3ae25bc7c8c38cec158d1f2757ee79e9b3740fbc7ccf0e59e4b08d793fa89" +[[package]] +name = "linux-raw-sys" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe7db12097d22ec582439daf8618b8fdd1a7bef6270e9af3b1ebcd30893cf413" + [[package]] name = "lock_api" version = "0.4.12" @@ -1535,6 +1656,12 @@ dependencies = [ "digest", ] +[[package]] +name = "md5" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" + [[package]] name = "memchr" version = "2.7.4" @@ -1906,6 +2033,15 @@ dependencies = [ "zerocopy 0.7.35", ] +[[package]] +name = "proc-macro-crate" +version = "3.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8ecf48c7ca261d60b74ab1a7b20da18bede46776b2e55535cb958eb595c5fa7b" +dependencies = [ + "toml_edit", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -1949,7 +2085,7 @@ dependencies = [ "hex", "lazy_static", "procfs-core", - "rustix", + "rustix 0.38.37", ] [[package]] @@ -1985,6 +2121,26 @@ version = "2.28.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "106dd99e98437432fed6519dedecfade6a06a73bb7b2a1e019fdd2bee5778d94" +[[package]] +name = "ptr_meta" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0738ccf7ea06b608c10564b31debd4f5bc5e197fc8bfe088f68ae5ce81e7a4f1" +dependencies = [ + "ptr_meta_derive", +] + +[[package]] +name = "ptr_meta_derive" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "16b845dbfca988fa33db069c0e230574d15a3088f147a87b64c7589eb662c9ac" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "pwd" version = "1.4.0" @@ -2004,6 +2160,18 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "r-efi" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"74765f6d916ee2faa39bc8e68e4f3ed8949b48cccdac59983d287a7cb71ce9c5" + +[[package]] +name = "radium" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" + [[package]] name = "rand" version = "0.8.5" @@ -2099,6 +2267,8 @@ dependencies = [ "aws-types", "axum", "axum-server", + "byte-unit", + "bytes", "clap 4.5.17", "criterion", "expanduser", @@ -2108,6 +2278,7 @@ dependencies = [ "hyper", "lazy_static", "maligned", + "md5", "mime", "ndarray", "ndarray-stats", @@ -2122,6 +2293,7 @@ dependencies = [ "serde_json", "serde_test", "strum_macros", + "tempfile", "thiserror", "time", "tokio", @@ -2133,6 +2305,7 @@ dependencies = [ "tracing-opentelemetry", "tracing-subscriber", "url", + "uuid", "validator", "zerocopy 0.6.6", "zune-inflate", @@ -2188,6 +2361,15 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a66a03ae7c801facd77a29370b4faec201768915ac14a721ba36f20bc9c209b" +[[package]] +name = "rend" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71fe3824f5629716b1589be05dacd749f6aa084c87e00e016714a8cdfccc997c" +dependencies = [ + "bytecheck", +] + [[package]] name = "rfc6979" version = "0.3.1" @@ -2229,6 +2411,35 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "rkyv" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9008cd6385b9e161d8229e1f6549dd23c3d022f132a2ea37ac3a10ac4935779b" +dependencies = [ + "bitvec", + "bytecheck", + "bytes", + "hashbrown 0.12.3", + "ptr_meta", + "rend", + "rkyv_derive", + "seahash", + "tinyvec", + "uuid", +] + +[[package]] +name = "rkyv_derive" +version = "0.7.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "503d1d27590a2b0a3a4ca4c94755aa2875657196ecbf401a42eff41d7de532c0" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "rust-argon2" version = "0.8.3" @@ -2241,6 +2452,22 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "rust_decimal" +version = "1.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b082d80e3e3cc52b2ed634388d436fe1f4de6af5786cc2de9ba9737527bdf555" +dependencies = [ + "arrayvec 0.7.6", + "borsh", + "bytes", + "num-traits", + "rand", + "rkyv", + "serde", + "serde_json", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2265,10 +2492,23 @@ dependencies = [ "bitflags 2.6.0", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.4.14", "windows-sys 0.52.0", ] +[[package]] +name = "rustix" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e56a18552996ac8d29ecc3b190b4fdbb2d91ca4ec396de7bbffaf43f3d637e96" +dependencies = [ + "bitflags 2.6.0", + "errno", + "libc", + "linux-raw-sys 0.9.3", + "windows-sys 0.59.0", +] + [[package]] name = "rustls" version = "0.20.9" @@ -2370,6 +2610,12 @@ dependencies = [ "untrusted 0.9.0", ] +[[package]] +name = "seahash" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c107b6f4780854c8b126e228ea8869f4d7b71260f962fefb57b996b8959ba6b" + [[package]] name = "sec1" version = "0.3.0" @@ -2538,6 +2784,12 @@ version = "0.3.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d66dc143e6b11c1eddc06d5c423cfc97062865baf299914ab64caa38182078fe" +[[package]] +name = "simdutf8" +version = "0.1.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" + [[package]] name = "slab" version = "0.4.9" @@ -2638,6 +2890,25 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "tap" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" + +[[package]] +name = "tempfile" +version = "3.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "488960f40a3fd53d72c2a29a58722561dee8afdd175bd88e3db4677d7b2ba600" +dependencies = [ + "fastrand", + "getrandom 0.3.2", + "once_cell", + "rustix 1.0.3", + "windows-sys 0.59.0", +] + [[package]] name = "textwrap" version = "0.16.1" @@ -2831,6 +3102,23 @@ dependencies = [ "tokio", ] +[[package]] +name = "toml_datetime" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0dd7358ecb8fc2f8d014bf86f6f638ce72ba252a2c3a2572f2a795f1d23efb41" + +[[package]] +name = "toml_edit" +version = "0.22.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474" +dependencies = [ + "indexmap 2.5.0", + "toml_datetime", + "winnow", +] + [[package]] name = "tower" version = "0.4.13" @@ -3031,6 +3319,12 @@ version = "2.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" +[[package]] +name = "utf8-width" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86bd8d4e895da8537e5315b8254664e6b769c4ff3db18321b297a1e7004392e3" + [[package]] name = "utf8parse" version = "0.2.2" @@ -3039,9 +3333,12 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.10.0" +version = "1.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +checksum = "b3758f5e68192bb96cc8f9b7e2c2cfdabb435499a28499a42f8f984092adad4b" +dependencies = [ + "getrandom 0.2.15", +] [[package]] name = "validator" @@ -3134,6 +3431,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.14.2+wasi-0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9683f9a5a998d873c0d21fcbe3c083009670149a8fab228644b8bd36b2c48cb3" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasm-bindgen" version = "0.2.93" @@ -3322,6 +3628,33 @@ version = "0.52.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" +[[package]] +name = "winnow" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7f4ea97f6f78012141bcdb6a216b2609f0979ada50b20ca5b52dde2eac2bb1" +dependencies = [ + "memchr", +] + +[[package]] +name = "wit-bindgen-rt" +version = "0.39.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f42320e61fe2cfd34354ecb597f86f413484a798ba44a8ca1165c58d42da6c1" +dependencies = [ + "bitflags 2.6.0", +] 
+ +[[package]] +name = "wyz" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f360fc0b24296329c78fda852a1e9ae82de9cf7b27dae4b7f62f118f77b9ed" +dependencies = [ + "tap", +] + [[package]] name = "xmlparser" version = "0.13.6" diff --git a/Cargo.toml b/Cargo.toml index 9e2cc2f..06de868 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,6 +34,8 @@ aws-smithy-types = "1.2" aws-types = "1.3" axum = { version = "0.6", features = ["headers"] } axum-server = { version = "0.4.7", features = ["tls-rustls"] } +byte-unit = "5.1.6" +bytes = { version = "1.9.0", features = ["serde"] } clap = { version = "~4.5", features = ["derive", "env"] } expanduser = "1.2.2" flate2 = "1.0" @@ -42,6 +44,7 @@ http = "1.1" hyper = { version = "0.14", features = ["full"] } lazy_static = "1.5" maligned = "0.2.1" +md5 = "0.7.0" mime = "0.3" ndarray = "0.15" ndarray-stats = "0.5" @@ -54,6 +57,7 @@ rayon = "1.7" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" strum_macros = "0.24" +tempfile = "3.19.0" thiserror = "1.0" time = "= 0.3.23" tokio = { version = "1.28", features = ["full"] } @@ -65,6 +69,7 @@ tracing = "0.1" tracing-opentelemetry = "0.21" tracing-subscriber = { version = "0.3", features = ["env-filter"] } url = { version = "2", features = ["serde"] } +uuid = { version = "1.12.1", features = ["v4"] } validator = { version = "0.16", features = ["derive"] } zerocopy = { version = "0.6.1", features = ["alloc", "simd"] } zune-inflate = "0.2.54" diff --git a/Makefile b/Makefile index d066719..e2eb351 100644 --- a/Makefile +++ b/Makefile @@ -23,6 +23,10 @@ test: run: @docker run -it --detach --rm --net=host --name reductionist reductionist +.PHONY: run-with-cache +run-with-cache: + @docker run -it --detach --rm --net=host --name reductionist reductionist reductionist --use-chunk-cache --chunk-cache-path ./ + .PHONY: stop stop: @docker stop reductionist diff --git a/deployment/group_vars/all b/deployment/group_vars/all index 82a37a7..50c185a 100644 --- a/deployment/group_vars/all +++ b/deployment/group_vars/all @@ -5,6 +5,11 @@ reductionist_build_image: false reductionist_src_url: "http://github.com/stackhpc/reductionist-rs" # Source version. reductionist_src_version: "main" +# Location for repo checkout if using a local build. +reductionist_repo_location: "{{ ansible_facts.env.HOME }}/reductionist-rs" +# With the following "reductionist_clone_repo" set true +# an existing checkout in "reductionist_repo_location" will lose any local changes. +reductionist_clone_repo: "true" # Container name. reductionist_name: "reductionist" # Container image name. @@ -20,13 +25,25 @@ reductionist_env: REDUCTIONIST_ENABLE_JAEGER: "{{ (groups['jaeger'] | default([]) | length > 0) | string | lower }}" REDUCTIONIST_HTTPS: "true" REDUCTIONIST_PORT: "8081" + REDUCTIONIST_USE_CHUNK_CACHE: "true" + REDUCTIONIST_CHUNK_CACHE_PATH: "{{ reductionist_container_cache_path }}" + REDUCTIONIST_CHUNK_CACHE_AGE: "86400" + REDUCTIONIST_CHUNK_CACHE_PRUNE_INTERVAL: "3600" + REDUCTIONIST_CHUNK_CACHE_SIZE_LIMIT: "10GB" + REDUCTIONIST_CHUNK_CACHE_QUEUE_SIZE: "32" + REDUCTIONIST_CHUNK_CACHE_BYPASS_AUTH: "false" # Path to certificates directory on remote host. reductionist_remote_certs_path: "{{ ansible_facts.env.HOME }}/certs" # Path to certificates directory in container. reductionist_container_certs_path: "/root/.config/reductionist/certs" +# Path to cache directory on remote host. +reductionist_remote_cache_path: "/tmp" +# Path to cache directory in container. 
+reductionist_container_cache_path: "/cache" # List of container volume mounts. reductionist_volumes: - "{{ reductionist_remote_certs_path }}:{{ reductionist_container_certs_path }}" + - "{{ reductionist_remote_cache_path }}:{{ reductionist_container_cache_path }}" # Host on which HAProxy frontend is exposed. reductionist_host: "{{ hostvars[groups['haproxy'][0]].ansible_facts.default_ipv4.address }}" # Certificate validity. diff --git a/deployment/site.yml b/deployment/site.yml index 2343e97..7cd3d0d 100644 --- a/deployment/site.yml +++ b/deployment/site.yml @@ -369,15 +369,15 @@ - name: Clone reductionist repo ansible.builtin.git: repo: "{{ reductionist_src_url }}" - dest: "{{ ansible_env.HOME }}/reductionist-rs" + dest: "{{ reductionist_repo_location }}" version: "{{ reductionist_src_version }}" - when: reductionist_build_image | bool + when: reductionist_build_image | bool and reductionist_clone_repo | bool - name: Ensure reductionist image is built containers.podman.podman_image: name: "{{ reductionist_image }}" tag: "{{ reductionist_tag }}" - path: "{{ ansible_env.HOME }}/reductionist-rs" + path: "{{ reductionist_repo_location }}" when: reductionist_build_image | bool - name: Ensure reductionist container is running diff --git a/docs/architecture.md b/docs/architecture.md index e52640e..fbd750c 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -56,6 +56,15 @@ This is implemented using the `S3ClientMap` in `src/s3_client.rs` and benchmarke Downloaded storage chunk data is returned to the request handler as a [Bytes](https://docs.rs/bytes/latest/bytes/struct.Bytes.html) object, which is a wrapper around a `u8` (byte) array. +## S3 object caching + +A cache can be optionally enabled to store downloaded S3 objects to disk, this allows the Reductionist to repeat operations on already downloaded data objects utilising faster disk I/O over network I/O. +Authentication is passed through to the S3 object store and access to cached data by users other than the original requestor is allowed if S3 authentication permits. Authentication can be optionally disabled for further cache speedup in trusted environments. + +A [Tokio MPSC channel](https://docs.rs/tokio/latest/tokio/sync/mpsc/index.html) bridges write access between the requests of the asynchronous [Axum](https://docs.rs/axum) web framework and synchronous writes to the disk cache; this allows requests to the Reductionist to continue unblocked along their operation pipeline whilst being queued for cache storage. + +The disk cache can be managed overall by size and by time to live (TTL) on individual data objects with automatic pruning removing expired objects. Cache state is maintained on-disk allowing the cache to be reused across restarts of the Reductionist server. + ## Filters and compression When a variable in a netCDF, HDF5 or Zarr dataset is created, it may be compressed to reduce storage requirements. diff --git a/docs/deployment.md b/docs/deployment.md index 5e2912c..0a4fe00 100644 --- a/docs/deployment.md +++ b/docs/deployment.md @@ -193,13 +193,64 @@ Note, this is the default. Create a `certs` directory under the home directory of the non-privileged deployment user, this will be done automatically and the following files will be added if Step is deployed. 
If using third party certificates the following files must be added manually using the file names shown: -| Filename | Description | +| Filename | Description | | -------- | ------- | | certs/key.pem | Private key file | | certs/cert.pem | Certificate file including any intermediates | Certificates can be added post Reductionist deployment but the Reductionist's container will need to be restarted afterwards. +## Reductionist Configuration + +In addition to the `certs` configuration above the file `deployment/group_vars/all` covers the following configuration. + +| Ansible Parameter | Description | +| - | - | +| reductionist_build_image | Whether to locally build the Reductionist container | +| reductionist_src_url | Source URL for the Reductionist repository | +| reductionist_src_version | Repository branch to use for local builds | +| reductionist_repo_location | Where to clone the Reductionist repository | +| reductionist_clone_repo | By default the repository cloning overwrites local changes, this disables | +| reductionist_name | Name for Reductionist container | +| reductionist_image | Container URL if downloading and not building | +| reductionist_tag | Container tag | +| reductionist_networks | List of container networks | +| reductionist_env | Configures the Reductionist environment, see table of environment variables below | +| reductionist_remote_certs_path | Path to certificates on the host | +| reductionist_container_certs_path | Path to certificates within the container | +| reductionist_remote_cache_path | Path to cache on host filesystem | +| reductionist_container_cache_path | Path to cache within the container | +| reductionist_volumes | Volumes to map from host to container | +| reductionist_host | Used when deploying HAProxy to test connectivity to backend Reductionist(s) | +| reductionist_cert_not_after | Certificate validity | + +The ``reductionist_env`` parameter allows configuration of the environment variables passed to the Reductionist at runtime: + +| Environment Variable | Description | +| - | - | +| REDUCTIONIST_HOST | The IP address on which to listen on, default "0.0.0.0" | +| REDUCTIONIST_PORT | Port to listen on | +| REDUCTIONIST_HTTPS | Whether to enable https connections | +| REDUCTIONIST_CERT_FILE | Path to the certificate file used for https | +| REDUCTIONIST_KEY_FILE | Path to the key file used for https | +| REDUCTIONIST_SHUTDOWN_TIMEOUT | Maximum time in seconds to wait for operations to complete after receiving the 'ctrl+c' signal | +| REDUCTIONIST_ENABLE_JAEGER | Whether to enable sending traces to Jaeger | +| REDUCTIONIST_USE_RAYON | Whether to use Rayon for execution of CPU-bound tasks | +| REDUCTIONIST_MEMORY_LIMIT | Memory limit in bytes | +| REDUCTIONIST_S3_CONNECTION_LIMIT | S3 connection limit | +| REDUCTIONIST_THREAD_LIMIT | Thread limit for CPU-bound tasks | +| REDUCTIONIST_USE_CHUNK_CACHE | Whether to enable caching of downloaded data objects to disk | +| REDUCTIONIST_CHUNK_CACHE_PATH | Absolute filesystem path used for the cache. Defaults to container cache path, see Ansible Parameters above | +| REDUCTIONIST_CHUNK_CACHE_AGE | Time in seconds a chunk is kept in the cache | +| REDUCTIONIST_CHUNK_CACHE_PRUNE_INTERVAL | Time in seconds between periodic pruning of the cache | +| REDUCTIONIST_CHUNK_CACHE_SIZE_LIMIT | Maximum cache size, i.e. 
"100GB" | +| REDUCTIONIST_CHUNK_CACHE_QUEUE_SIZE | Tokio MPSC buffer size used to queue downloaded objects between the asynchronous web engine and the synchronous cache | +| REDUCTIONIST_CHUNK_CACHE_BYPASS_AUTH | Allow bypassing of S3 authentication when accessing cached data | + + +Note, after changing any of the above parameters the Reductionist must be deployed, or redeployed, using the ansible playbook for the change to take effect. +The idempotent nature of ansible necessitates that if redeploying then a running Reductionist container must be removed first. + ## Usage Once deployed, the Reductionist API is accessible on port 8080 by HAProxy. The Prometheus UI is accessible on port 9090 on the host running Prometheus. The Jaeger UI is accessible on port 16686 on the host running Jaeger. diff --git a/docs/img/reductionist-request-pipeline.svg b/docs/img/reductionist-request-pipeline.svg index 7e1eeaf..4d75144 100644 --- a/docs/img/reductionist-request-pipeline.svg +++ b/docs/img/reductionist-request-pipeline.svg @@ -1,4 +1,4 @@ - + -Receive API requestReceive API requestWeb server accepts connectionWeb server accepts c...Deserialise and validate request dataDeserialise and vali...Web server routes request to handlerWeb server routes re...Download storage chunk data from object storeDownload storage chu...DecompressDecompressDecode filtersDecode filtersPerform numerical operationPerform numerical op...Return API responseReturn API responseIs data compressed?Is data compressed?YesYesNoNoIs data filtered?Is data filtered?YesYesNoNoText is not SVG - cannot display \ No newline at end of file +Receive API requestReceive API requestWeb server accepts connectionWeb server accepts c...Deserialise and validate request dataDeserialise and vali...Web server routes request to handlerWeb server routes re...DecompressDecompressDecode filtersDecode filtersPerform numerical operationPerform numerical op...Return API responseReturn API responseIs data compressed?Is data compressed?YesYesNoNoIs data filtered?Is data filtered?YesYesNoNoDownloadchunk data from object storeDownload...Is chunk datacached?Is chunk data...NoNoYesYesSubmit chunk datato cacheSubmit chunk data...Retrieve chunk dataRetrieve chunk dataText is not SVG - cannot display \ No newline at end of file diff --git a/docs/index.md b/docs/index.md index cad09e5..9621f6e 100644 --- a/docs/index.md +++ b/docs/index.md @@ -19,6 +19,7 @@ Reductionist provides the following features: * HTTP(S) API with JSON request data * Access to data stored in S3-compatible storage +* On-disk caching of downloaded data to speed up repeat data requests * Basic numerical operations on multi-dimensional arrays (count, min, max, select, sum) * Perform calculations on a selection/slice of an array * Perform calculations allowing for missing data diff --git a/src/app.rs b/src/app.rs index 6e97d39..6edbd5b 100644 --- a/src/app.rs +++ b/src/app.rs @@ -1,9 +1,10 @@ //! 
Active Storage server API +use crate::chunk_cache::ChunkCache; use crate::cli::CommandLineArgs; use crate::error::ActiveStorageError; use crate::filter_pipeline; -use crate::metrics::{metrics_handler, track_metrics}; +use crate::metrics::{metrics_handler, track_metrics, LOCAL_CACHE_MISSES}; use crate::models; use crate::operation; use crate::operations; @@ -14,7 +15,6 @@ use crate::validated_json::ValidatedJson; use axum::middleware; use axum::{ - body::Bytes, extract::{Path, State}, headers::authorization::{Authorization, Basic}, http::header, @@ -22,9 +22,10 @@ use axum::{ routing::{get, post}, Router, TypedHeader, }; +use bytes::Bytes; +use tokio::sync::SemaphorePermit; use std::sync::Arc; -use tokio::sync::SemaphorePermit; use tower::Layer; use tower::ServiceBuilder; use tower_http::normalize_path::NormalizePathLayer; @@ -56,6 +57,9 @@ struct AppState { /// Resource manager. resource_manager: ResourceManager, + + /// Object chunk cache + chunk_cache: Option, } impl AppState { @@ -64,10 +68,27 @@ impl AppState { let task_limit = args.thread_limit.or_else(|| Some(num_cpus::get() - 1)); let resource_manager = ResourceManager::new(args.s3_connection_limit, args.memory_limit, task_limit); + let chunk_cache = if args.use_chunk_cache { + let path = args + .chunk_cache_path + .as_ref() + .expect("The chunk cache path must be specified when the chunk cache is enabled"); + Some(ChunkCache::new( + path, + args.chunk_cache_age, + args.chunk_cache_prune_interval, + args.chunk_cache_size_limit.clone(), + args.chunk_cache_buffer_size, + )) + } else { + None + }; + Self { args: args.clone(), s3_client_map: s3_client::S3ClientMap::new(), resource_manager, + chunk_cache, } } } @@ -167,29 +188,118 @@ async fn schema() -> &'static str { /// /// * `client`: S3 client object /// * `request_data`: RequestData object for the request -#[tracing::instrument( - level = "DEBUG", - skip(client, request_data, resource_manager, mem_permits) -)] -async fn download_object<'a>( +/// * `resource_manager`: ResourceManager object +#[tracing::instrument(level = "DEBUG", skip(client, request_data, resource_manager))] +async fn download_s3_object<'a>( client: &s3_client::S3Client, request_data: &models::RequestData, resource_manager: &'a ResourceManager, - mem_permits: &mut Option>, + mut mem_permits: Option>, ) -> Result { + // Convert request data to byte range for S3 request let range = s3_client::get_range(request_data.offset, request_data.size); + // Acquire connection permit to be freed via drop when this function returns let _conn_permits = resource_manager.s3_connection().await?; + client .download_object( &request_data.bucket, &request_data.object, range, resource_manager, - mem_permits, + &mut mem_permits, ) .await } +/// Download and cache an object from S3 +/// +/// Requests a byte range if `offset` or `size` is specified in the request. 
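+/// Cache entries are keyed on the request's source, bucket, object, dtype, byte order and compression, so a change to any of these fields results in a fresh download.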
+/// +/// # Arguments +/// +/// * `client`: S3 client object +/// * `request_data`: RequestData object for the request +/// * `resource_manager`: ResourceManager object +/// * `chunk_cache`: ChunkCache object +#[tracing::instrument( + level = "DEBUG", + skip(client, request_data, resource_manager, mem_permits, chunk_cache) +)] +async fn download_and_cache_s3_object<'a>( + client: &s3_client::S3Client, + request_data: &models::RequestData, + resource_manager: &'a ResourceManager, + mut mem_permits: Option>, + chunk_cache: &ChunkCache, + allow_cache_auth_bypass: bool, +) -> Result { + // We chose a cache key such that any changes to request data + // which may feasibly indicate a change to the upstream object + // lead to a new cache key. + let key = format!( + "{}-{}-{}-{}-{:?}-{:?}", + request_data.source.as_str(), + request_data.bucket, + request_data.object, + request_data.dtype, + request_data.byte_order, + request_data.compression, + ); + + if let Some(metadata) = chunk_cache.get_metadata(&key).await { + if !allow_cache_auth_bypass { + // To avoid having to include the S3 client ID as part of the cache key + // (which means we'd have a separate cache for each authorised user and + // waste storage space) we instead make a lightweight check against the + // object store to ensure the user is authorised, even if the object data + // is already in the local cache. + let authorised = client + .is_authorised(&request_data.bucket, &request_data.object) + .await?; + if !authorised { + return Err(ActiveStorageError::Forbidden); + } + } + + // Update memory requested from resource manager to account for actual + // size of data if we were previously unable to guess the size from request + // data's size + offset parameters. + // FIXME: how to account for compressed data? + let mem_permits = &mut mem_permits; + match mem_permits { + None => { + *mem_permits = resource_manager.memory(metadata.size_bytes).await?; + } + Some(permits) => { + if permits.num_permits() == 0 { + *mem_permits = resource_manager.memory(metadata.size_bytes).await?; + } + } + } + // We only want to get chunks for which the metadata check succeeded too, + // otherwise chunks which are missing metadata could bypass the resource + // manager and exhaust system resources + let cache_value = chunk_cache + .get(&key) + .instrument(tracing::Span::current()) + .await?; + if let Some(bytes) = cache_value { + return Ok(bytes); + } + } + + let data = download_s3_object(client, request_data, resource_manager, mem_permits).await?; + + // Write data to cache + chunk_cache.set(&key, data.clone()).await?; + + // Increment the prometheus metric for cache misses + LOCAL_CACHE_MISSES.with_label_values(&["disk"]).inc(); + + Ok(data) +} + /// Handler for Active Storage operations /// /// Downloads object data from S3 storage and executes the requested reduction operation. @@ -209,8 +319,15 @@ async fn operation_handler( auth: Option>>, ValidatedJson(request_data): ValidatedJson, ) -> Result { + // NOTE(sd109): We acquire memory permits semaphore here so that + // they are owned by this top-level function and not freed until + // the permits are dropped when the this function returns. + + // If we're given a size in the request data then use this to + // get an initial guess at the required memory resources. 
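+    // When no size is given this requests zero memory permits; the download code below tops the permits up once the actual object size is known.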
let memory = request_data.size.unwrap_or(0); let mut _mem_permits = state.resource_manager.memory(memory).await?; + let credentials = if let Some(TypedHeader(auth)) = auth { s3_client::S3Credentials::access_key(auth.username(), auth.password()) } else { @@ -221,14 +338,28 @@ async fn operation_handler( .get(&request_data.source, credentials) .instrument(tracing::Span::current()) .await; - let data = download_object( - &s3_client, - &request_data, - &state.resource_manager, - &mut _mem_permits, - ) - .instrument(tracing::Span::current()) - .await?; + + let data = match (&state.args.use_chunk_cache, &state.chunk_cache) { + (false, _) => { + download_s3_object(&s3_client, &request_data, &state.resource_manager, _mem_permits) + .instrument(tracing::Span::current()) + .await? + } + (true, Some(cache)) => { + download_and_cache_s3_object( + &s3_client, + &request_data, + &state.resource_manager, + _mem_permits, + cache, + state.args.chunk_cache_bypass_auth + ).await? + } + (true, None) => panic!( + "Chunk cache enabled but no chunk cache provided.\nThis is a bug. Please report it to the application developers." + ), + }; + // All remaining work is synchronous. If the use_rayon argument was specified, delegate to the // Rayon thread pool. Otherwise, execute as normal using Tokio. if state.args.use_rayon { diff --git a/src/chunk_cache.rs b/src/chunk_cache.rs new file mode 100644 index 0000000..6c9710a --- /dev/null +++ b/src/chunk_cache.rs @@ -0,0 +1,818 @@ +use crate::error::ActiveStorageError; + +use byte_unit::Byte; +use bytes::Bytes; +use serde::{Deserialize, Serialize}; +use std::{ + collections::HashMap, + fs as std_fs, + path::PathBuf, + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; +use tokio::{fs, spawn, sync::mpsc}; + +/// ChunkCacheEntry stores a chunk ready to be cached. +struct ChunkCacheEntry { + /// Key to uniquely identify the chunk in the cache. + key: String, + /// Bytes to be cached. + value: Bytes, +} + +impl ChunkCacheEntry { + /// Return a ChunkCacheEntry object + fn new(key: &str, value: Bytes) -> Self { + let key = key.to_owned(); + // Make sure we own the `Bytes` so we don't see unexpected, but not incorrect, + // behaviour caused by the zero copy of `Bytes`. i.e. let us choose when to copy. + let value = Bytes::copy_from_slice(&value); + Self { key, value } + } +} + +/// ChunkCache wraps a SimpleDiskCache object +/// and makes it async multi-thread safe by buffering all write operations +/// through an async MPSC channel. +/// SimpleDiskCache reads are inherently thread safe +/// and the ChunkCache passes these through unbuffered. +/// Cache writes are MPSC buffered. The task caching the chunk will not be blocked +/// unless the buffer is full, upon which the task will be blocked until +/// buffered space becomes available. +/// Buffer size is configurable. +pub struct ChunkCache { + /// The underlying cache object. + cache: Arc, + /// Sync primitive for managing write access to the cache. + sender: mpsc::Sender, +} + +impl ChunkCache { + /// Returns a ChunkCache object. + /// + /// # Arguments + /// + /// * `path`: Filesystem path where the "chunk_cache" folder is created, such as "/tmp" + /// * `ttl_seconds`: Time in seconds to keep a chunk in the cache + /// * `prune_interval_seconds`: Interval in seconds to routinely check and prune the cache of expired chunks + /// * `max_size_bytes`: An optional maximum cache size expressed as a string, i.e. 
"100GB" + /// * `buffer_size`: An optional size for the chunk write buffer + pub fn new( + path: &str, + ttl_seconds: u64, + prune_interval_seconds: u64, + max_size_bytes: Option, + buffer_size: Option, + ) -> Self { + let max_size_bytes = if let Some(size_limit) = max_size_bytes { + let bytes = Byte::parse_str(size_limit, /* ignore case */ true) + .expect("Invalid cache size limit") + .as_u64(); + Some(usize::try_from(bytes).unwrap()) + } else { + None + }; + let cache = Arc::new(SimpleDiskCache::new( + "chunk_cache", + path, + ttl_seconds, + prune_interval_seconds, + max_size_bytes, + )); + // Clone the cache, i.e. increment the Arc's reference counter, + // give this to an async task we spawn for handling all cache writes. + let cache_clone = cache.clone(); + // Create a MPSC channel, give the single consumer receiving end to the write task + // and store the sending end for use in our `set` method. + // A download request storing to the cache need only wait for the chunk + // to be sent to the channel. + let buffer_size = buffer_size.unwrap_or(num_cpus::get() - 1); + let (sender, mut receiver) = mpsc::channel::(buffer_size); + spawn(async move { + while let Some(message) = receiver.recv().await { + cache_clone.set(&message.key, message.value).await.unwrap(); + } + }); + + Self { cache, sender } + } + + /// Stores chunk `Bytes` in the cache against an unique key. + /// + /// # Arguments + /// + /// * `key`: Unique key identifying the chunk + /// * `value`: Chunk `Bytes` to be cached + pub async fn set(&self, key: &str, value: Bytes) -> Result<(), ActiveStorageError> { + match self.sender.send(ChunkCacheEntry::new(key, value)).await { + Ok(_) => Ok(()), + Err(e) => Err(ActiveStorageError::ChunkCacheError { + error: format!("{}", e), + }), + } + } + + /// Retrieves chunk metadata from the cache for an unique key. + /// + /// # Arguments + /// + /// * `key`: Unique key identifying the chunk + pub async fn get_metadata(&self, key: &str) -> Option { + let state = self.cache.load_state().await; + state.metadata.get(key).cloned() + } + + /// Retrieves chunk `Bytes` from the cache for an unique key. + /// + /// # Arguments + /// + /// * `key`: Unique key identifying the chunk + pub async fn get(&self, key: &str) -> Result, ActiveStorageError> { + match self.cache.get(key).await { + Ok(value) => Ok(value), + Err(e) => Err(ActiveStorageError::ChunkCacheError { + error: format!("{:?}", e), + }), + } + } +} + +/// Metadata stored against each cache chunk. +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct Metadata { + /// Seconds after unix epoch for cache item expiry. + pub expires: u64, + /// Cache value size. + pub size_bytes: usize, +} + +impl Metadata { + /// Returns a Metadata object. + fn new(size: usize, ttl: u64) -> Self { + let expires = SystemTime::now() + .duration_since(UNIX_EPOCH) + // Only panics if 'now' is before epoch + .expect("System time to be set correctly") + .as_secs() + + ttl; + Metadata { + expires, + size_bytes: size, + } + } +} + +type CacheKeys = HashMap; + +/// State stores the metadata for all cached chunks, +/// the total size of the cache, +/// and the time, as seconds from epoch, when the cache will next be checked +/// and pruned of chunks whose ttl has expired. +#[derive(Debug, Serialize, Deserialize)] +struct State { + /// Per cached chunk metadata indexed by chunk key. + metadata: CacheKeys, + /// Current cache size in bytes. + current_size_bytes: usize, + /// When to next check and prune the cache for expired chunks, as seconds from epoch. 
+ next_prune: u64, +} + +impl State { + /// Returns a State object. + fn new(prune_interval_secs: u64) -> Self { + let next_prune = SystemTime::now() + .duration_since(UNIX_EPOCH) + // Only panics if 'now' is before epoch + .expect("System time to be set correctly") + .as_secs() + + prune_interval_secs; + State { + metadata: CacheKeys::new(), + current_size_bytes: 0, + next_prune, + } + } +} + +/// The SimpleDiskCache takes chunks of `Bytes` data, identified by an unique key, +/// storing each chunk as a separate file on disk. Keys are stored in a hashmap +/// serialised to a JSON state file on disk. +/// Each chunk stored has a 'time to live' (TTL) stored as a number seconds from +/// the unix epoch, after which the chunk will have expired and can be pruned from +/// the cache. +/// Pruning takes place periodically or when the total size of the cache +/// reaches a maximum size threshold. +/// The decision whether to prune the cache is made when chunks are stored. +#[derive(Debug)] +struct SimpleDiskCache { + /// Cache folder name. + name: String, + /// Cache parent directory for the cache folder, such as "/tmp", which must exist. + dir: PathBuf, + /// Max time to live for a single cache entry. + ttl_seconds: u64, + /// Interval in seconds to routinely check and prune the cache of expired chunks. + prune_interval_seconds: u64, + /// Optional, a maximum size for the cache. + max_size_bytes: Option, +} + +impl SimpleDiskCache { + /// Names the JSON file used to store all cache keys and metadata. + const STATE_FILE: &'static str = "state.json"; + + /// Returns a SimpleDiskCache object. + pub fn new( + name: &str, + dir: &str, + ttl_seconds: u64, + prune_interval_seconds: u64, + max_size_bytes: Option, + ) -> Self { + let name = name.to_string(); + let dir = PathBuf::from(dir); + let path = dir.join(&name); + if !dir.as_path().exists() { + panic!("Cache parent dir {} must exist", dir.to_str().unwrap()) + } else if path.exists() { + let file = path.join(SimpleDiskCache::STATE_FILE); + if file.exists() { + let state = std_fs::read_to_string(file).expect("Failed to read cache state file"); + let _: State = serde_json::from_str(state.as_str()) + .expect("Failed to deserialise cache state"); + } else { + panic!("Cache directory {} already exists", dir.to_str().unwrap()) + } + } else { + std::fs::create_dir(&path).expect("Failed to create cache dir"); + } + SimpleDiskCache { + name, + dir, + ttl_seconds, + prune_interval_seconds, + max_size_bytes, + } + } + + /// Loads the cache state information from disk. + /// + /// Returns a `State` object. + async fn load_state(&self) -> State { + let file = self.dir.join(&self.name).join(SimpleDiskCache::STATE_FILE); + if file.exists() { + serde_json::from_str(fs::read_to_string(file).await.unwrap().as_str()).unwrap() + } else { + State::new(self.prune_interval_seconds) + } + } + + /// Saves the cache state information to disk. + /// + /// # Arguments + /// + /// * `state`: Cache `State` object. + async fn save_state(&self, data: State) { + let file = self.dir.join(&self.name).join(SimpleDiskCache::STATE_FILE); + fs::write(file, serde_json::to_string(&data).unwrap()) + .await + .unwrap(); + } + + /// Converts a chunk key into a string that can be used for a filename. + /// Keys must be unique but if too long may overstep the file name limits + /// of the underlying filesystem used to store the chunk. + /// + /// Returns a String. 
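+    /// The returned name is the MD5 digest of the key rendered as a hex string, giving a fixed-length, filesystem-safe filename.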
+ /// + /// # Arguments + /// + /// * `key`: Unique key identifying the chunk + async fn filename_for_key(&self, key: &str) -> String { + // Cater for long URL keys causing filename too long filesystem errors. + format!("{:?}", md5::compute(key)) + } + + /// Retrieves chunk `Bytes` from the cache for an unique key. + /// The chunk simply needs to exist on disk to be returned. + /// For performance, metadata, including TTL, isn't checked and it's possible + /// to retrieve an expired chunk within the time window between the chunk expiring + /// and the next cache pruning. + /// This function does not modify the state of the cache and is thread safe. + /// + /// # Arguments + /// + /// * `key`: Unique key identifying the chunk + async fn get(&self, key: &str) -> Result, String> { + match fs::read( + self.dir + .join(&self.name) + .join(self.filename_for_key(key).await), + ) + .await + { + Ok(val) => Ok(Some(Bytes::from(val))), + Err(err) => match err.kind() { + std::io::ErrorKind::NotFound => Ok(None), + _ => Err(format!("{}", err)), + }, + } + } + + /// Stores chunk `Bytes` in the cache against an unique key. + /// The cache is checked and if necessary pruned before storing the chunk. + /// Where a maximum size limit has been set the check will take into account the size + /// of the chunk being stored and ensure sufficient storage space is available. + /// This function modifies the state of the cache and is not thread safe. + /// + /// # Arguments + /// + /// * `key`: Unique key identifying the chunk + /// * `value`: Chunk `Bytes` to be cached + async fn set(&self, key: &str, value: Bytes) -> Result<(), String> { + let size = value.len(); + // Run the prune before storing to ensure we have sufficient space + self.prune(/* headroom */ size).await?; + // Write the cache value and then update the metadata + let path = self + .dir + .join(&self.name) + .join(self.filename_for_key(key).await); + if let Err(e) = fs::write(path, value).await { + return Err(format!("{:?}", e)); + } + let mut state = self.load_state().await; + state + .metadata + .insert(key.to_owned(), Metadata::new(size, self.ttl_seconds)); + state.current_size_bytes += size; + self.save_state(state).await; + Ok(()) + } + + /// Removes a chunk from the cache, identified by its key. + /// + /// # Arguments + /// + /// * `key`: Unique key identifying the chunk + async fn remove(&self, key: &str) { + let mut state = self.load_state().await; + if let Some(data) = state.metadata.remove(key) { + let path = self + .dir + .join(&self.name) + .join(self.filename_for_key(key).await); + fs::remove_file(path).await.unwrap(); + state.current_size_bytes -= data.size_bytes; + self.save_state(state).await; + } + } + + /// Removes all cache entries whose TTL has expired. + async fn prune_expired(&self) { + let state = self.load_state().await; + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("System time to be set correctly") + .as_secs(); + for (key, data) in state.metadata.iter() { + if data.expires <= timestamp { + self.remove(key).await; + } + } + } + + /// If the optional maximum cache size has been set, this function removes cache entries + /// to ensure the total size of the cache is within the size limit. + /// Entries are removed in order of TTL, oldest first. + /// Entries whose TTL hasn't yet expired can be removed to make space. + /// A value of `headroom_byres` can be specified and this ensures the specified number + /// of bytes are left available after pruning, to ensure the next chunk can be saved. 
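+    /// Because every entry shares the same TTL, sorting by expiry time evicts entries in insertion order, oldest first.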
+ /// + /// # Arguments + /// + /// * `headroom_bytes`: specifies additional free space that must be left available + async fn prune_disk_space(&self, headroom_bytes: usize) -> Result<(), String> { + if let Some(max_size_bytes) = self.max_size_bytes { + // TODO: Make this a std::io::ErrorKind::QuotaExceeded error once MSRV is 1.85 + if headroom_bytes > max_size_bytes { + return Err("Chunk cannot fit within cache maximum size threshold".to_string()); + } + let state = self.load_state().await; + let mut current_size_bytes: usize = + state.metadata.values().map(|value| value.size_bytes).sum(); + current_size_bytes += headroom_bytes; + if current_size_bytes >= max_size_bytes { + let mut metadata = state.metadata.iter().collect::>(); + metadata.sort_by_key(|key_value_tuple| key_value_tuple.1.expires); + for (key, data) in metadata { + self.remove(key).await; + // Repeat size calculation (outside of `remove`) to avoid reloading state. + current_size_bytes -= data.size_bytes; + if current_size_bytes < max_size_bytes { + break; + } + } + } + } + Ok(()) + } + + /// Prune the cache, this will be called before storing a chunk. + /// First, entries will be expired based on their TTL. + /// Second, if there's a maximum size limit on the cache it will be checked. + /// A value of `headroom_byres` can be specified and this ensures the specified number + /// of bytes are left available after pruning, to ensure the next chunk can be saved. + /// + /// # Arguments + /// + /// * `headroom_bytes`: specifies additional free space that must be left available + async fn prune(&self, headroom_bytes: usize) -> Result<(), String> { + let mut state = self.load_state().await; + // Prune when we go over the size threshold - this is optional. + let mut prune_expired = false; + if let Some(max_size_bytes) = self.max_size_bytes { + prune_expired = state.current_size_bytes + headroom_bytes >= max_size_bytes; + } + // We also prune at time intervals. + let timestamp = SystemTime::now() + .duration_since(UNIX_EPOCH) + .expect("System time to be set correctly") + .as_secs(); + prune_expired |= state.next_prune <= timestamp; + // Prune if either of the above thresholds were crossed. + if prune_expired { + // First prune on TTL. + self.prune_expired().await; + // Do we need to prune further to keep within a maximum size threshold? + state = self.load_state().await; + if let Some(max_size_bytes) = self.max_size_bytes { + if state.current_size_bytes + headroom_bytes >= max_size_bytes { + self.prune_disk_space(headroom_bytes).await?; + } + } + // Update state with the time of the next periodic pruning. 
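+            // Reload state from disk before stamping the next prune time, since the pruning above persisted its own updates via remove().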
+ state = self.load_state().await; + state.next_prune = timestamp + self.prune_interval_seconds; + self.save_state(state).await; + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + use tempfile::TempDir; + use tokio::time::sleep; + + #[tokio::test] + async fn test_simple_disk_cache() { + // Arrange + let tmp_dir = TempDir::new().unwrap(); + let cache = SimpleDiskCache::new( + "test-cache-1", + tmp_dir.path().to_str().unwrap(), + 10, // ttl + 60, // purge period + None, // max size + ); + + // Act + let key_1 = "item-1"; + let value_1 = Bytes::from(vec![1, 2, 3, 4]); + cache.set(key_1, value_1.clone()).await.unwrap(); + let cache_item_1 = cache.get(key_1).await; + + // Assert + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + assert_eq!(metadata.get(key_1).unwrap().size_bytes, value_1.len()); + assert_eq!(cache_item_1.unwrap(), Some(value_1)); + + // Act + let key_2 = "item-2"; + let value_2 = Bytes::from("Test123"); + cache.set(key_2, value_2.clone()).await.unwrap(); + let cache_item_2 = cache.get(key_2).await; + + // Assert + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 2); + assert_eq!(metadata.get(key_2).unwrap().size_bytes, value_2.len()); + assert_eq!(cache_item_2.unwrap(), Some(value_2)); + + // Act + cache.remove(key_1).await; + let cache_item_1 = cache.get(key_1).await; + + // Assert + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + assert!(!metadata.contains_key(key_1)); + assert!(metadata.contains_key(key_2)); + assert_eq!(cache_item_1.unwrap(), None); + } + + #[tokio::test] + async fn test_simple_disk_cache_prune_expired_all() { + let ttl = 1; + let time_between_inserts = 1; + let tmp_dir = TempDir::new().unwrap(); + let cache = SimpleDiskCache::new( + "test-cache-2", + tmp_dir.path().to_str().unwrap(), + ttl, // ttl for cache entries + 1000, // purge expired interval set large to not trigger expiry on "set" + None, // max cache size unset + ); + + // Action: populate cache + let key_1 = "item-1"; + let value_1 = Bytes::from(vec![1, 2, 3, 4]); + cache.set(key_1, value_1).await.unwrap(); + + // Assert: cache populated + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + + // Action: prune expired + cache.prune_expired().await; + + // Assert: nothing expired + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + + // Action: sleep past expiry time then prune expired + sleep(Duration::from_secs(time_between_inserts)).await; + cache.prune_expired().await; + + // Assert: cache empty + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 0); + } + + #[tokio::test] + async fn test_simple_disk_cache_prune_expired_stepped() { + let ttl = 1; + let time_between_inserts = ttl; + let tmp_dir = TempDir::new().unwrap(); + let cache = SimpleDiskCache::new( + "test-cache-3", + tmp_dir.path().to_str().unwrap(), + ttl, // ttl for cache entries + 1000, // purge expired interval set large to not trigger expiry on "set" + None, // max cache size unset + ); + + // Action: populate cache with 2 entries ttl seconds apart + let key_1 = "item-1"; + let value_1 = Bytes::from(vec![1, 2, 3, 4]); + cache.set(key_1, value_1).await.unwrap(); + sleep(Duration::from_secs(time_between_inserts)).await; + // The first entry should have already expired + // but we're not hitting any thresholds for "set" to kick off pruning. 
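+        // (No size limit is set and the purge interval is 1000 seconds, so set() does not prune here; expiry is only enforced by the explicit prune_expired() call below.)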
+ let key_2 = "item-2"; + let value_2 = Bytes::from(vec![5, 6, 7, 8]); + cache.set(key_2, value_2).await.unwrap(); + + // Assert: cache populated + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 2); + + // Action: prune expired + cache.prune_expired().await; + + // Assert: first entry pruned + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + assert!(!metadata.contains_key(key_1)); + assert!(metadata.contains_key(key_2)); + + // Action: sleep ttl then prune expired + sleep(Duration::from_secs(time_between_inserts)).await; + cache.prune_expired().await; + + // Assert: cache empty + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 0); + } + + #[tokio::test] + async fn test_simple_disk_cache_prune_size_triggered() { + // It's the size threshold that triggers a prune on "set". + // The prune expires entries by ttl first and this comes within the size threshold. + // set() -> prune() -> [size threshold hit] -> prune_expired() -> [within size limit] + let ttl = 1; + let time_between_inserts = ttl; + let size = 1000; + let chunk = vec![0; size]; + let tmp_dir = TempDir::new().unwrap(); + let cache = SimpleDiskCache::new( + "test-cache-4", + tmp_dir.path().to_str().unwrap(), + ttl, // ttl for cache entries + 1000, // purge expired interval set large to not trigger expiry on "set" + Some(size * 2), // max cache size accommodates two entries + ); + + // Action: populate cache with large entry + let key_1 = "item-1"; + let value_1 = Bytes::from(chunk.clone()); + cache.set(key_1, value_1).await.unwrap(); + + // Assert: cache populated + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + + // Action: wait ttl and populate cache with 2nd large entry + sleep(Duration::from_secs(time_between_inserts)).await; + let key_2 = "item-2"; + let value_2 = Bytes::from(chunk.clone()); + cache.set(key_2, value_2).await.unwrap(); + + // Assert: 1st entry has been pruned + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + assert!(!metadata.contains_key(key_1)); + assert!(metadata.contains_key(key_2)); + } + + #[tokio::test] + async fn test_simple_disk_cache_prune_size_triggered_2() { + // It's the size threshold that triggers a prune on "set". + // The prune expires entries by ttl first but this doesn't reduce to within the size threshold, + // nothing has expired! + // The size pruning brings the cache to within the threshold. 
+ // set() -> prune() -> [size threshold hit] -> prune_expired() -> [size threshold hit] -> prune_disk_space() -> [within size limit] + let ttl = 10; + let time_between_inserts = 1; + let size = 1000; + let chunk = vec![0; size]; + let tmp_dir = TempDir::new().unwrap(); + let cache = SimpleDiskCache::new( + "test-cache-5", + tmp_dir.path().to_str().unwrap(), + ttl, // ttl for cache entries + 1000, // purge expired interval set large to not trigger expiry on "set" + Some(size * 2), // max cache size accommodates two entries + ); + + // Action: populate cache with large entry + let key_1 = "item-1"; + let value_1 = Bytes::from(chunk.clone()); + cache.set(key_1, value_1).await.unwrap(); + + // Assert: cache populated + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + + // Action: wait 1 sec (less than ttl) and populate cache with 2nd large entry + sleep(Duration::from_secs(time_between_inserts)).await; + let key_2 = "item-2"; + let value_2 = Bytes::from(chunk.clone()); + cache.set(key_2, value_2).await.unwrap(); + + // Assert: 1st entry has been pruned + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + assert!(!metadata.contains_key(key_1)); + assert!(metadata.contains_key(key_2)); + } + + #[tokio::test] + async fn test_simple_disk_cache_prune_periodic_expiry_triggered() { + // It's the periodic expiry check that triggers a prune on "set". + // The expiry time is set so low it should expire the 1st as we add the 2nd entry, and so on. + // set(1st) -> prune() -> [no threshold hit] -> set(2nd) -> [periodic expiry hit] -> prune() -> prune_expired() -> [1st removed] + let ttl = 1; + let time_between_inserts = ttl; + let tmp_dir = TempDir::new().unwrap(); + let cache = SimpleDiskCache::new( + "test-cache-6", + tmp_dir.path().to_str().unwrap(), + ttl, // ttl for cache entries + ttl, // purge expired interval + None, + ); + + // Action: populate cache with 1st entry + let key_1 = "item-1"; + let value_1 = Bytes::from(vec![1, 2, 3, 4]); + cache.set(key_1, value_1).await.unwrap(); + + // Assert: cache populated + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + + // Action: wait ttl sec and populate cache with 2nd entry + sleep(Duration::from_secs(time_between_inserts)).await; + let key_2 = "item-2"; + let value_2 = Bytes::from(vec![1, 2, 3, 4]); + cache.set(key_2, value_2).await.unwrap(); + + // Assert: 1st entry has been pruned + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + assert!(!metadata.contains_key(key_1)); + assert!(metadata.contains_key(key_2)); + + // Action: wait ttl sec and populate cache with 3rd entry + sleep(Duration::from_secs(time_between_inserts)).await; + let key_3 = "item-3"; + let value_3 = Bytes::from(vec![1, 2, 3, 4]); + cache.set(key_3, value_3).await.unwrap(); + + // Assert: 2nd entry has been pruned + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + assert!(!metadata.contains_key(key_2)); + assert!(metadata.contains_key(key_3)); + } + + #[tokio::test] + async fn test_simple_disk_cache_prune_disk_space_headroom() { + // Setup the cache with time and size limits that won't trigger pruning + // when we insert some test data. + // Check we have the content then prune on disk space with a headroom + // equal to the cache size, the cache should preemptively clear. 
+ let max_size_bytes = 10000; + let tmp_dir = TempDir::new().unwrap(); + let cache = SimpleDiskCache::new( + "test-cache-7", + tmp_dir.path().to_str().unwrap(), + 1000, // ttl for cache entries that we shouldn't hit + 1000, // purge expired interval, too infrequent for us to hit + Some(max_size_bytes), // a max size threshold our test data shouldn't hit + ); + + // Action: populate cache with 1st entry + let key_1 = "item-1"; + let value_1 = Bytes::from(vec![1, 2, 3, 4]); + cache.set(key_1, value_1).await.unwrap(); + let key_2 = "item-2"; + let value_2 = Bytes::from(vec![1, 2, 3, 4]); + cache.set(key_2, value_2).await.unwrap(); + + // Assert: no entries should have been purged + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 2); + + // Action: prune disk space setting the headroom to the cache size + assert_eq!(cache.prune_disk_space(max_size_bytes).await, Ok(())); + + // Assert: cache is empty + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 0); + } + + #[tokio::test] + async fn test_simple_disk_cache_chunk_too_big() { + // Setup the cache with a size limit so small it can't accommodate our test data. + let max_size_bytes = 100; + let tmp_dir = TempDir::new().unwrap(); + let cache = SimpleDiskCache::new( + "test-cache-8", + tmp_dir.path().to_str().unwrap(), + 1, // ttl irrelevant for test + 60, // purge interval irrelevant for test + Some(max_size_bytes), // a max size threshold too restrictive + ); + + // Action: populate cache with a chunk that just fits + let key_1 = "item-1"; + let value_1 = Bytes::from(vec![0; max_size_bytes - 1]); + cache.set(key_1, value_1).await.unwrap(); + + // Assert: cache populated + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + + // Action: populate cache with a chunk that fits exactly if the previously stored chunk is removed + let key_2 = "item-2"; + let value_2 = Bytes::from(vec![0; max_size_bytes]); + cache.set(key_2, value_2).await.unwrap(); + + // Assert: cache content replaced + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + assert!(metadata.contains_key(key_2)); + + // Action: populate cache with a chunk that can't fit + let key_3 = "item-3"; + let value_3 = Bytes::from(vec![0; max_size_bytes + 1]); + assert_eq!( + cache.set(key_3, value_3).await, + Err(String::from( + "Chunk cannot fit within cache maximum size threshold" + )) + ); + + // Assert: cache content hasn't changed + let metadata = cache.load_state().await.metadata; + assert_eq!(metadata.len(), 1); + assert!(metadata.contains_key(key_2)); + } +} diff --git a/src/cli.rs b/src/cli.rs index ba6f450..dea982a 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -47,6 +47,48 @@ pub struct CommandLineArgs { /// when use_rayon is false. #[arg(long, env = "REDUCTIONIST_THREAD_LIMIT")] pub thread_limit: Option<usize>, + + /// Whether to enable caching of downloaded chunks. + /// Default is disabled. + #[arg(long, default_value_t = false, env = "REDUCTIONIST_USE_CHUNK_CACHE")] + pub use_chunk_cache: bool, + /// Path to the chunk cache store. + /// This is required when the chunk cache is enabled. + #[arg(long, env = "REDUCTIONIST_CHUNK_CACHE_PATH")] + pub chunk_cache_path: Option<String>, + /// Lifespan of cached chunks in seconds. + /// Default is 1 day. + #[arg(long, default_value_t = 86400, env = "REDUCTIONIST_CHUNK_CACHE_AGE")] + pub chunk_cache_age: u64, + /// Minimum interval in seconds between checking for expired chunks based on ttl. + /// Default is 1 hour.
+ #[arg( + long, + default_value_t = 3600, + env = "REDUCTIONIST_CHUNK_CACHE_PRUNE_INTERVAL" + )] + pub chunk_cache_prune_interval: u64, + /// Whether to apply an upper size limit to the cache. + /// Example values: "300GB", "1TB". + /// Default when unset is unlimited. + #[arg(long, env = "REDUCTIONIST_CHUNK_CACHE_SIZE_LIMIT")] + pub chunk_cache_size_limit: Option<String>, + /// Optional buffer size for queuing commits to the cache. + /// Defaults to the number of CPUs detected. + #[arg(long, env = "REDUCTIONIST_CHUNK_CACHE_QUEUE_SIZE")] + pub chunk_cache_buffer_size: Option<usize>, + /// Whether to bypass the upstream S3 auth checks to improve performance + /// when operating on cached chunks. Auth bypass should only be enabled + /// if the server is running on a private network with sufficient access + /// controls, since it allows anyone with access to the server to operate + /// on any cached chunk, even if they do not have permission to fetch the + /// original object from the upstream S3 storage server. + #[arg( + long, + default_value_t = false, + env = "REDUCTIONIST_CHUNK_CACHE_BYPASS_AUTH" + )] + pub chunk_cache_bypass_auth: bool, } /// Returns parsed command line arguments. diff --git a/src/error.rs b/src/error.rs index 4f8d7a8..6142687 100644 --- a/src/error.rs +++ b/src/error.rs @@ -3,6 +3,7 @@ use aws_sdk_s3::error::ProvideErrorMetadata; use aws_sdk_s3::error::SdkError; use aws_sdk_s3::operation::get_object::GetObjectError; +use aws_sdk_s3::operation::head_object::HeadObjectError; use aws_smithy_types::byte_stream::error::Error as ByteStreamError; use axum::{ extract::rejection::JsonRejection, @@ -74,6 +75,14 @@ pub enum ActiveStorageError { #[error("error retrieving object from S3 storage")] S3GetObject(#[from] SdkError<GetObjectError>), + /// Error while retrieving object head from S3 + #[error("error retrieving object metadata from S3 storage")] + S3HeadObject(#[from] SdkError<HeadObjectError>), + + /// HTTP 403 from object store + #[error("error receiving object metadata from S3 storage")] + Forbidden, + /// Error acquiring a semaphore + #[error("error acquiring resources")] + SemaphoreAcquireError(#[from] AcquireError), @@ -89,6 +98,10 @@ pub enum ActiveStorageError { /// Unsupported operation requested #[error("unsupported operation {operation}")] UnsupportedOperation { operation: String }, + + /// Error using chunk cache + #[error("chunk cache error {error}")] + ChunkCacheError { error: String }, } impl IntoResponse for ActiveStorageError { @@ -221,13 +234,20 @@ impl From<ActiveStorageError> for ErrorResponse { | ActiveStorageError::ShapeInvalid(_) => Self::bad_request(&error), // Not found - ActiveStorageError::UnsupportedOperation { operation: _ } => Self::not_found(&error), + ActiveStorageError::UnsupportedOperation { operation: _ } + // If we receive a forbidden from object store, return a + // not found to client to avoid leaking information about + // bucket contents. + | ActiveStorageError::Forbidden => Self::not_found(&error), // Internal server error ActiveStorageError::FromBytes { type_name: _ } | ActiveStorageError::TryFromInt(_) | ActiveStorageError::S3ByteStream(_) - | ActiveStorageError::SemaphoreAcquireError(_) => Self::internal_server_error(&error), + | ActiveStorageError::SemaphoreAcquireError(_) + | ActiveStorageError::ChunkCacheError { error: _ } => { + Self::internal_server_error(&error) + } ActiveStorageError::S3GetObject(sdk_error) => { // Tailor the response based on the specific SdkError variant.
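// A minimal sketch (an assumption, not part of this diff) of how the chunk cache's
// String-based errors might be lifted into the new ActiveStorageError::ChunkCacheError
// variant shown above, so that the From<ActiveStorageError> conversion maps them to an
// internal server error response. The helper name `lift_cache_error` is hypothetical.
fn lift_cache_error<T>(result: Result<T, String>) -> Result<T, ActiveStorageError> {
    // Wrap the cache's plain String error in the dedicated enum variant.
    result.map_err(|error| ActiveStorageError::ChunkCacheError { error })
}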
@@ -270,6 +290,31 @@ impl From<ActiveStorageError> for ErrorResponse { _ => Self::internal_server_error(&error), } } + + ActiveStorageError::S3HeadObject(sdk_error) => { + // Tailor the response based on the specific SdkError variant. + match &sdk_error { + // These are generic SdkError variants. + // Internal server error + SdkError::ConstructionFailure(_) + | SdkError::DispatchFailure(_) + | SdkError::ResponseError(_) + | SdkError::TimeoutError(_) => Self::internal_server_error(&error), + + // This is a more specific ServiceError variant, + // with HeadObjectError as the inner error. + SdkError::ServiceError(head_obj_error) => { + let head_obj_error = head_obj_error.err(); + match head_obj_error { + HeadObjectError::NotFound(_) => Self::bad_request(&error), + // Enum is marked as non-exhaustive + _ => Self::internal_server_error(&error), + } + } + // Enum is marked as non-exhaustive + _ => Self::internal_server_error(&error), + } + } }; // Log server errors. diff --git a/src/lib.rs b/src/lib.rs index b086b69..aa15692 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ pub mod app; pub mod array; +pub mod chunk_cache; pub mod cli; pub mod compression; pub mod error; diff --git a/src/metrics.rs b/src/metrics.rs index 5bd3de5..38f1791 100644 --- a/src/metrics.rs +++ b/src/metrics.rs @@ -25,6 +25,11 @@ lazy_static! { }, &["status_code", "http_method", "path"], ).expect("Prometheus metric options should be valid"); + // Disk cache miss counter + pub static ref LOCAL_CACHE_MISSES: IntCounterVec = IntCounterVec::new( + Opts::new("cache_miss", "The number of times the requested chunk was not available in the local chunk cache"), + &["cache"] + ).expect("Prometheus metric options should be valid"); } /// Registers various prometheus metrics with the global registry @@ -39,6 +44,9 @@ pub fn register_metrics() { registry .register(Box::new(RESPONSE_TIME_COLLECTOR.clone())) .expect("Prometheus metrics registration should not fail during initialization"); + registry + .register(Box::new(LOCAL_CACHE_MISSES.clone())) + .expect("Prometheus metrics registration should not fail during initialization"); } /// Returns currently gathered prometheus metrics diff --git a/src/s3_client.rs b/src/s3_client.rs index 9fd715d..c783b73 100644 --- a/src/s3_client.rs +++ b/src/s3_client.rs @@ -1,12 +1,16 @@ //! A simplified S3 client that supports downloading objects. //! It attempts to hide the complexities of working with the AWS SDK for S3. +use std::fmt::Display; + use crate::error::ActiveStorageError; use crate::resource_manager::ResourceManager; use aws_credential_types::Credentials; -use aws_sdk_s3::config::BehaviorVersion; +use aws_sdk_s3::operation::head_object::HeadObjectError; use aws_sdk_s3::Client; +use aws_sdk_s3::{config::BehaviorVersion, error::SdkError}; +use aws_smithy_runtime_api::http::Response; use aws_types::region::Region; use axum::body::Bytes; use hashbrown::HashMap; @@ -92,6 +96,20 @@ impl S3ClientMap { pub struct S3Client { /// Underlying AWS SDK S3 client object. client: Client, + /// A unique identifier for the client. + // TODO: Make this a hash of url + access key + secret key + // using https://github.com/RustCrypto/hashes?tab=readme-ov-file + // This will be more urgently required once an ageing mechanism + // is implemented for [crate::S3ClientMap]. + id: String, +} + +// Required so that the client can be used as part of the lookup +// key for a local chunk cache.
+impl Display for S3Client { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{}", self.id) + } } impl S3Client { @@ -120,7 +138,48 @@ impl S3Client { .force_path_style(true) .build(); let client = Client::from_conf(s3_config); - Self { client } + Self { + client, + id: uuid::Uuid::new_v4().to_string(), + } + } + + /// Checks whether the client is authorised to download an + /// object from object storage. + /// + /// # Arguments + /// + /// * `bucket`: Name of the bucket + /// * `key`: Name of the object in the bucket + pub async fn is_authorised( + &self, + bucket: &str, + key: &str, + ) -> Result<bool, SdkError<HeadObjectError, Response>> { + let response = self + .client + .head_object() + .bucket(bucket) + .key(key) + .send() + .instrument(tracing::Span::current()) + .await; + + // The strategy here is to return true if the client is authorised to download the object, + // false if explicitly not authorised (HTTP 403), and to pass any other errors or + // responses back to the caller. + match response { + Ok(_) => Ok(true), + Err(err) => match &err { + aws_smithy_runtime_api::client::result::SdkError::ServiceError(inner) => { + match inner.raw().status().as_u16() { + 403 => Ok(false), // HTTP 403 == Forbidden + _ => Err(err), + } + } + _ => Err(err), + }, + } } /// Downloads an object from object storage and returns the data as Bytes @@ -133,7 +192,7 @@ impl S3Client { /// * `resource_manager`: ResourceManager object /// * `mem_permits`: Optional SemaphorePermit for any memory resources reserved pub async fn download_object<'a>( - self: &S3Client, + &self, bucket: &str, key: &str, range: Option<String>, @@ -155,10 +214,21 @@ impl S3Client { .ok_or(ActiveStorageError::S3ContentLengthMissing)? .try_into()?; + // Update the memory requested from the resource manager to account for the + // actual size of the data if we were previously unable to estimate the size + // from the request's size + offset parameters. // FIXME: how to account for compressed data? - if mem_permits.is_none() { - *mem_permits = resource_manager.memory(content_length).await?; - }; + match mem_permits { + None => { + *mem_permits = resource_manager.memory(content_length).await?; + } + Some(permits) => { + if permits.num_permits() == 0 { + *mem_permits = resource_manager.memory(content_length).await?; + } + } + } + // The data returned by the S3 client does not have any alignment guarantees. In order to // reinterpret the data as an array of numbers with a higher alignment than 1, we need to // return the data in Bytes object in which the underlying data has a higher alignment.
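// A minimal sketch (an assumption, not part of this diff) of how a local chunk cache
// key might be composed. The Display impl for S3Client above exposes the client's
// unique id, so combining it with bucket, key and byte range keeps cached chunks from
// colliding across clients with different credentials. The function name and key
// format here are hypothetical.
fn example_cache_key(client: &S3Client, bucket: &str, key: &str, range: Option<&str>) -> String {
    // `client` renders as its unique id via the Display impl.
    format!("{}-{}-{}-{}", client, bucket, key, range.unwrap_or("full"))
}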