diff --git a/Cargo.lock b/Cargo.lock index 6ca68e5c..797ee8a9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -171,6 +171,19 @@ dependencies = [ "wait-timeout", ] +[[package]] +name = "async-compression" +version = "0.4.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98ec5f6c2f8bc326c994cb9e241cc257ddaba9afa8555a43cffbb5dd86efaa37" +dependencies = [ + "compression-codecs", + "compression-core", + "futures-core", + "pin-project-lite", + "tokio", +] + [[package]] name = "async-trait" version = "0.1.89" @@ -535,6 +548,23 @@ version = "0.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" +[[package]] +name = "compression-codecs" +version = "0.4.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f7ac3e5b97fdce45e8922fb05cae2c37f7bbd63d30dd94821dacfd8f3f2bf2" +dependencies = [ + "compression-core", + "flate2", + "memchr", +] + +[[package]] +name = "compression-core" +version = "0.4.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75984efb6ed102a0d42db99afb6c1948f0380d1d91808d5529916e6c08b49d8d" + [[package]] name = "config" version = "0.13.4" @@ -1088,9 +1118,9 @@ checksum = "1ced73b1dacfc750a6db6c0a0c3a3853c8b41997e2e2c563dc90804ae6867959" [[package]] name = "flate2" -version = "1.1.2" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d" +checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" dependencies = [ "crc32fast", "miniz_oxide", @@ -2055,6 +2085,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fa76a2c86f704bdb222d66965fb3d63269ce38518b83cb0575fca855ebb6316" dependencies = [ "adler2", + "simd-adler32", ] [[package]] @@ -2767,7 +2798,9 @@ dependencies = [ "syslog", "tokio", "tokio-metrics", + "tokio-stream", "toml 0.8.23", + "tower-http", "url", "uuid", ] @@ -3183,6 +3216,12 @@ dependencies = [ "libc", ] +[[package]] +name = "simd-adler32" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e320a6c5ad31d271ad523dcf3ad13e2767ad8b1cb8f047f75a8aeaf8da139da2" + [[package]] name = "similar" version = "2.7.0" @@ -3611,6 +3650,26 @@ dependencies = [ "tower-service", ] +[[package]] +name = "tower-http" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e9cd434a998747dd2c4276bc96ee2e0c7a2eadf3cae88e52be55a05fa9053f5" +dependencies = [ + "async-compression", + "bitflags 2.9.4", + "bytes", + "futures-core", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "pin-project-lite", + "tokio", + "tokio-util", + "tower-layer", + "tower-service", +] + [[package]] name = "tower-layer" version = "0.3.3" diff --git a/Cargo.toml b/Cargo.toml index a9a77379..36c75924 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ rotonda-store = { workspace = true } serde_with = "3" smallvec = { version = "1.11", features = ["const_generics", "const_new", "union"] } tokio-metrics = { version = "0.3", default-features = false } +tokio-stream = "0.1.17" uuid = { version = "1.4", features = ["v4", "fast-rng"] } sha2 = "0.10.8" csv = "1.3.1" @@ -92,6 +93,7 @@ futures-util = "0.3.31" micromap = "0.0.19" paste = "1.0.15" axum = { version = "0.8.4", features = ["query", "http1", "http2", "tokio"], default-features = false } +tower-http = { version = "0.5.2", features = ["compression-gzip"], optional = true } serde_qs = "0.15.0" rshtml = "0.2.0" @@ -115,7 +117,7 @@ strip = true default = ["http-api-gzip"] # Enable GZIP compression of the HTTP /metrics response -http-api-gzip = ["flate2"] +http-api-gzip = ["flate2", "tower-http"] [build-dependencies] rshtml = "0.2.0" diff --git a/src/http_ng.rs b/src/http_ng.rs index 22dc1eb9..92a0bde8 100644 --- a/src/http_ng.rs +++ b/src/http_ng.rs @@ -1,6 +1,8 @@ use std::{net::SocketAddr, sync::{Arc, OnceLock}}; use axum::routing::get; +#[cfg(feature = "http-api-gzip")] +use tower_http::compression::CompressionLayer; use log::{debug, error}; use tokio::{sync::mpsc, task::JoinHandle}; @@ -146,9 +148,13 @@ impl Api { let (signal_tx, signal_rx) = mpsc::channel::<()>(1); self.signal_txs.push(signal_tx); - let app = self.router.clone().with_state( + let mut app = self.router.clone().with_state( self.cloned_api_state() ); + #[cfg(feature = "http-api-gzip")] + { + app = app.layer(CompressionLayer::new()); + } let h = tokio::spawn(async move { let listener = match tokio::net::TcpListener::bind(interface).await { @@ -221,4 +227,3 @@ impl axum::response::IntoResponse for ApiError { ).into_response() } } - diff --git a/src/representation.rs b/src/representation.rs index 9d206617..0a5c30ec 100644 --- a/src/representation.rs +++ b/src/representation.rs @@ -46,7 +46,7 @@ macro_rules! genoutput_json { ($type:ty $(, $val:tt )? ) => { impl GenOutput> for $type { fn write(&self, target: &mut Json) -> Result<(), crate::representation::OutputError> { - let _ = serde_json::to_writer(&mut target.0, &self$(.$val)?).unwrap(); + serde_json::to_writer(&mut target.0, &self$(.$val)?)?; Ok(()) } fn write_seq_start(target: &mut Json) -> Result<(), crate::representation::OutputError> { @@ -71,5 +71,18 @@ pub struct OutputError { } enum OutputErrorType { + Io(std::io::Error), Other, } + +impl From for OutputError { + fn from(err: serde_json::Error) -> Self { + Self { + error_type: if err.is_io() { + OutputErrorType::Io(err.into()) + } else { + OutputErrorType::Other + }, + } + } +} diff --git a/src/units/rib_unit/http_ng.rs b/src/units/rib_unit/http_ng.rs index 088109c0..ea88d479 100644 --- a/src/units/rib_unit/http_ng.rs +++ b/src/units/rib_unit/http_ng.rs @@ -1,6 +1,8 @@ -use std::{collections::HashMap, fmt::Display, net::{Ipv4Addr, Ipv6Addr}}; +use std::{fmt::Display, io, net::{Ipv4Addr, Ipv6Addr}}; +use std::io::Write; -use axum::{extract::{Path, Query, State}, response::IntoResponse}; +use axum::{body::Body, extract::{Path, Query, State}, response::IntoResponse}; +use bytes::Bytes; use inetnum::{addr::Prefix, asn::Asn}; use log::{debug, warn}; use routecore::{bgp::{communities::{LargeCommunity, StandardCommunity}, path_attributes::PathAttributeType, types::AfiSafiType}, bmp::message::RibType}; @@ -8,8 +10,10 @@ use serde::Deserialize; use serde_with::serde_as; use serde_with::formats::CommaSeparator; use serde_with::StringWithSeparator; +use tokio::sync::mpsc; +use tokio_stream::wrappers::ReceiverStream; -use crate::{http_ng::{Api, ApiError, ApiState}, ingress::IngressId, roto_runtime::types::PeerRibType, units::rib_unit::rpki::RovStatus}; +use crate::{http_ng::{Api, ApiError, ApiState}, ingress::IngressId, representation::{GenOutput, Json}, roto_runtime::types::PeerRibType, units::rib_unit::rpki::RovStatus}; // XXX The actual querying of the store should be similar to how we query the ingress register, // i.e. with the Rib unit constructing one type of responses (so a wrapper around the @@ -146,6 +150,62 @@ pub enum Include { LessSpecifics, } +const STREAM_CHUNK_SIZE: usize = 256 * 1024; + +struct ChannelWriter { + sender: mpsc::Sender>, + buffer: Vec, +} + +impl ChannelWriter { + fn new(sender: mpsc::Sender>) -> Self { + Self { + sender, + buffer: Vec::with_capacity(STREAM_CHUNK_SIZE), + } + } + + fn send_buffer(&mut self) -> io::Result<()> { + if self.buffer.is_empty() { + return Ok(()); + } + let chunk = Bytes::copy_from_slice(&self.buffer); + self.buffer.clear(); + self.sender + .blocking_send(Ok(chunk)) + .map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped")) + } +} + +impl io::Write for ChannelWriter { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.buffer.extend_from_slice(buf); + if self.buffer.len() >= STREAM_CHUNK_SIZE { + self.send_buffer()?; + } + Ok(buf.len()) + } + + fn flush(&mut self) -> io::Result<()> { + self.send_buffer() + } +} + +fn stream_search_result( + search_result: super::rib::SearchResult, +) -> impl IntoResponse { + let (tx, rx) = mpsc::channel::>(64); + let stream = ReceiverStream::new(rx); + + tokio::task::spawn_blocking(move || { + let mut writer = ChannelWriter::new(tx); + let _ = search_result.write(&mut Json(&mut writer)); + let _ = writer.flush(); + }); + + ([("content-type", "application/json")], Body::from_stream(stream)) +} + #[derive(Debug)] pub struct UnknownInclude; impl Display for UnknownInclude { @@ -184,19 +244,13 @@ async fn search_ipv4unicast( let prefix = Prefix::new_v4(prefix, prefix_len).map_err(|e| ApiError::BadRequest(e.to_string()))?; let s = state.store.clone(); - let mut res = Vec::new(); - match s.get().map(|store| - store.search_and_output_routes( - crate::representation::Json(&mut res), - AfiSafiType::Ipv4Unicast, - prefix, - filter - ) - ) { - Some(Ok(())) => Ok(([("content-type", "application/json")], res)), - Some(Err(e)) => Err(ApiError::BadRequest(e)), - None => Err(ApiError::InternalServerError("store unavailable".into())) - } + let search_result = match s.get() { + Some(store) => store.search_routes(AfiSafiType::Ipv4Unicast, prefix, filter) + .map_err(ApiError::BadRequest)?, + None => return Err(ApiError::InternalServerError("store unavailable".into())), + }; + + Ok(stream_search_result(search_result)) } // Search all routes, we mimic a 0.0.0.0/0 search, but most (or all) results will actually be @@ -217,19 +271,13 @@ async fn search_ipv6unicast( let prefix = Prefix::new_v6(prefix, prefix_len).map_err(|e| ApiError::BadRequest(e.to_string()))?; let s = state.store.clone(); - let mut res = Vec::new(); - match s.get().map(|store| - store.search_and_output_routes( - crate::representation::Json(&mut res), - AfiSafiType::Ipv6Unicast, - prefix, - filter - ) - ) { - Some(Ok(())) => Ok(([("content-type", "application/json")], res)), - Some(Err(e)) => Err(ApiError::BadRequest(e)), - None => Err(ApiError::InternalServerError("store unavailable".into())) - } + let search_result = match s.get() { + Some(store) => store.search_routes(AfiSafiType::Ipv6Unicast, prefix, filter) + .map_err(ApiError::BadRequest)?, + None => return Err(ApiError::InternalServerError("store unavailable".into())), + }; + + Ok(stream_search_result(search_result)) } // Search all routes, we mimic a ::/0 search, but most (or all) results will actually be