Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 159 additions & 10 deletions app/src/remote_server/server_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use ::ai::index::full_source_code_embedding::manager::{
use ::ai::index::full_source_code_embedding::{
ContentHash, FragmentMetadata as LocalFragmentMetadata, NodeHash,
};
use futures::StreamExt as _;
use remote_server::proto::OpenBufferSuccess;
use repo_metadata::repositories::{DetectedRepositories, RepoDetectionSource};
use repo_metadata::{RepoMetadataEvent, RepoMetadataModel, RepositoryIdentifier};
Expand Down Expand Up @@ -37,13 +38,14 @@ use super::proto::{
get_fragment_metadata_from_hash_response, git_commit_chain_response, git_create_pr_response,
git_generate_commit_message_response, git_get_committed_branch_files_response,
git_get_pr_info_response, git_push_response, host_scoped_request, notification,
resolve_conflict_response, run_command_response, save_buffer_response, server_message,
session_scoped_request, write_file_response, Abort, Authenticate, BranchInfo, BufferEdit,
BufferUpdatedPush, ClientMessage, CloseBuffer, CodebaseIndexLimits, CodebaseIndexStatus,
CodebaseIndexStatusUpdated, CodebaseIndexStatusesSnapshot, CodebaseResyncMode, DeleteFile,
DeleteFileResponse, DeleteFileSuccess, DiscardFilesError, DiscardFilesResponse,
DiscardFilesSuccess, DropCodebaseIndex, ErrorCode, ErrorResponse, FailedFileRead,
FileContextProto, FileOperationError, FragmentMetadata as ProtoFragmentMetadata,
resolve_conflict_response, ripgrep_search_response, run_command_response, save_buffer_response,
server_message, session_scoped_request, write_file_response, Abort, Authenticate, BranchInfo,
BufferEdit, BufferUpdatedPush, ClientMessage, CloseBuffer, CodebaseIndexLimits,
CodebaseIndexStatus, CodebaseIndexStatusUpdated, CodebaseIndexStatusesSnapshot,
CodebaseResyncMode, DeleteFile, DeleteFileResponse, DeleteFileSuccess, DiscardFilesError,
DiscardFilesResponse, DiscardFilesSuccess, DropCodebaseIndex, ErrorCode, ErrorResponse,
FailedFileRead, FileContextProto, FileOperationError,
FragmentMetadata as ProtoFragmentMetadata,
FragmentMetadataLookupError as ProtoFragmentMetadataLookupError,
FragmentMetadataLookupErrorCode, GetBranchesError, GetBranchesResponse, GetBranchesSuccess,
GetDiffStateResponse, GetFragmentMetadataFromHash, GetFragmentMetadataFromHashResponse,
Expand All @@ -56,9 +58,11 @@ use super::proto::{
Initialize, InitializeResponse, MissingFragmentMetadata, NavigatedToDirectory,
NavigatedToDirectoryResponse, OpenBuffer, OpenBufferResponse, ReadFileContextResponse,
ResolveConflict, ResolveConflictResponse, ResolveConflictSuccess, ResyncCodebase,
RunCommandError, RunCommandErrorCode, RunCommandRequest, RunCommandResponse, RunCommandSuccess,
SaveBuffer, SaveBufferResponse, SaveBufferSuccess, ServerMessage, SessionBootstrapped,
TextEdit, UploadHandoffSnapshot, WriteFile, WriteFileResponse, WriteFileSuccess,
RipgrepSearchError, RipgrepSearchMatch, RipgrepSearchRequest, RipgrepSearchResponse,
RipgrepSearchSubmatch, RipgrepSearchSuccess, RunCommandError, RunCommandErrorCode,
RunCommandRequest, RunCommandResponse, RunCommandSuccess, SaveBuffer, SaveBufferResponse,
SaveBufferSuccess, ServerMessage, SessionBootstrapped, TextEdit, UploadHandoffSnapshot,
WriteFile, WriteFileResponse, WriteFileSuccess,
};
use super::server_buffer_tracker::{PendingBufferRequestKind, ServerBufferTracker};
use crate::code::global_buffer_model::{GlobalBufferModel, GlobalBufferModelEvent};
Expand All @@ -75,6 +79,15 @@ pub const GRACE_PERIOD: std::time::Duration = std::time::Duration::from_secs(10
/// large ref list.
const MAX_BRANCH_COUNT_CAP: usize = 500;

/// Server-side cap on the number of matched lines returned by `RipgrepSearch`.
///
/// A client-supplied `max_matches` is clamped to this value, and a
/// `max_matches` of 0 selects this cap outright.
const MAX_RIPGREP_SEARCH_MATCH_CAP: usize = 5_000;
/// Approximate payload budget for one remote search response
/// (8 * 1024 * 1024 bytes).
///
/// Eight MB keeps transfer latency and memory well below the protocol's
/// 64 MB frame limit. Individual matches are never truncated because doing so
/// could remove a late submatch and corrupt its preview and click location.
///
/// The budget is compared against an estimate (file path bytes + line text
/// bytes + 16 bytes per submatch), not the encoded message size. A match that
/// would push the estimate over the budget is dropped whole and the response
/// is flagged as capped.
const MAX_RIPGREP_SEARCH_RESPONSE_BYTES: usize = 8 * 1024 * 1024;

/// Unique identifier for a connected proxy session in daemon mode.
pub type ConnectionId = uuid::Uuid;
use super::protocol::RequestId;
Expand Down Expand Up @@ -772,6 +785,9 @@ impl ServerModel {
Some(host_scoped_request::Message::GitGetCommittedBranchFiles(m)) => {
self.handle_get_committed_branch_files(m, &request_id, conn_id, ctx)
}
Some(host_scoped_request::Message::RipgrepSearch(m)) => {
self.handle_ripgrep_search(m, &request_id, conn_id, ctx)
}
None => {
log::warn!(
"HostScopedRequest with no inner message (request_id={request_id})"
Expand Down Expand Up @@ -2754,6 +2770,110 @@ impl ServerModel {
HandlerOutcome::Async(Some(handle))
}

/// Handles `RipgrepSearch` — request/response backing global search in
/// remote sessions.
///
/// Runs the same ripgrep subprocess used by local global search (the
/// daemon binary includes the `ripgrep-search` worker subcommand) over
/// the requested roots and responds with all matches once the search
/// completes, capped to bound response size. Cancellable via `Abort`
/// like other async handlers.
fn handle_ripgrep_search(
&mut self,
msg: RipgrepSearchRequest,
request_id: &RequestId,
conn_id: ConnectionId,
ctx: &mut ModelContext<Self>,
) -> HandlerOutcome {
log::info!(
"Handling RipgrepSearch ({} roots, request_id={request_id})",
msg.roots.len()
);

if msg.pattern.is_empty() || msg.roots.is_empty() {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Synchronously validate the request.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: is there a way we can move most of this logic to its own file, including the proto helper conversion? this would help isolate the handling of server messages to server_model.rs and prevent the file from growing too large

return ripgrep_search_error_response(
"RipgrepSearch requires a pattern and at least one root".to_string(),
);
}
if let Some(root) = msg.roots.iter().find(|root| !Path::new(root).is_absolute()) {
return ripgrep_search_error_response(format!(
"RipgrepSearch root must be absolute: {root}"
));
}

let roots: Vec<PathBuf> = msg.roots.iter().map(PathBuf::from).collect();
let match_cap = match msg.max_matches as usize {
0 => MAX_RIPGREP_SEARCH_MATCH_CAP,
requested => requested.min(MAX_RIPGREP_SEARCH_MATCH_CAP),
};
let pattern = msg.pattern;
let ignore_case = msg.ignore_case;
let multiline = msg.multiline;

let request_id_for_response = request_id.clone();
let handle = self.spawn_request_handler(
request_id.clone(),
async move {
let stream = warp_ripgrep::search::search_streaming(

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass params to start searching

std::slice::from_ref(&pattern),
&roots,
ignore_case,
multiline,
)?;
futures::pin_mut!(stream);

let mut matches = Vec::new();
let mut response_bytes: usize = 0;
let mut capped = false;
while let Some(m) = stream.next().await {

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

collect results against a match cap, break if we hit the match cap

This version of the design collects all of the results and sends them together instead of streaming them to the client as we receive them. We can revisit this if there are performance issues.

if matches.len() >= match_cap {
capped = true;
break;
}
let m = ripgrep_match_to_proto(m);

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

convert to proto shape

let match_bytes = m
.file_path
.len()
.saturating_add(m.line_text.len())
.saturating_add(
m.submatches
.len()
.saturating_mul(2 * std::mem::size_of::<u64>()),
);
if response_bytes.saturating_add(match_bytes)
> MAX_RIPGREP_SEARCH_RESPONSE_BYTES
{
capped = true;
break;
}

response_bytes += match_bytes;

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're approximating the response bytes by adding up each match's approx bytes to make sure we're staying within a reasonable response size (want to avoid performance issues of huge matches)

matches.push(m);
}
anyhow::Ok(RipgrepSearchSuccess { matches, capped })
},
move |me, result: anyhow::Result<RipgrepSearchSuccess>, _ctx| {
let response = match result {
Ok(success) => RipgrepSearchResponse {
result: Some(ripgrep_search_response::Result::Success(success)),
},
Err(err) => RipgrepSearchResponse {
result: Some(ripgrep_search_response::Result::Error(RipgrepSearchError {
message: format!("{err:#}"),
})),
},
};
me.send_server_message(
Some(conn_id),
Some(&request_id_for_response),
server_message::Message::RipgrepSearchResponse(response),

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

send response

);
},
ctx,
);
HandlerOutcome::Async(Some(handle))
}

/// Handles `DiscardFilesRequest` — request/response.
///
/// Runs git restore/stash on the remote filesystem for the specified files.
Expand Down Expand Up @@ -3258,6 +3378,35 @@ fn invalid_request_response(message: String) -> HandlerOutcome {
}))
}

fn ripgrep_search_error_response(message: String) -> HandlerOutcome {
HandlerOutcome::Sync(server_message::Message::RipgrepSearchResponse(
RipgrepSearchResponse {
result: Some(ripgrep_search_response::Result::Error(RipgrepSearchError {
message,
})),
},
))
}

/// Translates one ripgrep match into the wire (`RipgrepSearchMatch`) shape.
///
/// The line text and every submatch offset are carried over untouched;
/// payload size is bounded by response-wide caps elsewhere, never by trimming
/// an individual match. The file path is rendered as a string with lossy
/// UTF-8 conversion.
fn ripgrep_match_to_proto(m: warp_ripgrep::search::Match) -> RipgrepSearchMatch {
    let file_path = m.file_path.to_string_lossy().into_owned();

    let mut submatches = Vec::with_capacity(m.submatches.len());
    for span in m.submatches {
        submatches.push(RipgrepSearchSubmatch {
            byte_start: span.byte_start.as_usize() as u64,
            byte_end: span.byte_end.as_usize() as u64,
        });
    }

    RipgrepSearchMatch {
        file_path,
        line_number: m.line_number,
        line_text: m.line_text,
        submatches,
    }
}

fn codebase_index_status_response(status: CodebaseIndexStatus) -> HandlerOutcome {
HandlerOutcome::Sync(server_message::Message::CodebaseIndexStatusUpdated(
CodebaseIndexStatusUpdated {
Expand Down
50 changes: 49 additions & 1 deletion app/src/remote_server/server_model_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;

use string_offset::ByteOffset;
use warp_ripgrep::search::{Match as RipgrepMatch, Submatch};
use warp_util::standardized_path::StandardizedPath;
use warpui::App;

Expand All @@ -11,7 +14,7 @@ use super::super::proto::{
};
use super::super::protocol::RequestId;
use super::super::server_buffer_tracker::ServerBufferTracker;
use super::{ConnectionId, PendingFileOps, ServerModel};
use super::{ripgrep_match_to_proto, ConnectionId, PendingFileOps, ServerModel};
use crate::auth::auth_state::AuthState;
use crate::code_review::diff_state::DiffMode;
use crate::remote_server::diff_state_tracker::DiffModelKey;
Expand Down Expand Up @@ -268,6 +271,51 @@ fn host_scoped_response_fails_over_when_target_missing() {
});
}

// ── Ripgrep search match conversion ──────────────────────────────────

/// Test helper: builds a `Submatch` covering the byte range `start..end`.
fn submatch(start: usize, end: usize) -> Submatch {
    let byte_start = ByteOffset::from(start);
    let byte_end = ByteOffset::from(end);
    Submatch {
        byte_start,
        byte_end,
    }
}

/// Every field of a ripgrep match must land in the matching proto field.
#[test]
fn ripgrep_match_to_proto_maps_fields() {
    let proto = ripgrep_match_to_proto(RipgrepMatch {
        file_path: PathBuf::from("/repo/src/main.rs"),
        line_number: 42,
        line_text: "fn main() {}".to_string(),
        submatches: vec![submatch(3, 7)],
    });

    assert_eq!(proto.file_path, "/repo/src/main.rs");
    assert_eq!(proto.line_number, 42);
    assert_eq!(proto.line_text, "fn main() {}");

    // Comparing the full list checks both the count and the offsets.
    let spans: Vec<(u64, u64)> = proto
        .submatches
        .iter()
        .map(|s| (s.byte_start, s.byte_end))
        .collect();
    assert_eq!(spans, vec![(3, 7)]);
}

/// A submatch far into a long line must keep its offsets, and the line text
/// must come through in full.
#[test]
fn ripgrep_match_to_proto_preserves_late_submatch_and_full_line() {
    let mut expected_line = "x".repeat(8_000);
    expected_line.push_str("needle");

    let proto = ripgrep_match_to_proto(RipgrepMatch {
        file_path: PathBuf::from("/repo/long.rs"),
        line_number: 1,
        line_text: expected_line.clone(),
        submatches: vec![submatch(8_000, 8_006)],
    });

    assert_eq!(proto.line_text, expected_line);
    let first = &proto.submatches[0];
    assert_eq!((first.byte_start, first.byte_end), (8_000, 8_006));
}

#[test]
fn non_host_scoped_response_is_not_failed_over() {
App::test((), |mut app| async move {
Expand Down
52 changes: 52 additions & 0 deletions crates/remote_server/proto/remote_server.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ message HostScopedRequest {
GitGetPrInfoRequest git_get_pr_info = 18;
GitGenerateCommitMessageRequest git_generate_commit_message = 19;
GitGetCommittedBranchFilesRequest git_get_committed_branch_files = 20;
RipgrepSearchRequest ripgrep_search = 21;
}
}

Expand Down Expand Up @@ -118,6 +119,7 @@ message ServerMessage {
GitGetPrInfoResponse git_get_pr_info_response = 30;
GitGenerateCommitMessageResponse git_generate_commit_message_response = 31;
GitGetCommittedBranchFilesResponse git_get_committed_branch_files_response = 32;
RipgrepSearchResponse ripgrep_search_response = 33;
}
}

Expand Down Expand Up @@ -403,6 +405,56 @@ message FileContextProto {
uint32 line_count = 7;
}

// ── Ripgrep search (global search) ────────────────────────────────

// Client → server: run a ripgrep search over the given root directories
// on the host. Used by global search for remote sessions.
//
// The server answers with a RipgrepSearchError (without starting a search)
// when `pattern` is empty, `roots` is empty, or any root is not absolute.
message RipgrepSearchRequest {
// Regex pattern. Already escaped client-side for literal (non-regex)
// searches, matching local global search behavior. Must be non-empty.
string pattern = 1;
// Absolute directories on the host to search. At least one is required.
repeated string roots = 2;
// Forwarded unchanged to the ripgrep search.
bool ignore_case = 3;
// Forwarded unchanged to the ripgrep search.
bool multiline = 4;
// Maximum number of matched lines to return. 0 (the proto3 default) means
// the server's own match cap is used; larger values are clamped to that cap.
// The server also applies an approximate payload cap.
uint32 max_matches = 5;
}

// Server → client: result of a RipgrepSearchRequest.
//
// The server sets `success` when the search ran to completion (or stopped at
// a cap), and `error` when request validation or the search itself failed.
message RipgrepSearchResponse {
oneof result {
RipgrepSearchSuccess success = 1;
RipgrepSearchError error = 2;
}
}

// Matches collected by the server for one search request.
message RipgrepSearchSuccess {
repeated RipgrepSearchMatch matches = 1;
// True when the search stopped early: either the match cap was reached and
// more matches were available, or adding the next match would have exceeded
// the server's approximate response-size budget. `matches` may then be
// incomplete.
bool capped = 2;
}

// Failure result for a RipgrepSearchRequest.
message RipgrepSearchError {
// Human-readable description: either a request validation message or the
// formatted error produced by the search.
string message = 1;
}

// A single matched line in a file. A line may contain multiple submatches;
// the client expands them into per-submatch result rows.
message RipgrepSearchMatch {
// Path of the matched file on the host, converted to a string lossily by the
// server (invalid UTF-8 sequences are replaced).
string file_path = 1;
uint32 line_number = 2;
// Text of the matched line. The server never truncates it, so the submatch
// byte offsets below always index into this string.
string line_text = 3;
repeated RipgrepSearchSubmatch submatches = 4;
}

// Byte offsets into `line_text` for one submatch (end exclusive).
// The server copies these from the ripgrep match without adjusting them.
message RipgrepSearchSubmatch {
uint64 byte_start = 1;
uint64 byte_end = 2;
}

// ── Remote codebase indexing status ───────────────────────────────

message IndexCodebase {
Expand Down
1 change: 1 addition & 0 deletions crates/remote_server/src/host_response_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ fn every_host_scoped_request_has_a_response_disposition() {
M::GitGetPrInfo(_) => "manager::get_pr_info",
M::GitGenerateCommitMessage(_) => "manager::generate_commit_message",
M::GitGetCommittedBranchFiles(_) => "manager::get_committed_branch_files",
M::RipgrepSearch(_) => "manager::start_ripgrep_search",
}
}

Expand Down
Loading
Loading