31 changes: 26 additions & 5 deletions Cargo.toml
@@ -2,46 +2,62 @@
# In alphabetical order
members = [
"arrow_util",
"authz",
"backoff",
"catalog_cache",
"client_util",
"data_types",
"datafusion_util",
"error_reporting",
"executor",
"flightsql",
"futures_test_utils",
"generated_types",
"influxdb_influxql_parser",
"influxdb_iox_client",
"influxdb_line_protocol",
"influxdb2_client",
"ingester_query_grpc",
"iox_http",
"iox_http_util",
"iox_query",
"iox_query_influxql",
"iox_query_influxql_rewrite",
"iox_query",
"iox_query_params",
"iox_system_tables",
"iox_time",
"iox_v1_query_api",
"jemalloc_stats",
"linear_buffer",
"logfmt",
"meta_data_cache",
"metric_exporters",
"metric",
"metric_exporters",
"mutable_batch",
"mutable_batch_lp",
"mutable_batch_lp/fuzz",
"mutable_batch",
"object_store_mem_cache",
"object_store_metrics",
"object_store_mock",
"object_store_size_hinting",
"observability_deps",
"panic_logging",
"parquet_file",
"partition",
"predicate",
"query_functions",
"schema",
"service_common",
"service_grpc_flight",
"sharder",
"tokio_metrics_bridge",
"test_helpers",
"test_helpers_authz",
"tokio_metrics_bridge",
"tokio_watchdog",
"tower_trailer",
"trace",
"trace_exporters",
"trace_http",
"trace",
"tracker",
"trogging",
"workspace-hack",
@@ -76,6 +92,7 @@ arrow-schema = { version = "55" }
bincode = { version = "2", default-features = false, features = ["alloc", "derive"] }
# Use DataFusion fork (see below)
datafusion = { version = "49" }
datafusion-proto = { version = "49" }
hashbrown = { version = "0.14.5" }
http = { version = "1" }
http-body = { version = "1" }
@@ -164,6 +181,10 @@ opt-level = 3
# version. Should you ever encounter a `datafusion-*` crate that is duplicated (i.e. `Cargo.lock` lists both the
# crates.io release and our fork), just add it to this list here.
datafusion = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ee81b1cc652bde6c131973d091b178836692112d" }
datafusion-common = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ee81b1cc652bde6c131973d091b178836692112d" }
datafusion-expr = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ee81b1cc652bde6c131973d091b178836692112d" }
datafusion-proto = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ee81b1cc652bde6c131973d091b178836692112d" }
datafusion-sql = { git = "https://github.com/influxdata/arrow-datafusion.git", rev = "ee81b1cc652bde6c131973d091b178836692112d" }

# Patching Arrow
#
4 changes: 2 additions & 2 deletions catalog_cache/Cargo.toml
@@ -9,7 +9,7 @@ license.workspace = true
workspace = true

[dependencies]
bytes = "1.10"
bytes = "1.11"
dashmap = "6.1.0"
futures = "0.3"
generated_types = { path = "../generated_types" }
@@ -29,7 +29,7 @@ workspace-hack = { version = "0.1", path = "../workspace-hack" }
metric = { path = "../metric" }

[dev-dependencies]
criterion = "0.7.0"
criterion = "0.8.1"
data_types = { path = "../data_types" }

[[bench]]
5 changes: 5 additions & 0 deletions catalog_cache/src/api/client.rs
@@ -188,6 +188,11 @@ impl CatalogCacheClient
}
}

/// Get endpoint.
pub fn endpoint(&self) -> &Url {
&self.endpoint
}

/// URL to given key.
fn url(&self, path: RequestPath) -> Url {
// try to construct URL rather cheaply, a true builder doesn't really exist yet, see https://github.com/servo/rust-url/issues/835
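
The new `endpoint()` accessor lets callers see which replica a client points at, which the quorum error reporting below builds on. A minimal usage sketch; the function name `log_replica_endpoints` and the way the clients are obtained are made up for illustration, and the `catalog_cache::api::client` import path is assumed from the file location:

use catalog_cache::api::client::CatalogCacheClient;

/// Log which endpoint each replica client points at.
/// Sketch only: constructing the clients is out of scope here.
fn log_replica_endpoints(replicas: &[CatalogCacheClient]) {
    for (idx, client) in replicas.iter().enumerate() {
        // `endpoint()` is the accessor added in this change; `Url` implements `Display`.
        println!("replica {idx}: {}", client.endpoint());
    }
}
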
91 changes: 84 additions & 7 deletions catalog_cache/src/api/quorum.rs
@@ -8,10 +8,47 @@ use futures::future::{Either, select};
use futures::{StreamExt, pin_mut};
use snafu::{ResultExt, Snafu};
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
use tokio::task::JoinError;
use tokio_util::sync::CancellationToken;
use tracing::info;
use url::Url;

fn display_remote_generations(g: &[RemoteGeneration; 2]) -> impl std::fmt::Display {
let mut out = String::new();

out.push('[');

for (idx, (url, res)) in g.iter().enumerate() {
if idx > 0 {
write!(&mut out, ", ").unwrap();
}

write!(&mut out, "'{url}'=>").unwrap();

match res.as_ref() {
Ok(Some(generation)) => {
write!(&mut out, "Ok({generation})").unwrap();
}
Ok(None) => {
write!(&mut out, "Ok(MISSING)").unwrap();
}
Err(e) => {
write!(&mut out, "Err({e})").unwrap();
}
}
}

out.push(']');

out
}

/// Generation that we got from a remote endpoint.
///
/// Used for [error reporting](Error).
pub type RemoteGeneration = (Url, Result<Option<u64>, ClientError>);

/// Error for [`QuorumCatalogCache`]
#[expect(missing_docs)]
@@ -26,9 +63,14 @@ pub enum Error {
#[snafu(display("Join Error: {source}"))]
Join { source: JoinError },

#[snafu(display("Failed to establish a read quorum: {generations:?}"))]
#[snafu(display(
"Failed to establish a read quorum: local={}, remote={}",
local_generation.map(|g| g.to_string()).unwrap_or_else(|| "MISSING".to_owned()),
display_remote_generations(remote_generations),
))]
Quorum {
generations: [Result<Option<u64>, ClientError>; 3],
local_generation: Option<u64>,
remote_generations: Box<[RemoteGeneration; 2]>,
},

#[snafu(display("Failed to list replica: {source}"))]
@@ -110,11 +152,17 @@ impl QuorumCatalogCache {
Ok(Some(l))
}
(l, r1, r2) => Err(Error::Quorum {
generations: [
Ok(l.map(|x| x.generation)),
r1.map(|x| x.map(|x| x.generation)),
r2.map(|x| x.map(|x| x.generation)),
],
local_generation: l.map(|x| x.generation()),
remote_generations: Box::new([
(
self.replicas[0].endpoint().clone(),
r1.map(|x| x.map(|x| x.generation)),
),
(
self.replicas[1].endpoint().clone(),
r2.map(|x| x.map(|x| x.generation)),
),
]),
}),
}
}
@@ -696,4 +744,33 @@
"Expected Quorum error, got: {err}"
);
}

#[test]
fn quorum_err_display() {
assert_eq!(
Error::Quorum {
local_generation: None,
remote_generations: Box::new([
("http://foo".parse().unwrap(), Ok(None)),
("http://bar".parse().unwrap(), Ok(None)),
]),
}
.to_string(),
"Failed to establish a read quorum: local=MISSING, remote=['http://foo/'=>Ok(MISSING), 'http://bar/'=>Ok(MISSING)]",
);
assert_eq!(
Error::Quorum {
local_generation: Some(1),
remote_generations: Box::new([
("http://foo".parse().unwrap(), Ok(Some(1))),
(
"http://bar".parse().unwrap(),
Err(ClientError::MissingGeneration)
),
]),
}
.to_string(),
"Failed to establish a read quorum: local=1, remote=['http://foo/'=>Ok(1), 'http://bar/'=>Err(Missing generation header)]",
);
}
}
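
Besides the friendlier `Display` output exercised in the test above, the reshaped `Error::Quorum` variant keeps the per-replica data available as structured fields. A minimal sketch of consuming it; the function `report_quorum_error` is hypothetical, the `catalog_cache::api::quorum` import path is assumed from the file location, and `ClientError` is assumed to derive `Debug`:

use catalog_cache::api::quorum::Error;

/// Report a failed quorum read, naming each replica and what it returned.
/// Sketch only: how the error was obtained (e.g. from `QuorumCatalogCache::get`) is out of scope.
fn report_quorum_error(err: &Error) {
    match err {
        Error::Quorum { local_generation, remote_generations } => {
            eprintln!("local generation: {local_generation:?}");
            for (url, generation) in remote_generations.iter() {
                // Each entry pairs the replica endpoint with its generation (or error).
                eprintln!("replica {url}: {generation:?}");
            }
        }
        // Fall back to the Display impl for all other variants.
        other => eprintln!("{other}"),
    }
}
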
2 changes: 1 addition & 1 deletion data_types/Cargo.toml
@@ -12,7 +12,7 @@ workspace = true
[dependencies]
arrow = { workspace = true }
arrow-buffer = { workspace = true }
bytes = "1.10"
bytes = "1.11"
chrono = { version = "0.4", default-features = false }
croaring = "2.5.1"
influxdb-line-protocol = { path = "../influxdb_line_protocol" }
4 changes: 4 additions & 0 deletions deny.toml
@@ -8,6 +8,10 @@ ignore = [
# paste is unmaintained but does not have any known issues;
# there is no replacement at this time
"RUSTSEC-2024-0436",
# rustls-pemfile is unmaintained but is a thin wrapper around rustls-pki-types;
# fixed in object_store main: https://github.com/apache/arrow-rs-object-store/pull/565
# TODO: remove once object_store > 0.12.4 is released
"RUSTSEC-2025-0134",
]
git-fetch-with-cli = true

21 changes: 14 additions & 7 deletions executor/src/io.rs
@@ -25,6 +25,19 @@ pub fn register_current_runtime_for_io() {
register_io_runtime(Some(Handle::current()));
}

/// Get handle to IO runtime.
///
/// # Panics
/// Needs an IO runtime [registered](register_io_runtime).
pub fn get_io_runtime() -> Handle {
IO_RUNTIME.with_borrow(|h| h.clone()).expect(
"No IO runtime registered. If you hit this panic, it likely \
means a DataFusion plan or other CPU bound work is running on \
a tokio threadpool used for IO. Try spawning the work using \
`DedicatedExecutor::spawn` or for tests `register_current_runtime_for_io`",
)
}

/// Runs `fut` on the runtime registered by [`register_io_runtime`] if any,
/// otherwise awaits on the current thread
///
@@ -35,13 +48,7 @@ where
Fut: Future + Send + 'static,
Fut::Output: Send,
{
let h = IO_RUNTIME.with_borrow(|h| h.clone()).expect(
"No IO runtime registered. If you hit this panic, it likely \
means a DataFusion plan or other CPU bound work is running on the \
a tokio threadpool used for IO. Try spawning the work using \
`DedicatedExcutor::spawn` or for tests `register_current_runtime_for_io`",
);
DropGuard(h.spawn(fut)).await
DropGuard(get_io_runtime().spawn(fut)).await
}

struct DropGuard<T>(JoinHandle<T>);
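
Together, `register_current_runtime_for_io`, `get_io_runtime`, and `spawn_io` cover both ways of handing work to the IO runtime: as a future, or via a raw `tokio::runtime::Handle`. A minimal sketch of that usage, assuming the crate is imported as `executor` and tokio's `macros`/`rt` features are available; a current-thread runtime is used so the thread-local registration stays on the calling thread:

use executor::{get_io_runtime, register_current_runtime_for_io, spawn_io};

#[tokio::main(flavor = "current_thread")]
async fn main() {
    // Register the current runtime as the IO runtime for this thread,
    // as tests are expected to do.
    register_current_runtime_for_io();

    // Run an IO-bound future on the registered runtime and await its result.
    let n = spawn_io(async { 40 + 2 }).await;
    assert_eq!(n, 42);

    // Or grab the raw handle when an API wants a `Handle` rather than a future.
    let handle = get_io_runtime();
    let msg = handle.spawn(async { "done" }).await.expect("task panicked");
    assert_eq!(msg, "done");
}
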
2 changes: 1 addition & 1 deletion executor/src/lib.rs
@@ -33,7 +33,7 @@ use futures::{

use tracing::warn;

pub use io::{register_current_runtime_for_io, register_io_runtime, spawn_io};
pub use io::{get_io_runtime, register_current_runtime_for_io, register_io_runtime, spawn_io};

const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(60 * 5);

2 changes: 1 addition & 1 deletion flightsql/Cargo.toml
@@ -17,7 +17,7 @@ generated_types = { path = "../generated_types" }
iox_query = { path = "../iox_query" }
iox_query_params = { path = "../iox_query_params/" }
# Crates.io dependencies, in alphabetical order
bytes = "1.10"
bytes = "1.11"
futures = "0.3"
snafu = "0.8"
prost = { workspace = true }
2 changes: 1 addition & 1 deletion generated_types/Cargo.toml
@@ -20,7 +20,7 @@ uuid = { version = "1" }
workspace-hack = { version = "0.1", path = "../workspace-hack" }

[dev-dependencies]
bytes = "1.10"
bytes = "1.11"

[build-dependencies] # In alphabetical order
tonic-build = { version = "0.12" }
2 changes: 1 addition & 1 deletion influxdb2_client/Cargo.toml
@@ -9,7 +9,7 @@ license.workspace = true
workspace = true

[dependencies] # In alphabetical order
bytes = "1.10"
bytes = "1.11"
futures = { version = "0.3", default-features = false }
reqwest = { workspace = true, features = ["stream", "json", "rustls-tls-native-roots"] }
serde = { version = "1.0", features = ["derive"] }
24 changes: 12 additions & 12 deletions influxdb2_client/src/models/ast/expression.rs
@@ -22,46 +22,46 @@ pub struct Expression {
pub operator: Option<String>,
/// Left leaf
#[serde(skip_serializing_if = "Option::is_none")]
pub left: Option<Box<crate::models::ast::Expression>>,
pub left: Option<Box<Self>>,
/// Right leaf
#[serde(skip_serializing_if = "Option::is_none")]
pub right: Option<Box<crate::models::ast::Expression>>,
pub right: Option<Box<Self>>,
/// Parent Expression
#[serde(skip_serializing_if = "Option::is_none")]
pub callee: Option<Box<crate::models::ast::Expression>>,
pub callee: Option<Box<Self>>,
/// Function arguments
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub arguments: Vec<crate::models::ast::Expression>,
pub arguments: Vec<Self>,
/// Test Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub test: Option<Box<crate::models::ast::Expression>>,
pub test: Option<Box<Self>>,
/// Alternate Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub alternate: Option<Box<crate::models::ast::Expression>>,
pub alternate: Option<Box<Self>>,
/// Consequent Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub consequent: Option<Box<crate::models::ast::Expression>>,
pub consequent: Option<Box<Self>>,
/// Object Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub object: Option<Box<crate::models::ast::Expression>>,
pub object: Option<Box<Self>>,
/// PropertyKey Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub property: Option<Box<crate::models::ast::PropertyKey>>,
/// Array Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub array: Option<Box<crate::models::ast::Expression>>,
pub array: Option<Box<Self>>,
/// Index Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub index: Option<Box<crate::models::ast::Expression>>,
pub index: Option<Box<Self>>,
/// Properties
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub properties: Vec<crate::models::ast::Property>,
/// Expression
#[serde(skip_serializing_if = "Option::is_none")]
pub expression: Option<Box<crate::models::ast::Expression>>,
pub expression: Option<Box<Self>>,
/// Argument
#[serde(skip_serializing_if = "Option::is_none")]
pub argument: Option<Box<crate::models::ast::Expression>>,
pub argument: Option<Box<Self>>,
/// Call Expr
#[serde(skip_serializing_if = "Option::is_none")]
pub call: Option<crate::models::ast::CallExpression>,
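
The switch from the spelled-out `crate::models::ast::Expression` to `Self` is a pure readability cleanup: inside a struct definition, `Self` names the struct being defined, so `Option<Box<Self>>` is exactly the same recursive type as before and the serde attributes are unaffected. A tiny standalone sketch of the pattern (a toy type, not the generated model):

/// A toy recursive expression tree using the same `Option<Box<Self>>` pattern.
#[derive(Debug, Default)]
struct Expr {
    operator: Option<String>,
    // `Self` here is just shorthand for `Expr`, so these fields are `Option<Box<Expr>>`.
    left: Option<Box<Self>>,
    right: Option<Box<Self>>,
}

fn main() {
    let sum = Expr {
        operator: Some("+".to_owned()),
        left: Some(Box::new(Expr::default())),
        right: Some(Box::new(Expr::default())),
    };
    println!("{sum:?}");
}
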
2 changes: 1 addition & 1 deletion influxdb2_client/src/models/health.rs
@@ -13,7 +13,7 @@ pub struct HealthCheck {
pub message: Option<String>,
/// Checks
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub checks: Vec<crate::models::HealthCheck>,
pub checks: Vec<Self>,
/// Status
pub status: Status,
/// Version