Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 61 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ rotonda-store = { workspace = true }
serde_with = "3"
smallvec = { version = "1.11", features = ["const_generics", "const_new", "union"] }
tokio-metrics = { version = "0.3", default-features = false }
tokio-stream = "0.1.17"
uuid = { version = "1.4", features = ["v4", "fast-rng"] }
sha2 = "0.10.8"
csv = "1.3.1"
Expand All @@ -92,6 +93,7 @@ futures-util = "0.3.31"
micromap = "0.0.19"
paste = "1.0.15"
axum = { version = "0.8.4", features = ["query", "http1", "http2", "tokio"], default-features = false }
tower-http = { version = "0.5.2", features = ["compression-gzip"], optional = true }
serde_qs = "0.15.0"
rshtml = "0.2.0"

Expand All @@ -115,7 +117,7 @@ strip = true
default = ["http-api-gzip"]

# Enable GZIP compression of the HTTP /metrics response
http-api-gzip = ["flate2"]
http-api-gzip = ["flate2", "tower-http"]

[build-dependencies]
rshtml = "0.2.0"
Expand Down
9 changes: 7 additions & 2 deletions src/http_ng.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::{net::SocketAddr, sync::{Arc, OnceLock}};

use axum::routing::get;
#[cfg(feature = "http-api-gzip")]
use tower_http::compression::CompressionLayer;
use log::{debug, error};
use tokio::{sync::mpsc, task::JoinHandle};

Expand Down Expand Up @@ -146,9 +148,13 @@ impl Api {
let (signal_tx, signal_rx) = mpsc::channel::<()>(1);
self.signal_txs.push(signal_tx);

let app = self.router.clone().with_state(
let mut app = self.router.clone().with_state(
self.cloned_api_state()
);
#[cfg(feature = "http-api-gzip")]
{
app = app.layer(CompressionLayer::new());
}

let h = tokio::spawn(async move {
let listener = match tokio::net::TcpListener::bind(interface).await {
Expand Down Expand Up @@ -221,4 +227,3 @@ impl axum::response::IntoResponse for ApiError {
).into_response()
}
}

15 changes: 14 additions & 1 deletion src/representation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ macro_rules! genoutput_json {
($type:ty $(, $val:tt )? ) => {
impl<W: std::io::Write> GenOutput<Json<W>> for $type {
fn write(&self, target: &mut Json<W>) -> Result<(), crate::representation::OutputError> {
let _ = serde_json::to_writer(&mut target.0, &self$(.$val)?).unwrap();
serde_json::to_writer(&mut target.0, &self$(.$val)?)?;
Ok(())
}
fn write_seq_start(target: &mut Json<W>) -> Result<(), crate::representation::OutputError> {
Expand All @@ -71,5 +71,18 @@ pub struct OutputError {
}

enum OutputErrorType {
Io(std::io::Error),
Other,
}

impl From<serde_json::Error> for OutputError {
fn from(err: serde_json::Error) -> Self {
Self {
error_type: if err.is_io() {
OutputErrorType::Io(err.into())
} else {
OutputErrorType::Other
},
}
}
}
106 changes: 77 additions & 29 deletions src/units/rib_unit/http_ng.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use std::{collections::HashMap, fmt::Display, net::{Ipv4Addr, Ipv6Addr}};
use std::{fmt::Display, io, net::{Ipv4Addr, Ipv6Addr}};
use std::io::Write;

use axum::{extract::{Path, Query, State}, response::IntoResponse};
use axum::{body::Body, extract::{Path, Query, State}, response::IntoResponse};
use bytes::Bytes;
use inetnum::{addr::Prefix, asn::Asn};
use log::{debug, warn};
use routecore::{bgp::{communities::{LargeCommunity, StandardCommunity}, path_attributes::PathAttributeType, types::AfiSafiType}, bmp::message::RibType};
use serde::Deserialize;
use serde_with::serde_as;
use serde_with::formats::CommaSeparator;
use serde_with::StringWithSeparator;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

use crate::{http_ng::{Api, ApiError, ApiState}, ingress::IngressId, roto_runtime::types::PeerRibType, units::rib_unit::rpki::RovStatus};
use crate::{http_ng::{Api, ApiError, ApiState}, ingress::IngressId, representation::{GenOutput, Json}, roto_runtime::types::PeerRibType, units::rib_unit::rpki::RovStatus};

// XXX The actual querying of the store should be similar to how we query the ingress register,
// i.e. with the Rib unit constructing one type of responses (so a wrapper around the
Expand Down Expand Up @@ -146,6 +150,62 @@ pub enum Include {
LessSpecifics,
}

const STREAM_CHUNK_SIZE: usize = 256 * 1024;

struct ChannelWriter {
sender: mpsc::Sender<Result<Bytes, io::Error>>,
buffer: Vec<u8>,
}

impl ChannelWriter {
fn new(sender: mpsc::Sender<Result<Bytes, io::Error>>) -> Self {
Self {
sender,
buffer: Vec::with_capacity(STREAM_CHUNK_SIZE),
}
}

fn send_buffer(&mut self) -> io::Result<()> {
if self.buffer.is_empty() {
return Ok(());
}
let chunk = Bytes::copy_from_slice(&self.buffer);
self.buffer.clear();
self.sender
.blocking_send(Ok(chunk))
.map_err(|_| io::Error::new(io::ErrorKind::BrokenPipe, "receiver dropped"))
}
}

impl io::Write for ChannelWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.buffer.extend_from_slice(buf);
if self.buffer.len() >= STREAM_CHUNK_SIZE {
self.send_buffer()?;
}
Ok(buf.len())
}

fn flush(&mut self) -> io::Result<()> {
self.send_buffer()
}
}

fn stream_search_result(
search_result: super::rib::SearchResult,
) -> impl IntoResponse {
let (tx, rx) = mpsc::channel::<Result<Bytes, io::Error>>(64);
let stream = ReceiverStream::new(rx);

tokio::task::spawn_blocking(move || {
let mut writer = ChannelWriter::new(tx);
let _ = search_result.write(&mut Json(&mut writer));
let _ = writer.flush();
});

([("content-type", "application/json")], Body::from_stream(stream))
}

#[derive(Debug)]
pub struct UnknownInclude;
impl Display for UnknownInclude {
Expand Down Expand Up @@ -184,19 +244,13 @@ async fn search_ipv4unicast(

let prefix = Prefix::new_v4(prefix, prefix_len).map_err(|e| ApiError::BadRequest(e.to_string()))?;
let s = state.store.clone();
let mut res = Vec::new();
match s.get().map(|store|
store.search_and_output_routes(
crate::representation::Json(&mut res),
AfiSafiType::Ipv4Unicast,
prefix,
filter
)
) {
Some(Ok(())) => Ok(([("content-type", "application/json")], res)),
Some(Err(e)) => Err(ApiError::BadRequest(e)),
None => Err(ApiError::InternalServerError("store unavailable".into()))
}
let search_result = match s.get() {
Some(store) => store.search_routes(AfiSafiType::Ipv4Unicast, prefix, filter)
.map_err(ApiError::BadRequest)?,
None => return Err(ApiError::InternalServerError("store unavailable".into())),
};

Ok(stream_search_result(search_result))
}

// Search all routes, we mimic a 0.0.0.0/0 search, but most (or all) results will actually be
Expand All @@ -217,19 +271,13 @@ async fn search_ipv6unicast(

let prefix = Prefix::new_v6(prefix, prefix_len).map_err(|e| ApiError::BadRequest(e.to_string()))?;
let s = state.store.clone();
let mut res = Vec::new();
match s.get().map(|store|
store.search_and_output_routes(
crate::representation::Json(&mut res),
AfiSafiType::Ipv6Unicast,
prefix,
filter
)
) {
Some(Ok(())) => Ok(([("content-type", "application/json")], res)),
Some(Err(e)) => Err(ApiError::BadRequest(e)),
None => Err(ApiError::InternalServerError("store unavailable".into()))
}
let search_result = match s.get() {
Some(store) => store.search_routes(AfiSafiType::Ipv6Unicast, prefix, filter)
.map_err(ApiError::BadRequest)?,
None => return Err(ApiError::InternalServerError("store unavailable".into())),
};

Ok(stream_search_result(search_result))
}

// Search all routes, we mimic a ::/0 search, but most (or all) results will actually be
Expand Down