From dc4dc0745f2f55b19642b41a07f70459bab54524 Mon Sep 17 00:00:00 2001 From: Leo Katzengruber Date: Tue, 29 Jul 2025 14:53:00 +0200 Subject: [PATCH] refactor tcp receiving code --- server/src-backend/server/src/dap_client.rs | 74 ++++++++++++--------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/server/src-backend/server/src/dap_client.rs b/server/src-backend/server/src/dap_client.rs index b5b4495..1c98da7 100644 --- a/server/src-backend/server/src/dap_client.rs +++ b/server/src-backend/server/src/dap_client.rs @@ -1,11 +1,11 @@ -use std::{io::Read, net::Ipv4Addr, sync::atomic::AtomicUsize, time::Duration}; +use std::{net::Ipv4Addr, ops::DerefMut, sync::atomic::AtomicUsize, time::Duration}; use anyhow::Context; use backoff::{ExponentialBackoffBuilder, future::retry}; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use dap_types::types::{ProtocolMessage, RequestArguments}; use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, + io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, net::{ TcpStream, tcp::{OwnedReadHalf, OwnedWriteHalf}, @@ -100,7 +100,6 @@ impl DapProcess { } async fn send(&self, requests: &[RequestArguments]) -> anyhow::Result<()> { - // TODO: i have no idea which ordering i should use here let first_request_id = self .sequence_id .fetch_add(requests.len(), std::sync::atomic::Ordering::SeqCst); @@ -131,37 +130,48 @@ impl DapProcess { } async fn receive(&self) -> anyhow::Result> { - let mut read_buffer = BytesMut::with_capacity(1024 * 64); - - let bytes = { - self.tcp_read - .lock() - .await - .read_buf(&mut read_buffer) - .await? - }; + let mut messages = Vec::new(); + + let mut tcp_stream = self.tcp_read.lock().await; + let mut reader = BufReader::new(tcp_stream.deref_mut()); + + // while there are messages in the tcp stream + loop { + let mut line_buffer = String::new(); + + let content_length = { + let bytes_read = reader.read_line(&mut line_buffer).await?; + anyhow::ensure!(bytes_read != 0, "tcp stream closed"); + + line_buffer.truncate(line_buffer.len() - 2); // remove \r\n + + line_buffer + .split_once("Content-Length: ") + .map_or(None, |(_, length)| length.parse::().ok()) + .context("could not parse content length header")? + }; - if bytes == 0 { - anyhow::bail!("DAP Connection was closed") + line_buffer.clear(); + let bytes_read = reader.read_line(&mut line_buffer).await?; + anyhow::ensure!(bytes_read != 0, "tcp stream closed"); + + // NOTE: a dap message can have multiple headers, but only Content-Length is required + // and standardised. for simplicity only check for this header for now. + anyhow::ensure!(line_buffer == "\r\n", "expected end of headers"); + + let mut message = vec![0u8; content_length]; + reader.read_exact(&mut message).await?; + + let parsed = serde_json::from_slice::(&message)?; + + messages.push(parsed); + + if reader.buffer().is_empty() { + break; + } } - let message = read_buffer.bytes().collect::, _>>()?; - let message = std::str::from_utf8(&message)?.to_owned(); - - Ok(message - .split("Content-Length: ") - .skip(1) - .filter_map(|body| { - let first_index = body.find("{")?; - let json = &body[first_index..]; - - serde_json::from_str(json) - .inspect_err(|err| { - tracing::error!("Failed parsing DAP response: {}\n{}", json, err) - }) - .ok() - }) - .collect::>()) + Ok(messages) } }