Skip to content

Commit 21967bd

Browse files
committed
wrap timed streams and iterators in tracing::Spans
1 parent 10bec32 commit 21967bd

File tree

17 files changed

+114
-21
lines changed

17 files changed

+114
-21
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@ mac_address = "1.1.5"
142142
rust_decimal = { version = "1.26.1", default-features = false, features = ["std"] }
143143
time = { version = "0.3.36", features = ["formatting", "parsing", "macros"] }
144144
uuid = "1.1.2"
145+
tracing-futures = { version = "0.2.5", features = ["futures-03", "std-future"] }
145146

146147
# Common utility crates
147148
dotenvy = { version = "0.15.0", default-features = false }

sqlx-core/src/any/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ impl Database for Any {
3131
type Statement<'q> = AnyStatement<'q>;
3232

3333
const NAME: &'static str = "Any";
34+
const NAME_LOWERCASE: &'static str = "any";
3435

3536
const URL_SCHEMES: &'static [&'static str] = &[];
3637
}

sqlx-core/src/connection.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,10 @@ impl LogSettings {
181181
self.slow_statements_level = level;
182182
self.slow_statements_duration = duration;
183183
}
184+
185+
pub fn tracing_span_level(&self) -> LevelFilter {
186+
std::cmp::max(self.slow_statements_level, self.statements_level)
187+
}
184188
}
185189

186190
pub trait ConnectOptions: 'static + Send + Sync + FromStr<Err = Error> + Debug + Clone {

sqlx-core/src/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ pub trait Database: 'static + Sized + Send + Debug {
105105

106106
/// The display name for this database driver.
107107
const NAME: &'static str;
108+
const NAME_LOWERCASE: &'static str;
108109

109110
/// The schemes for database URLs that should match this driver.
110111
const URL_SCHEMES: &'static [&'static str];

sqlx-core/src/logger.rs

Lines changed: 47 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,23 @@ macro_rules! private_tracing_dynamic_event {
3838
}};
3939
}
4040

41+
#[doc(hidden)]
42+
#[macro_export]
43+
macro_rules! private_tracing_dynamic_span {
44+
(target: $target:expr, $level:expr, $($args:tt)*) => {{
45+
use ::tracing::Level;
46+
47+
match $level {
48+
Level::ERROR => ::tracing::span!(target: $target, Level::ERROR, $($args)*),
49+
Level::WARN => ::tracing::span!(target: $target, Level::WARN, $($args)*),
50+
Level::INFO => ::tracing::span!(target: $target, Level::INFO, $($args)*),
51+
Level::DEBUG => ::tracing::span!(target: $target, Level::DEBUG, $($args)*),
52+
Level::TRACE => ::tracing::span!(target: $target, Level::TRACE, $($args)*),
53+
}
54+
}};
55+
}
56+
57+
4158
#[doc(hidden)]
4259
pub fn private_level_filter_to_levels(
4360
filter: log::LevelFilter,
@@ -68,16 +85,33 @@ pub struct QueryLogger<'q> {
6885
rows_affected: u64,
6986
start: Instant,
7087
settings: LogSettings,
88+
pub span: tracing::Span,
7189
}
7290

7391
impl<'q> QueryLogger<'q> {
74-
pub fn new(sql: &'q str, settings: LogSettings) -> Self {
92+
pub fn new(sql: &'q str, db_system: &'q str, settings: LogSettings) -> Self {
93+
use tracing::field::Empty;
94+
95+
let level = private_level_filter_to_trace_level(settings.tracing_span_level()).unwrap_or(tracing::Level::TRACE);
96+
97+
let span = private_tracing_dynamic_span!(
98+
target: "sqlx::query",
99+
level,
100+
"sqlx::query",
101+
summary = Empty,
102+
db.statement = Empty,
103+
db.system = db_system,
104+
rows_affected = Empty,
105+
rows_returned = Empty,
106+
);
107+
75108
Self {
76109
sql,
77110
rows_returned: 0,
78111
rows_affected: 0,
79112
start: Instant::now(),
80113
settings,
114+
span,
81115
}
82116
}
83117

@@ -100,6 +134,7 @@ impl<'q> QueryLogger<'q> {
100134
self.settings.statements_level
101135
};
102136

137+
// only create an event if the level filter is set
103138
if let Some((tracing_level, log_level)) = private_level_filter_to_levels(lvl) {
104139
// The enabled level could be set from either tracing world or log world, so check both
105140
// to see if logging should be enabled for our level
@@ -122,37 +157,35 @@ impl<'q> QueryLogger<'q> {
122157
String::new()
123158
};
124159

125-
if was_slow {
160+
self.span.record("summary", summary);
161+
self.span.record("db.statement", sql);
162+
self.span.record("rows_affected", self.rows_affected);
163+
self.span.record("rows_returned", self.rows_returned);
164+
165+
let _e = self.span.enter();
166+
if was_slow {
126167
private_tracing_dynamic_event!(
127168
target: "sqlx::query",
128169
tracing_level,
129-
summary,
130-
db.statement = sql,
131-
rows_affected = self.rows_affected,
132-
rows_returned = self.rows_returned,
170+
// When logging to JSON, one can trigger alerts from the presence of this field.
171+
slow_threshold=?self.settings.slow_statements_duration,
133172
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
134173
?elapsed,
135174
// Search friendly - numeric
136175
elapsed_secs = elapsed.as_secs_f64(),
137-
// When logging to JSON, one can trigger alerts from the presence of this field.
138-
slow_threshold=?self.settings.slow_statements_duration,
139176
// Make sure to use "slow" in the message as that's likely
140177
// what people will grep for.
141178
"slow statement: execution time exceeded alert threshold"
142-
);
179+
)
143180
} else {
144181
private_tracing_dynamic_event!(
145182
target: "sqlx::query",
146183
tracing_level,
147-
summary,
148-
db.statement = sql,
149-
rows_affected = self.rows_affected,
150-
rows_returned = self.rows_returned,
151184
// Human-friendly - includes units (usually ms). Also kept for backward compatibility
152185
?elapsed,
153186
// Search friendly - numeric
154187
elapsed_secs = elapsed.as_secs_f64(),
155-
);
188+
)
156189
}
157190
}
158191
}

sqlx-core/src/pool/inner.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,9 @@ use crate::connection::Connection;
44
use crate::database::Database;
55
use crate::error::Error;
66
use crate::pool::{deadline_as_timeout, CloseEvent, Pool, PoolOptions};
7+
use crate::private_tracing_dynamic_span;
78
use crossbeam_queue::ArrayQueue;
9+
use tracing::Instrument;
810

911
use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};
1012

@@ -245,6 +247,10 @@ impl<DB: Database> PoolInner<DB> {
245247
}
246248
}
247249

250+
pub fn tracing_span_level(&self) -> Level {
251+
std::cmp::max(self.acquire_slow_level, self.acquire_time_level).unwrap_or(Level::TRACE)
252+
}
253+
248254
pub(super) async fn acquire(self: &Arc<Self>) -> Result<Floating<DB, Live<DB>>, Error> {
249255
if self.is_closed() {
250256
return Err(Error::PoolClosed);
@@ -253,6 +259,13 @@ impl<DB: Database> PoolInner<DB> {
253259
let acquire_started_at = Instant::now();
254260
let deadline = acquire_started_at + self.options.acquire_timeout;
255261

262+
let span = private_tracing_dynamic_span!(
263+
target: "sqlx::pool::acquire",
264+
self.tracing_span_level(),
265+
"sqlx::pool::acquire",
266+
db.system = DB::NAME_LOWERCASE,
267+
);
268+
256269
let acquired = crate::rt::timeout(
257270
self.options.acquire_timeout,
258271
async {
@@ -295,6 +308,7 @@ impl<DB: Database> PoolInner<DB> {
295308
}
296309
}
297310
)
311+
.instrument(span.clone())
298312
.await
299313
.map_err(|_| Error::PoolTimedOut)??;
300314

@@ -304,6 +318,8 @@ impl<DB: Database> PoolInner<DB> {
304318
.acquire_slow_level
305319
.filter(|_| acquired_after > self.options.acquire_slow_threshold);
306320

321+
let _e = span.enter();
322+
307323
if let Some(level) = acquire_slow_level {
308324
private_tracing_dynamic_event!(
309325
target: "sqlx::pool::acquire",

sqlx-mysql/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ smallvec = "1.7.0"
6868
stringprep = "0.1.2"
6969
thiserror = "1.0.35"
7070
tracing = { version = "0.1.37", features = ["log"] }
71+
tracing-futures.workspace = true
7172
whoami = { version = "1.2.1", default-features = false }
7273

7374
serde = { version = "1.0.144", optional = true }

sqlx-mysql/src/connection/executor.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ use futures_core::future::BoxFuture;
2222
use futures_core::stream::BoxStream;
2323
use futures_core::Stream;
2424
use futures_util::{pin_mut, TryStreamExt};
25+
use sqlx_core::database::Database;
26+
use tracing_futures::Instrument;
2527
use std::{borrow::Cow, sync::Arc};
2628

2729
impl MySqlConnection {
@@ -106,7 +108,8 @@ impl MySqlConnection {
106108
persistent: bool,
107109
) -> Result<impl Stream<Item = Result<Either<MySqlQueryResult, MySqlRow>, Error>> + 'e, Error>
108110
{
109-
let mut logger = QueryLogger::new(sql, self.inner.log_settings.clone());
111+
let mut logger = QueryLogger::new(sql, MySql::NAME_LOWERCASE, self.inner.log_settings.clone());
112+
let span_handle = logger.span.clone();
110113

111114
self.inner.stream.wait_until_ready().await?;
112115
self.inner.stream.waiting.push_back(Waiting::Result);
@@ -240,7 +243,7 @@ impl MySqlConnection {
240243
r#yield!(v);
241244
}
242245
}
243-
}))
246+
}).instrument(span_handle))
244247
}
245248
}
246249

sqlx-mysql/src/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ impl Database for MySql {
3131
type Statement<'q> = MySqlStatement<'q>;
3232

3333
const NAME: &'static str = "MySQL";
34+
const NAME_LOWERCASE: &'static str = "mysql";
3435

3536
const URL_SCHEMES: &'static [&'static str] = &["mysql", "mariadb"];
3637
}

sqlx-postgres/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ futures-channel = { version = "0.3.19", default-features = false, features = ["s
3131
futures-core = { version = "0.3.19", default-features = false }
3232
futures-io = "0.3.24"
3333
futures-util = { version = "0.3.19", default-features = false, features = ["alloc", "sink", "io"] }
34+
tracing-futures.workspace = true
3435

3536
# Cryptographic Primitives
3637
crc = "3.0.0"

sqlx-postgres/src/connection/executor.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ use futures_core::stream::BoxStream;
1717
use futures_core::Stream;
1818
use futures_util::{pin_mut, TryStreamExt};
1919
use sqlx_core::arguments::Arguments;
20+
use sqlx_core::database::Database;
2021
use sqlx_core::Either;
22+
use tracing_futures::Instrument;
2123
use std::{borrow::Cow, sync::Arc};
2224

2325
async fn prepare(
@@ -195,7 +197,8 @@ impl PgConnection {
195197
persistent: bool,
196198
metadata_opt: Option<Arc<PgStatementMetadata>>,
197199
) -> Result<impl Stream<Item = Result<Either<PgQueryResult, PgRow>, Error>> + 'e, Error> {
198-
let mut logger = QueryLogger::new(query, self.log_settings.clone());
200+
let mut logger = QueryLogger::new(query, Postgres::NAME_LOWERCASE, self.log_settings.clone());
201+
let span_handle = logger.span.clone();
199202

200203
// before we continue, wait until we are "ready" to accept more queries
201204
self.wait_until_ready().await?;
@@ -362,7 +365,7 @@ impl PgConnection {
362365
}
363366

364367
Ok(())
365-
})
368+
}.instrument(span_handle))
366369
}
367370
}
368371

sqlx-postgres/src/connection/stream.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use futures_channel::mpsc::UnboundedSender;
66
use futures_util::SinkExt;
77
use log::Level;
88
use sqlx_core::bytes::Buf;
9+
use sqlx_core::database::Database;
910

1011
use crate::connection::tls::MaybeUpgradeTls;
1112
use crate::error::Error;
@@ -14,7 +15,7 @@ use crate::message::{
1415
ParameterStatus, ReceivedMessage,
1516
};
1617
use crate::net::{self, BufferedSocket, Socket};
17-
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity};
18+
use crate::{PgConnectOptions, PgDatabaseError, PgSeverity, Postgres};
1819

1920
// the stream is a separate type from the connection to uphold the invariant where an instantiated
2021
// [PgConnection] is a **valid** connection to postgres
@@ -185,6 +186,7 @@ impl PgStream {
185186
sqlx_core::private_tracing_dynamic_event!(
186187
target: "sqlx::postgres::notice",
187188
tracing_level,
189+
db.system = Postgres::NAME_LOWERCASE,
188190
message = notice.message()
189191
);
190192
}

sqlx-postgres/src/database.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ impl Database for Postgres {
3333
type Statement<'q> = PgStatement<'q>;
3434

3535
const NAME: &'static str = "PostgreSQL";
36+
const NAME_LOWERCASE: &'static str = "postgresql";
3637

3738
const URL_SCHEMES: &'static [&'static str] = &["postgres", "postgresql"];
3839
}

sqlx-sqlite/src/connection/execute.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@ use crate::connection::{ConnectionHandle, ConnectionState};
22
use crate::error::Error;
33
use crate::logger::QueryLogger;
44
use crate::statement::{StatementHandle, VirtualStatement};
5-
use crate::{SqliteArguments, SqliteQueryResult, SqliteRow};
5+
use crate::{Sqlite, SqliteArguments, SqliteQueryResult, SqliteRow};
6+
use sqlx_core::database::Database;
67
use sqlx_core::Either;
8+
use tracing::Span;
79

810
pub struct ExecuteIter<'a> {
911
handle: &'a mut ConnectionHandle,
@@ -16,6 +18,7 @@ pub struct ExecuteIter<'a> {
1618
args_used: usize,
1719

1820
goto_next: bool,
21+
span: Span,
1922
}
2023

2124
pub(crate) fn iter<'a>(
@@ -27,7 +30,8 @@ pub(crate) fn iter<'a>(
2730
// fetch the cached statement or allocate a new one
2831
let statement = conn.statements.get(query, persistent)?;
2932

30-
let logger = QueryLogger::new(query, conn.log_settings.clone());
33+
let logger = QueryLogger::new(query, Sqlite::NAME_LOWERCASE, conn.log_settings.clone());
34+
let span = logger.span.clone();
3135

3236
Ok(ExecuteIter {
3337
handle: &mut conn.handle,
@@ -36,6 +40,7 @@ pub(crate) fn iter<'a>(
3640
args,
3741
args_used: 0,
3842
goto_next: true,
43+
span
3944
})
4045
}
4146

@@ -67,6 +72,8 @@ impl Iterator for ExecuteIter<'_> {
6772
type Item = Result<Either<SqliteQueryResult, SqliteRow>, Error>;
6873

6974
fn next(&mut self) -> Option<Self::Item> {
75+
let _e = self.span.enter();
76+
7077
let statement = if self.goto_next {
7178
let statement = match self.statement.prepare_next(self.handle) {
7279
Ok(Some(statement)) => statement,

0 commit comments

Comments
 (0)