Skip to content

Commit b3d445d

Browse files
committed
dekaf: Introduce transient error retry limit
1 parent 6ecc56f commit b3d445d

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

crates/dekaf/src/read.rs

+5-3
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ impl Read {
8888
let mut buf = bytes::BytesMut::new();
8989

9090
let mut has_had_parsing_error = false;
91+
let mut transient_errors = 0;
9192

9293
while records_bytes < target_bytes {
9394
let read = match tokio::select! {
@@ -115,14 +116,15 @@ impl Read {
115116
continue;
116117
}
117118
},
118-
Err(err) if err.is_transient() => {
119+
Err(err) if err.is_transient() && transient_errors < 5 => {
119120
use rand::Rng;
120121

121-
tracing::warn!(%err, "Retrying transient read error");
122+
transient_errors = transient_errors + 1;
123+
124+
tracing::warn!(error = ?err, "Retrying transient read error");
122125
let delay = Duration::from_millis(rand::thread_rng().gen_range(300..2000));
123126
tokio::time::sleep(delay).await;
124127
// We can retry transient errors just by continuing to poll the stream
125-
// TODO: We might have a counter here and give up after a few attempts
126128
continue;
127129
}
128130
Err(err @ gazette::Error::Parsing { .. }) if !has_had_parsing_error => {

0 commit comments

Comments
 (0)