Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,14 @@ pub struct ServerConfig {
pub max_in_flight_requests_per_connection: usize,
pub max_request_bytes: usize,
pub max_statements_per_request: usize,
pub statement_timeout_ms: Option<u64>,
pub max_memory_intensive_requests: usize,
pub max_scan_rows: usize,
pub max_sort_rows: usize,
pub max_join_rows: usize,
pub max_query_result_rows: usize,
pub max_query_result_bytes: usize,
pub max_concurrent_queries_per_identity: Option<usize>,
}

impl Default for ServerConfig {
Expand All @@ -94,11 +97,14 @@ impl Default for ServerConfig {
max_in_flight_requests_per_connection: defaults.max_in_flight_requests_per_connection,
max_request_bytes: defaults.max_request_bytes,
max_statements_per_request: defaults.max_statements_per_request,
statement_timeout_ms: defaults.statement_timeout_ms,
max_memory_intensive_requests: defaults.max_memory_intensive_requests,
max_scan_rows: defaults.max_scan_rows,
max_sort_rows: defaults.max_sort_rows,
max_join_rows: defaults.max_join_rows,
max_query_result_rows: defaults.max_query_result_rows,
max_query_result_bytes: defaults.max_query_result_bytes,
max_concurrent_queries_per_identity: defaults.max_concurrent_queries_per_identity,
}
}
}
Expand Down Expand Up @@ -278,11 +284,14 @@ pub struct StartupDiagnostics {
pub server_max_in_flight_requests_per_connection: usize,
pub server_max_request_bytes: usize,
pub server_max_statements_per_request: usize,
pub server_statement_timeout_ms: Option<u64>,
pub server_max_memory_intensive_requests: usize,
pub server_max_scan_rows: usize,
pub server_max_sort_rows: usize,
pub server_max_join_rows: usize,
pub server_max_query_result_rows: usize,
pub server_max_query_result_bytes: usize,
pub server_max_concurrent_queries_per_identity: Option<usize>,
pub wal_segment_size_bytes: u64,
pub wal_sync_mode: SyncModeConfig,
pub sstable_data_block_size_bytes: usize,
Expand All @@ -309,6 +318,10 @@ impl StartupDiagnostics {
),
format!("server.max_request_bytes={}", self.server_max_request_bytes),
format!("server.max_statements_per_request={}", self.server_max_statements_per_request),
format!(
"server.statement_timeout_ms={}",
format_optional_u64(self.server_statement_timeout_ms)
),
format!(
"server.max_memory_intensive_requests={}",
self.server_max_memory_intensive_requests
Expand All @@ -317,6 +330,11 @@ impl StartupDiagnostics {
format!("server.max_sort_rows={}", self.server_max_sort_rows),
format!("server.max_join_rows={}", self.server_max_join_rows),
format!("server.max_query_result_rows={}", self.server_max_query_result_rows),
format!("server.max_query_result_bytes={}", self.server_max_query_result_bytes),
format!(
"server.max_concurrent_queries_per_identity={}",
format_optional_usize(self.server_max_concurrent_queries_per_identity)
),
format!("wal.segment_size_bytes={}", self.wal_segment_size_bytes),
format!("wal.sync_mode={}", self.wal_sync_mode.as_str()),
format!("sstable.data_block_size_bytes={}", self.sstable_data_block_size_bytes),
Expand Down Expand Up @@ -411,6 +429,9 @@ impl LsmdbConfig {
if self.server.max_statements_per_request == 0 {
return Err(invalid("server.max_statements_per_request", "must be > 0"));
}
if matches!(self.server.statement_timeout_ms, Some(0)) {
return Err(invalid("server.statement_timeout_ms", "must be > 0 when set"));
}
if self.server.max_memory_intensive_requests == 0 {
return Err(invalid("server.max_memory_intensive_requests", "must be > 0"));
}
Expand All @@ -426,6 +447,15 @@ impl LsmdbConfig {
if self.server.max_query_result_rows == 0 {
return Err(invalid("server.max_query_result_rows", "must be > 0"));
}
if self.server.max_query_result_bytes == 0 {
return Err(invalid("server.max_query_result_bytes", "must be > 0"));
}
if matches!(self.server.max_concurrent_queries_per_identity, Some(0)) {
return Err(invalid(
"server.max_concurrent_queries_per_identity",
"must be > 0 when set",
));
}
if self.wal.segment_size_bytes < MIN_WAL_SEGMENT_SIZE_BYTES {
return Err(invalid(
"wal.segment_size_bytes",
Expand Down Expand Up @@ -491,13 +521,18 @@ impl LsmdbConfig {
.max_in_flight_requests_per_connection,
server_max_request_bytes: runtime.server_limits.max_request_bytes,
server_max_statements_per_request: runtime.server_limits.max_statements_per_request,
server_statement_timeout_ms: runtime.server_limits.statement_timeout_ms,
server_max_memory_intensive_requests: runtime
.server_limits
.max_memory_intensive_requests,
server_max_scan_rows: runtime.server_limits.max_scan_rows,
server_max_sort_rows: runtime.server_limits.max_sort_rows,
server_max_join_rows: runtime.server_limits.max_join_rows,
server_max_query_result_rows: runtime.server_limits.max_query_result_rows,
server_max_query_result_bytes: runtime.server_limits.max_query_result_bytes,
server_max_concurrent_queries_per_identity: runtime
.server_limits
.max_concurrent_queries_per_identity,
wal_segment_size_bytes: storage.wal_options.segment_size_bytes,
wal_sync_mode: SyncModeConfig::from(storage.wal_options.sync_mode),
sstable_data_block_size_bytes: storage.sstable_builder_options.data_block_size_bytes,
Expand Down Expand Up @@ -579,11 +614,14 @@ impl LsmdbConfig {
.max_in_flight_requests_per_connection,
max_request_bytes: self.server.max_request_bytes,
max_statements_per_request: self.server.max_statements_per_request,
statement_timeout_ms: self.server.statement_timeout_ms,
max_memory_intensive_requests: self.server.max_memory_intensive_requests,
max_scan_rows: self.server.max_scan_rows,
max_sort_rows: self.server.max_sort_rows,
max_join_rows: self.server.max_join_rows,
max_query_result_rows: self.server.max_query_result_rows,
max_query_result_bytes: self.server.max_query_result_bytes,
max_concurrent_queries_per_identity: self.server.max_concurrent_queries_per_identity,
}
}
}
Expand All @@ -592,6 +630,14 @@ fn invalid(field: &'static str, message: impl Into<String>) -> ConfigError {
ConfigError::InvalidValue { field, message: message.into() }
}

fn format_optional_u64(value: Option<u64>) -> String {
value.map(|value| value.to_string()).unwrap_or_else(|| "none".to_string())
}

fn format_optional_usize(value: Option<usize>) -> String {
value.map(|value| value.to_string()).unwrap_or_else(|| "none".to_string())
}

fn bloom_params_for_fpr(fpr: f64) -> (usize, u8) {
let ln2 = std::f64::consts::LN_2;
let bits = (-(fpr.ln()) / (ln2 * ln2)).ceil().max(1.0) as usize;
Expand Down
11 changes: 8 additions & 3 deletions src/executor/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,36 @@ use crate::catalog::Catalog;
use crate::mvcc::Transaction;
use crate::planner::DeleteNode;

use super::ExecutionError;
use super::filter::evaluate_predicate;
use super::scan::scan_table_rows;
use super::{ExecutionContext, ExecutionError, apply_staged_writes};

pub(crate) fn execute_delete(
catalog: &Catalog,
tx: &mut Transaction,
node: &DeleteNode,
context: &ExecutionContext<'_>,
) -> Result<u64, ExecutionError> {
context.checkpoint()?;
let table = catalog
.get_table(&node.table)
.ok_or_else(|| ExecutionError::TableNotFound(node.table.clone()))?;
let (_, rows) = scan_table_rows(catalog, tx, &table.name, usize::MAX)?;
let (_, rows) = scan_table_rows(catalog, tx, &table.name, usize::MAX, context)?;

let mut affected = 0_u64;
let mut staged = std::collections::BTreeMap::new();
for stored in rows {
context.checkpoint()?;
if let Some(predicate) = &node.predicate {
if !evaluate_predicate(predicate, &stored.values, Some(&table.name))? {
continue;
}
}

tx.delete(&stored.key)?;
staged.insert(stored.key, None);
affected = affected.saturating_add(1);
}

apply_staged_writes(tx, staged)?;
Ok(affected)
}
39 changes: 27 additions & 12 deletions src/executor/filter.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,24 @@
use crate::sql::ast::{BinaryOp, Expr, UnaryOp};

use super::{ExecutionError, Row, RowSet, ScalarValue, literal_to_scalar, scalar_type_name};

pub(crate) fn apply_filter(input: RowSet, predicate: &Expr) -> Result<RowSet, ExecutionError> {
use super::{
ExecutionContext, ExecutionError, Row, RowSet, ScalarValue, literal_to_scalar, scalar_type_name,
};

pub(crate) fn apply_filter(
input: RowSet,
predicate: &Expr,
context: &ExecutionContext<'_>,
) -> Result<RowSet, ExecutionError> {
let RowSet { columns, rows, table_name } = input;

let filtered_rows = rows
.into_iter()
.filter_map(|row| match evaluate_predicate(predicate, &row, table_name.as_deref()) {
Ok(true) => Some(Ok(row)),
Ok(false) => None,
Err(err) => Some(Err(err)),
})
.collect::<Result<Vec<_>, _>>()?;
let mut filtered_rows = Vec::new();
for row in rows {
context.checkpoint()?;
match evaluate_predicate(predicate, &row, table_name.as_deref())? {
true => filtered_rows.push(row),
false => {}
}
}

Ok(RowSet { columns, rows: filtered_rows, table_name })
}
Expand Down Expand Up @@ -312,8 +318,17 @@ fn as_numeric(value: &ScalarValue) -> Result<Numeric, ExecutionError> {
#[cfg(test)]
mod tests {
use super::*;
use crate::executor::ExecutionLimits;
use crate::executor::governance::ExecutionGovernance;
use crate::sql::ast::LiteralValue;

fn test_context() -> ExecutionContext<'static> {
ExecutionContext {
limits: Box::leak(Box::new(ExecutionLimits::default())),
governance: Box::leak(Box::new(ExecutionGovernance::default())),
}
}

#[test]
fn evaluates_arithmetic_expression() {
let expr = Expr::Binary {
Expand Down Expand Up @@ -353,7 +368,7 @@ mod tests {
right: Box::new(Expr::Literal(LiteralValue::Integer(2))),
};

let filtered = apply_filter(row_set, &predicate).expect("filter");
let filtered = apply_filter(row_set, &predicate, &test_context()).expect("filter");
assert_eq!(filtered.rows.len(), 1);
}
}
125 changes: 125 additions & 0 deletions src/executor/governance.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use std::sync::Arc;
use std::sync::atomic::{AtomicU8, Ordering};
use std::time::{Duration, Instant};

use super::ExecutionError;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CancellationReason {
UserRequested = 1,
}

impl CancellationReason {
pub fn as_str(self) -> &'static str {
match self {
CancellationReason::UserRequested => "user requested cancellation",
}
}
}

#[derive(Debug, Default)]
struct CancellationState {
reason: AtomicU8,
}

#[derive(Debug, Clone)]
pub struct StatementCancellation {
state: Arc<CancellationState>,
}

impl Default for StatementCancellation {
fn default() -> Self {
Self::new()
}
}

impl StatementCancellation {
pub fn new() -> Self {
Self { state: Arc::new(CancellationState::default()) }
}

pub fn cancel(&self) -> bool {
self.state
.reason
.compare_exchange(
0,
CancellationReason::UserRequested as u8,
Ordering::SeqCst,
Ordering::SeqCst,
)
.is_ok()
}

pub fn reason(&self) -> Option<CancellationReason> {
match self.state.reason.load(Ordering::SeqCst) {
0 => None,
1 => Some(CancellationReason::UserRequested),
other => {
debug_assert_eq!(other, CancellationReason::UserRequested as u8);
Some(CancellationReason::UserRequested)
}
}
}
}

#[derive(Debug, Clone, Copy)]
pub struct StatementDeadline {
deadline: Instant,
timeout: Duration,
}

impl StatementDeadline {
pub fn after(timeout: Duration) -> Self {
Self { deadline: Instant::now() + timeout, timeout }
}

pub fn is_elapsed(self) -> bool {
Instant::now() >= self.deadline
}

pub fn timeout_ms(self) -> u64 {
let millis = self.timeout.as_millis();
u64::try_from(millis).unwrap_or(u64::MAX)
}
}

#[derive(Debug, Clone, Default)]
pub struct ExecutionGovernance {
deadline: Option<StatementDeadline>,
cancellation: Option<StatementCancellation>,
}

impl ExecutionGovernance {
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.deadline = Some(StatementDeadline::after(timeout));
self
}

pub fn with_deadline(mut self, deadline: StatementDeadline) -> Self {
self.deadline = Some(deadline);
self
}

pub fn with_cancellation(mut self, cancellation: StatementCancellation) -> Self {
self.cancellation = Some(cancellation);
self
}

pub fn checkpoint(&self) -> Result<(), ExecutionError> {
if let Some(cancellation) = &self.cancellation {
if let Some(reason) = cancellation.reason() {
return Err(ExecutionError::StatementCanceled { reason: reason.as_str() });
}
}

if let Some(deadline) = self.deadline {
if deadline.is_elapsed() {
return Err(ExecutionError::StatementTimedOut {
timeout_ms: deadline.timeout_ms(),
});
}
}

Ok(())
}
}
Loading
Loading