Skip to content

Commit 32207ea

Browse files
authored
Added LogBuffering validation. (#729)
* Added LogBuffering validation. The validation will provide a meaningful error in case the LogBuffering is not configured according to https://docs.aws.amazon.com/lambda/latest/dg/telemetry-api.html\#telemetry-api-buffering * Amended the code based on the PR review: #729 * fixing formatting and linting issues
1 parent 25acb4a commit 32207ea

File tree

2 files changed

+118
-1
lines changed

2 files changed

+118
-1
lines changed

lambda-extension/src/extension.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,9 @@ where
225225

226226
if let Some(mut log_processor) = self.logs_processor {
227227
trace!("Log processor found");
228+
229+
validate_buffering_configuration(self.log_buffering)?;
230+
228231
// Spawn task to run processor
229232
let addr = SocketAddr::from(([0, 0, 0, 0], self.log_port_number));
230233
let make_service = service_fn(move |_socket: &AddrStream| {
@@ -261,6 +264,9 @@ where
261264

262265
if let Some(mut telemetry_processor) = self.telemetry_processor {
263266
trace!("Telemetry processor found");
267+
268+
validate_buffering_configuration(self.telemetry_buffering)?;
269+
264270
// Spawn task to run processor
265271
let addr = SocketAddr::from(([0, 0, 0, 0], self.telemetry_port_number));
266272
let make_service = service_fn(move |_socket: &AddrStream| {

lambda-extension/src/logs.rs

Lines changed: 112 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ use tokio::sync::Mutex;
55
use tower::Service;
66
use tracing::{error, trace};
77

8+
use crate::{Error, ExtensionError};
9+
810
/// Payload received from the Lambda Logs API
911
/// See: https://docs.aws.amazon.com/lambda/latest/dg/runtimes-logs-api.html#runtimes-logs-api-msg
1012
#[derive(Clone, Debug, Deserialize, PartialEq)]
@@ -110,7 +112,7 @@ pub struct LogPlatformReportMetrics {
110112

111113
/// Log buffering configuration.
112114
/// Allows Lambda to buffer logs before deliverying them to a subscriber.
113-
#[derive(Debug, Serialize)]
115+
#[derive(Debug, Serialize, Copy, Clone)]
114116
#[serde(rename_all = "camelCase")]
115117
pub struct LogBuffering {
116118
/// The maximum time (in milliseconds) to buffer a batch.
@@ -124,6 +126,40 @@ pub struct LogBuffering {
124126
pub max_items: usize,
125127
}
126128

129+
static LOG_BUFFERING_MIN_TIMEOUT_MS: usize = 25;
130+
static LOG_BUFFERING_MAX_TIMEOUT_MS: usize = 30_000;
131+
static LOG_BUFFERING_MIN_BYTES: usize = 262_144;
132+
static LOG_BUFFERING_MAX_BYTES: usize = 1_048_576;
133+
static LOG_BUFFERING_MIN_ITEMS: usize = 1_000;
134+
static LOG_BUFFERING_MAX_ITEMS: usize = 10_000;
135+
136+
impl LogBuffering {
137+
fn validate(&self) -> Result<(), Error> {
138+
if self.timeout_ms < LOG_BUFFERING_MIN_TIMEOUT_MS || self.timeout_ms > LOG_BUFFERING_MAX_TIMEOUT_MS {
139+
let error = format!(
140+
"LogBuffering validation error: Invalid timeout_ms: {}. Allowed values: Minumun: {}. Maximum: {}",
141+
self.timeout_ms, LOG_BUFFERING_MIN_TIMEOUT_MS, LOG_BUFFERING_MAX_TIMEOUT_MS
142+
);
143+
return Err(ExtensionError::boxed(error));
144+
}
145+
if self.max_bytes < LOG_BUFFERING_MIN_BYTES || self.max_bytes > LOG_BUFFERING_MAX_BYTES {
146+
let error = format!(
147+
"LogBuffering validation error: Invalid max_bytes: {}. Allowed values: Minumun: {}. Maximum: {}",
148+
self.max_bytes, LOG_BUFFERING_MIN_BYTES, LOG_BUFFERING_MAX_BYTES
149+
);
150+
return Err(ExtensionError::boxed(error));
151+
}
152+
if self.max_items < LOG_BUFFERING_MIN_ITEMS || self.max_items > LOG_BUFFERING_MAX_ITEMS {
153+
let error = format!(
154+
"LogBuffering validation error: Invalid max_items: {}. Allowed values: Minumun: {}. Maximum: {}",
155+
self.max_items, LOG_BUFFERING_MIN_ITEMS, LOG_BUFFERING_MAX_ITEMS
156+
);
157+
return Err(ExtensionError::boxed(error));
158+
}
159+
Ok(())
160+
}
161+
}
162+
127163
impl Default for LogBuffering {
128164
fn default() -> Self {
129165
LogBuffering {
@@ -134,6 +170,18 @@ impl Default for LogBuffering {
134170
}
135171
}
136172

173+
/// Validate the `LogBuffering` configuration (if present)
174+
///
175+
/// # Errors
176+
///
177+
/// This function will return an error if `LogBuffering` is present and configured incorrectly
178+
pub(crate) fn validate_buffering_configuration(log_buffering: Option<LogBuffering>) -> Result<(), Error> {
179+
match log_buffering {
180+
Some(log_buffering) => log_buffering.validate(),
181+
None => Ok(()),
182+
}
183+
}
184+
137185
/// Wrapper function that sends logs to the subscriber Service
138186
///
139187
/// This takes an `hyper::Request` and transforms it into `Vec<LambdaLog>` for the
@@ -303,4 +351,67 @@ mod tests {
303351
},
304352
),
305353
}
354+
355+
macro_rules! log_buffering_configuration_tests {
356+
($($name:ident: $value:expr,)*) => {
357+
$(
358+
#[test]
359+
fn $name() {
360+
let (input, expected) = $value;
361+
let result = validate_buffering_configuration(input);
362+
363+
if let Some(expected) = expected {
364+
assert!(result.is_err());
365+
assert_eq!(result.unwrap_err().to_string(), expected.to_string());
366+
} else {
367+
assert!(result.is_ok());
368+
}
369+
370+
}
371+
)*
372+
}
373+
}
374+
375+
log_buffering_configuration_tests! {
376+
log_buffer_configuration_none_success: (
377+
None,
378+
None::<ExtensionError>
379+
),
380+
log_buffer_configuration_default_success: (
381+
Some(LogBuffering::default()),
382+
None::<ExtensionError>
383+
),
384+
log_buffer_configuration_min_success: (
385+
Some(LogBuffering { timeout_ms: LOG_BUFFERING_MIN_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MIN_BYTES, max_items: LOG_BUFFERING_MIN_ITEMS }),
386+
None::<ExtensionError>
387+
),
388+
log_buffer_configuration_max_success: (
389+
Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }),
390+
None::<ExtensionError>
391+
),
392+
min_timeout_ms_error: (
393+
Some(LogBuffering { timeout_ms: LOG_BUFFERING_MIN_TIMEOUT_MS-1, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }),
394+
Some(ExtensionError::boxed("LogBuffering validation error: Invalid timeout_ms: 24. Allowed values: Minumun: 25. Maximum: 30000"))
395+
),
396+
max_timeout_ms_error: (
397+
Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS+1, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS }),
398+
Some(ExtensionError::boxed("LogBuffering validation error: Invalid timeout_ms: 30001. Allowed values: Minumun: 25. Maximum: 30000"))
399+
),
400+
min_max_bytes_error: (
401+
Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MIN_BYTES-1, max_items: LOG_BUFFERING_MAX_ITEMS }),
402+
Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_bytes: 262143. Allowed values: Minumun: 262144. Maximum: 1048576"))
403+
),
404+
max_max_bytes_error: (
405+
Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES+1, max_items: LOG_BUFFERING_MAX_ITEMS }),
406+
Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_bytes: 1048577. Allowed values: Minumun: 262144. Maximum: 1048576"))
407+
),
408+
min_max_items_error: (
409+
Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MIN_ITEMS-1 }),
410+
Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_items: 999. Allowed values: Minumun: 1000. Maximum: 10000"))
411+
),
412+
max_max_items_error: (
413+
Some(LogBuffering { timeout_ms: LOG_BUFFERING_MAX_TIMEOUT_MS, max_bytes: LOG_BUFFERING_MAX_BYTES, max_items: LOG_BUFFERING_MAX_ITEMS+1 }),
414+
Some(ExtensionError::boxed("LogBuffering validation error: Invalid max_items: 10001. Allowed values: Minumun: 1000. Maximum: 10000"))
415+
),
416+
}
306417
}

0 commit comments

Comments
 (0)