Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 42 additions & 32 deletions server/src-backend/server/src/dap_client.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use std::{io::Read, net::Ipv4Addr, sync::atomic::AtomicUsize, time::Duration};
use std::{net::Ipv4Addr, ops::DerefMut, sync::atomic::AtomicUsize, time::Duration};

use anyhow::Context;
use backoff::{ExponentialBackoffBuilder, future::retry};
use bytes::{Bytes, BytesMut};
use bytes::Bytes;
use dap_types::types::{ProtocolMessage, RequestArguments};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader},
net::{
TcpStream,
tcp::{OwnedReadHalf, OwnedWriteHalf},
Expand Down Expand Up @@ -100,7 +100,6 @@ impl DapProcess {
}

async fn send(&self, requests: &[RequestArguments]) -> anyhow::Result<()> {
// TODO: i have no idea which ordering i should use here
let first_request_id = self
.sequence_id
.fetch_add(requests.len(), std::sync::atomic::Ordering::SeqCst);
Expand Down Expand Up @@ -131,37 +130,48 @@ impl DapProcess {
}

async fn receive(&self) -> anyhow::Result<Vec<ProtocolMessage>> {
let mut read_buffer = BytesMut::with_capacity(1024 * 64);

let bytes = {
self.tcp_read
.lock()
.await
.read_buf(&mut read_buffer)
.await?
};
let mut messages = Vec::new();

let mut tcp_stream = self.tcp_read.lock().await;
let mut reader = BufReader::new(tcp_stream.deref_mut());

// while there are messages in the tcp stream
loop {
let mut line_buffer = String::new();

let content_length = {
let bytes_read = reader.read_line(&mut line_buffer).await?;
anyhow::ensure!(bytes_read != 0, "tcp stream closed");

line_buffer.truncate(line_buffer.len() - 2); // remove \r\n

line_buffer
.split_once("Content-Length: ")
.map_or(None, |(_, length)| length.parse::<usize>().ok())
.context("could not parse content length header")?
};

if bytes == 0 {
anyhow::bail!("DAP Connection was closed")
line_buffer.clear();
let bytes_read = reader.read_line(&mut line_buffer).await?;
anyhow::ensure!(bytes_read != 0, "tcp stream closed");

// NOTE: a dap message can have multiple headers, but only Content-Length is required
// and standardised. for simplicity only check for this header for now.
anyhow::ensure!(line_buffer == "\r\n", "expected end of headers");

let mut message = vec![0u8; content_length];
reader.read_exact(&mut message).await?;

let parsed = serde_json::from_slice::<ProtocolMessage>(&message)?;

messages.push(parsed);

if reader.buffer().is_empty() {
break;
}
}

let message = read_buffer.bytes().collect::<Result<Vec<u8>, _>>()?;
let message = std::str::from_utf8(&message)?.to_owned();

Ok(message
.split("Content-Length: ")
.skip(1)
.filter_map(|body| {
let first_index = body.find("{")?;
let json = &body[first_index..];

serde_json::from_str(json)
.inspect_err(|err| {
tracing::error!("Failed parsing DAP response: {}\n{}", json, err)
})
.ok()
})
.collect::<Vec<_>>())
Ok(messages)
}
}

Expand Down
Loading