Skip to content

Commit 0c25da4

Browse files
committed
wire up APIs
1 parent 2c23e9d commit 0c25da4

File tree

2 files changed

+187
-45
lines changed

2 files changed

+187
-45
lines changed

codex-rs/app-server/src/codex_message_processor.rs

Lines changed: 180 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ use codex_app_server_protocol::CancelLoginAccountParams;
2323
use codex_app_server_protocol::CancelLoginAccountResponse;
2424
use codex_app_server_protocol::CancelLoginChatGptResponse;
2525
use codex_app_server_protocol::ClientRequest;
26+
use codex_app_server_protocol::CommandExecutionRequest;
27+
use codex_app_server_protocol::CommandExecutionRequestApprovalParams;
28+
use codex_app_server_protocol::CommandExecutionRequestApprovalResponse;
2629
use codex_app_server_protocol::ConversationGitInfo;
2730
use codex_app_server_protocol::ConversationSummary;
2831
use codex_app_server_protocol::ExecCommandApprovalParams;
@@ -31,6 +34,10 @@ use codex_app_server_protocol::ExecOneOffCommandParams;
3134
use codex_app_server_protocol::ExecOneOffCommandResponse;
3235
use codex_app_server_protocol::FeedbackUploadParams;
3336
use codex_app_server_protocol::FeedbackUploadResponse;
37+
use codex_app_server_protocol::FileChange as V2FileChange;
38+
use codex_app_server_protocol::FileChangeRequest;
39+
use codex_app_server_protocol::FileChangeRequestApprovalParams;
40+
use codex_app_server_protocol::FileChangeRequestApprovalResponse;
3441
use codex_app_server_protocol::FuzzyFileSearchParams;
3542
use codex_app_server_protocol::FuzzyFileSearchResponse;
3643
use codex_app_server_protocol::GetAccountParams;
@@ -62,12 +69,15 @@ use codex_app_server_protocol::ModelListParams;
6269
use codex_app_server_protocol::ModelListResponse;
6370
use codex_app_server_protocol::NewConversationParams;
6471
use codex_app_server_protocol::NewConversationResponse;
72+
use codex_app_server_protocol::ParsedCommand as V2ParsedCommand;
6573
use codex_app_server_protocol::RemoveConversationListenerParams;
6674
use codex_app_server_protocol::RemoveConversationSubscriptionResponse;
6775
use codex_app_server_protocol::RequestId;
6876
use codex_app_server_protocol::Result as JsonRpcResult;
6977
use codex_app_server_protocol::ResumeConversationParams;
7078
use codex_app_server_protocol::ResumeConversationResponse;
79+
use codex_app_server_protocol::ReviewDecision as V2ReviewDecision;
80+
use codex_app_server_protocol::SandboxCommandAssessment as V2SandboxCommandAssessment;
7181
use codex_app_server_protocol::SandboxMode;
7282
use codex_app_server_protocol::SendUserMessageParams;
7383
use codex_app_server_protocol::SendUserMessageResponse;
@@ -1259,7 +1269,7 @@ impl CodexMessageProcessor {
12591269
// Auto-attach a conversation listener when starting a thread.
12601270
// Use the same behavior as the v1 API with experimental_raw_events=false.
12611271
if let Err(err) = self
1262-
.attach_conversation_listener(conversation_id, false)
1272+
.attach_conversation_listener(conversation_id, false, ApiVersion::V2)
12631273
.await
12641274
{
12651275
tracing::warn!(
@@ -1537,7 +1547,7 @@ impl CodexMessageProcessor {
15371547
}) => {
15381548
// Auto-attach a conversation listener when resuming a thread.
15391549
if let Err(err) = self
1540-
.attach_conversation_listener(conversation_id, false)
1550+
.attach_conversation_listener(conversation_id, false, ApiVersion::V2)
15411551
.await
15421552
{
15431553
tracing::warn!(
@@ -2390,7 +2400,7 @@ impl CodexMessageProcessor {
23902400
experimental_raw_events,
23912401
} = params;
23922402
match self
2393-
.attach_conversation_listener(conversation_id, experimental_raw_events)
2403+
.attach_conversation_listener(conversation_id, experimental_raw_events, ApiVersion::V1)
23942404
.await
23952405
{
23962406
Ok(subscription_id) => {
@@ -2431,6 +2441,7 @@ impl CodexMessageProcessor {
24312441
&mut self,
24322442
conversation_id: ConversationId,
24332443
experimental_raw_events: bool,
2444+
api_version: ApiVersion,
24342445
) -> Result<Uuid, JSONRPCErrorError> {
24352446
let conversation = match self
24362447
.conversation_manager
@@ -2454,6 +2465,7 @@ impl CodexMessageProcessor {
24542465

24552466
let outgoing_for_task = self.outgoing.clone();
24562467
let pending_interrupts = self.pending_interrupts.clone();
2468+
let api_version_for_task = api_version;
24572469
tokio::spawn(async move {
24582470
loop {
24592471
tokio::select! {
@@ -2509,6 +2521,7 @@ impl CodexMessageProcessor {
25092521
conversation.clone(),
25102522
outgoing_for_task.clone(),
25112523
pending_interrupts.clone(),
2524+
api_version_for_task,
25122525
)
25132526
.await;
25142527
}
@@ -2657,6 +2670,7 @@ async fn apply_bespoke_event_handling(
26572670
conversation: Arc<CodexConversation>,
26582671
outgoing: Arc<OutgoingMessageSender>,
26592672
pending_interrupts: PendingInterrupts,
2673+
api_version: ApiVersion,
26602674
) {
26612675
let Event { id: event_id, msg } = event;
26622676
match msg {
@@ -2665,48 +2679,99 @@ async fn apply_bespoke_event_handling(
26652679
changes,
26662680
reason,
26672681
grant_root,
2668-
}) => {
2669-
let params = ApplyPatchApprovalParams {
2670-
conversation_id,
2671-
call_id,
2672-
file_changes: changes,
2673-
reason,
2674-
grant_root,
2675-
};
2676-
let rx = outgoing
2677-
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
2678-
.await;
2679-
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
2680-
tokio::spawn(async move {
2681-
on_patch_approval_response(event_id, rx, conversation).await;
2682-
});
2683-
}
2682+
}) => match api_version {
2683+
ApiVersion::V1 => {
2684+
let params = ApplyPatchApprovalParams {
2685+
conversation_id,
2686+
call_id,
2687+
file_changes: changes,
2688+
reason,
2689+
grant_root,
2690+
};
2691+
let rx = outgoing
2692+
.send_request(ServerRequestPayload::ApplyPatchApproval(params))
2693+
.await;
2694+
tokio::spawn(async move {
2695+
on_patch_approval_response(event_id, rx, conversation).await;
2696+
});
2697+
}
2698+
ApiVersion::V2 => {
2699+
let item_id = call_id.clone();
2700+
let request = FileChangeRequest {
2701+
call_id,
2702+
file_changes: changes
2703+
.into_iter()
2704+
.map(|(path, change)| (path, V2FileChange::from(change)))
2705+
.collect(),
2706+
reason,
2707+
grant_root,
2708+
};
2709+
let params = FileChangeRequestApprovalParams {
2710+
thread_id: conversation_id.to_string(),
2711+
turn_id: event_id.clone(),
2712+
item_id,
2713+
request,
2714+
};
2715+
let rx = outgoing
2716+
.send_request(ServerRequestPayload::FileChangeRequestApproval(params))
2717+
.await;
2718+
tokio::spawn(async move {
2719+
on_file_change_request_approval_response(event_id, rx, conversation).await;
2720+
});
2721+
}
2722+
},
26842723
EventMsg::ExecApprovalRequest(ExecApprovalRequestEvent {
26852724
call_id,
26862725
command,
26872726
cwd,
26882727
reason,
26892728
risk,
26902729
parsed_cmd,
2691-
}) => {
2692-
let params = ExecCommandApprovalParams {
2693-
conversation_id,
2694-
call_id,
2695-
command,
2696-
cwd,
2697-
reason,
2698-
risk,
2699-
parsed_cmd,
2700-
};
2701-
let rx = outgoing
2702-
.send_request(ServerRequestPayload::ExecCommandApproval(params))
2703-
.await;
2704-
2705-
// TODO(mbolin): Enforce a timeout so this task does not live indefinitely?
2706-
tokio::spawn(async move {
2707-
on_exec_approval_response(event_id, rx, conversation).await;
2708-
});
2709-
}
2730+
}) => match api_version {
2731+
ApiVersion::V1 => {
2732+
let params = ExecCommandApprovalParams {
2733+
conversation_id,
2734+
call_id,
2735+
command,
2736+
cwd,
2737+
reason,
2738+
risk,
2739+
parsed_cmd,
2740+
};
2741+
let rx = outgoing
2742+
.send_request(ServerRequestPayload::ExecCommandApproval(params))
2743+
.await;
2744+
tokio::spawn(async move {
2745+
on_exec_approval_response(event_id, rx, conversation).await;
2746+
});
2747+
}
2748+
ApiVersion::V2 => {
2749+
let item_id = call_id.clone();
2750+
let request = CommandExecutionRequest {
2751+
call_id,
2752+
command,
2753+
cwd,
2754+
reason,
2755+
risk: risk.map(V2SandboxCommandAssessment::from),
2756+
parsed_cmd: parsed_cmd.into_iter().map(V2ParsedCommand::from).collect(),
2757+
};
2758+
let params = CommandExecutionRequestApprovalParams {
2759+
thread_id: conversation_id.to_string(),
2760+
turn_id: event_id.clone(),
2761+
item_id,
2762+
request,
2763+
};
2764+
let rx = outgoing
2765+
.send_request(ServerRequestPayload::CommandExecutionRequestApproval(
2766+
params,
2767+
))
2768+
.await;
2769+
tokio::spawn(async move {
2770+
on_command_execution_request_approval_response(event_id, rx, conversation)
2771+
.await;
2772+
});
2773+
}
2774+
},
27102775
EventMsg::TokenCount(token_count_event) => {
27112776
if let Some(rate_limits) = token_count_event.rate_limits {
27122777
outgoing
@@ -2851,6 +2916,83 @@ async fn on_exec_approval_response(
28512916
}
28522917
}
28532918

2919+
async fn on_file_change_request_approval_response(
2920+
event_id: String,
2921+
receiver: oneshot::Receiver<JsonRpcResult>,
2922+
codex: Arc<CodexConversation>,
2923+
) {
2924+
let response = receiver.await;
2925+
let value = match response {
2926+
Ok(value) => value,
2927+
Err(err) => {
2928+
error!("request failed: {err:?}");
2929+
if let Err(submit_err) = codex
2930+
.submit(Op::PatchApproval {
2931+
id: event_id.clone(),
2932+
decision: ReviewDecision::Denied,
2933+
})
2934+
.await
2935+
{
2936+
error!("failed to submit denied PatchApproval after request failure: {submit_err}");
2937+
}
2938+
return;
2939+
}
2940+
};
2941+
2942+
let response = serde_json::from_value::<FileChangeRequestApprovalResponse>(value)
2943+
.unwrap_or_else(|err| {
2944+
error!("failed to deserialize FileChangeRequestApprovalResponse: {err}");
2945+
FileChangeRequestApprovalResponse {
2946+
decision: V2ReviewDecision::Denied,
2947+
}
2948+
});
2949+
2950+
let decision = response.decision.to_core();
2951+
if let Err(err) = codex
2952+
.submit(Op::PatchApproval {
2953+
id: event_id,
2954+
decision,
2955+
})
2956+
.await
2957+
{
2958+
error!("failed to submit PatchApproval: {err}");
2959+
}
2960+
}
2961+
2962+
async fn on_command_execution_request_approval_response(
2963+
event_id: String,
2964+
receiver: oneshot::Receiver<JsonRpcResult>,
2965+
conversation: Arc<CodexConversation>,
2966+
) {
2967+
let response = receiver.await;
2968+
let value = match response {
2969+
Ok(value) => value,
2970+
Err(err) => {
2971+
error!("request failed: {err:?}");
2972+
return;
2973+
}
2974+
};
2975+
2976+
let response = serde_json::from_value::<CommandExecutionRequestApprovalResponse>(value)
2977+
.unwrap_or_else(|err| {
2978+
error!("failed to deserialize CommandExecutionRequestApprovalResponse: {err}");
2979+
CommandExecutionRequestApprovalResponse {
2980+
decision: V2ReviewDecision::Denied,
2981+
}
2982+
});
2983+
2984+
let decision = response.decision.to_core();
2985+
if let Err(err) = conversation
2986+
.submit(Op::ExecApproval {
2987+
id: event_id,
2988+
decision,
2989+
})
2990+
.await
2991+
{
2992+
error!("failed to submit ExecApproval: {err}");
2993+
}
2994+
}
2995+
28542996
async fn read_summary_from_rollout(
28552997
path: &Path,
28562998
fallback_provider: &str,

codex-rs/app-server/tests/suite/v2/turn_start.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use app_test_support::create_shell_sse_response;
77
use app_test_support::to_response;
88
use codex_app_server_protocol::JSONRPCNotification;
99
use codex_app_server_protocol::JSONRPCResponse;
10+
use codex_app_server_protocol::ParsedCommand;
1011
use codex_app_server_protocol::RequestId;
1112
use codex_app_server_protocol::ServerRequest;
1213
use codex_app_server_protocol::ThreadStartParams;
@@ -17,7 +18,6 @@ use codex_app_server_protocol::TurnStartedNotification;
1718
use codex_app_server_protocol::UserInput as V2UserInput;
1819
use codex_core::protocol_config_types::ReasoningEffort;
1920
use codex_core::protocol_config_types::ReasoningSummary;
20-
use codex_protocol::parse_command::ParsedCommand;
2121
use codex_protocol::protocol::Event;
2222
use codex_protocol::protocol::EventMsg;
2323
use core_test_support::skip_if_no_network;
@@ -235,7 +235,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
235235
.await??;
236236
let ThreadStartResponse { thread } = to_response::<ThreadStartResponse>(start_resp)?;
237237

238-
// turn/start — expect ExecCommandApproval request from server
238+
// turn/start — expect CommandExecutionRequestApproval request from server
239239
let first_turn_id = mcp
240240
.send_turn_start_request(TurnStartParams {
241241
thread_id: thread.id.clone(),
@@ -258,12 +258,12 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
258258
mcp.read_stream_until_request_message(),
259259
)
260260
.await??;
261-
let ServerRequest::ExecCommandApproval { request_id, params } = server_req else {
262-
panic!("expected ExecCommandApproval request");
261+
let ServerRequest::CommandExecutionRequestApproval { request_id, params } = server_req else {
262+
panic!("expected CommandExecutionRequestApproval request");
263263
};
264-
assert_eq!(params.call_id, "call1");
264+
assert_eq!(params.request.call_id, "call1");
265265
assert_eq!(
266-
params.parsed_cmd,
266+
params.request.parsed_cmd,
267267
vec![ParsedCommand::Unknown {
268268
cmd: "python3 -c 'print(42)'".to_string()
269269
}]
@@ -302,7 +302,7 @@ async fn turn_start_exec_approval_toggle_v2() -> Result<()> {
302302
)
303303
.await??;
304304

305-
// Ensure we do NOT receive an ExecCommandApproval request before task completes
305+
// Ensure we do NOT receive a CommandExecutionRequestApproval request before task completes
306306
timeout(
307307
DEFAULT_READ_TIMEOUT,
308308
mcp.read_stream_until_notification_message("codex/event/task_complete"),

0 commit comments

Comments
 (0)