@@ -114,39 +114,7 @@ async fn run_streaming_polish(
114114 // from what the user actually sees\"。
115115 let ( tx, rx) = std:: sync:: mpsc:: channel :: < String > ( ) ;
116116 let typer_handle = tokio:: task:: spawn_blocking ( move || {
117- let mut typed_text = String :: new ( ) ;
118- let mut first_failure: Option < String > = None ;
119- let mut pending = String :: new ( ) ;
120- while let Ok ( delta) = rx. recv ( ) {
121- pending. push_str ( & delta) ;
122- let flush_at = std:: time:: Instant :: now ( ) + STREAMING_INSERT_FLUSH_INTERVAL ;
123- loop {
124- let now = std:: time:: Instant :: now ( ) ;
125- if now >= flush_at {
126- break ;
127- }
128- match rx. recv_timeout ( flush_at. duration_since ( now) ) {
129- Ok ( delta) => pending. push_str ( & delta) ,
130- Err ( std:: sync:: mpsc:: RecvTimeoutError :: Timeout ) => break ,
131- Err ( std:: sync:: mpsc:: RecvTimeoutError :: Disconnected ) => {
132- first_failure =
133- flush_streaming_insert_buffer ( & mut pending, & mut typed_text) ;
134- return ( typed_text, first_failure) ;
135- }
136- }
137- }
138- first_failure = flush_streaming_insert_buffer ( & mut pending, & mut typed_text) ;
139- if first_failure. is_some ( ) {
140- // 一旦类型链路出错(如 Secure Input 启用),后续 delta 全部丢弃,但仍
141- // 把 mpsc drain 完,避免发送端阻塞。
142- while rx. recv ( ) . is_ok ( ) { }
143- break ;
144- }
145- }
146- if first_failure. is_none ( ) {
147- first_failure = flush_streaming_insert_buffer ( & mut pending, & mut typed_text) ;
148- }
149- ( typed_text, first_failure)
117+ drain_streaming_insert_deltas ( rx, STREAMING_INSERT_FLUSH_INTERVAL )
150118 } ) ;
151119
152120 // 3. 调流式润色,on_delta 塞 mpsc;should_cancel 检查 dictation 取消旗。
@@ -283,13 +251,77 @@ async fn run_streaming_polish(
283251 }
284252}
285253
254+ fn drain_streaming_insert_deltas (
255+ rx : std:: sync:: mpsc:: Receiver < String > ,
256+ flush_interval : std:: time:: Duration ,
257+ ) -> ( String , Option < String > ) {
258+ drain_streaming_insert_deltas_with ( rx, flush_interval, flush_streaming_insert_buffer)
259+ }
260+
261+ fn drain_streaming_insert_deltas_with < F > (
262+ rx : std:: sync:: mpsc:: Receiver < String > ,
263+ flush_interval : std:: time:: Duration ,
264+ mut flush_pending : F ,
265+ ) -> ( String , Option < String > )
266+ where
267+ F : FnMut ( & mut String , & mut String ) -> Option < String > ,
268+ {
269+ let mut typed_text = String :: new ( ) ;
270+ let mut first_failure: Option < String > = None ;
271+ let mut pending = String :: new ( ) ;
272+ while let Ok ( delta) = rx. recv ( ) {
273+ pending. push_str ( & delta) ;
274+ let flush_at = std:: time:: Instant :: now ( ) + flush_interval;
275+ loop {
276+ let now = std:: time:: Instant :: now ( ) ;
277+ if now >= flush_at {
278+ break ;
279+ }
280+ match rx. recv_timeout ( flush_at. duration_since ( now) ) {
281+ Ok ( delta) => pending. push_str ( & delta) ,
282+ Err ( std:: sync:: mpsc:: RecvTimeoutError :: Timeout ) => break ,
283+ Err ( std:: sync:: mpsc:: RecvTimeoutError :: Disconnected ) => {
284+ first_failure = flush_pending ( & mut pending, & mut typed_text) ;
285+ return ( typed_text, first_failure) ;
286+ }
287+ }
288+ }
289+ first_failure = flush_pending ( & mut pending, & mut typed_text) ;
290+ if first_failure. is_some ( ) {
291+ // 一旦类型链路出错(如 Secure Input 启用),后续 delta 全部丢弃,但仍
292+ // 把 mpsc drain 完,避免发送端阻塞。
293+ while rx. recv ( ) . is_ok ( ) { }
294+ break ;
295+ }
296+ }
297+ if first_failure. is_none ( ) {
298+ first_failure = flush_pending ( & mut pending, & mut typed_text) ;
299+ }
300+ ( typed_text, first_failure)
301+ }
302+
286303fn flush_streaming_insert_buffer ( pending : & mut String , typed_text : & mut String ) -> Option < String > {
304+ flush_streaming_insert_buffer_with (
305+ pending,
306+ typed_text,
307+ crate :: unicode_keystroke:: type_unicode_chunk,
308+ )
309+ }
310+
311+ fn flush_streaming_insert_buffer_with < F > (
312+ pending : & mut String ,
313+ typed_text : & mut String ,
314+ mut type_chunk : F ,
315+ ) -> Option < String >
316+ where
317+ F : FnMut ( & str ) -> Result < usize , crate :: unicode_keystroke:: TypeError > ,
318+ {
287319 if pending. is_empty ( ) {
288320 return None ;
289321 }
290322 let delta = std:: mem:: take ( pending) ;
291323 let delta_chars = delta. chars ( ) . count ( ) ;
292- match crate :: unicode_keystroke :: type_unicode_chunk ( & delta) {
324+ match type_chunk ( & delta) {
293325 Ok ( typed_chars) => {
294326 let appended = append_typed_prefix ( typed_text, & delta, typed_chars) ;
295327 if appended < delta_chars {
@@ -362,9 +394,7 @@ fn streaming_insert_eligible(
362394 mode : PolishMode ,
363395 raw_uses_llm : bool ,
364396) -> bool {
365- streaming_insert_enabled
366- && !translation_active
367- && ( mode != PolishMode :: Raw || raw_uses_llm)
397+ streaming_insert_enabled && !translation_active && ( mode != PolishMode :: Raw || raw_uses_llm)
368398}
369399
370400fn default_done_message ( status : InsertStatus , polish_failed : bool ) -> Option < String > {
@@ -1728,8 +1758,8 @@ fn append_typed_prefix(target: &mut String, delta: &str, typed_chars: usize) ->
17281758#[ cfg( test) ]
17291759mod tests {
17301760 use super :: {
1731- append_typed_prefix, default_done_message, dictation_error_code ,
1732- finalize_polished_text, streaming_insert_eligible,
1761+ append_typed_prefix, default_done_message, drain_streaming_insert_deltas_with ,
1762+ finalize_polished_text, flush_streaming_insert_buffer_with , streaming_insert_eligible,
17331763 } ;
17341764 use crate :: types:: { ChineseScriptPreference , CorrectionRule , InsertStatus , PolishMode } ;
17351765
@@ -1836,4 +1866,61 @@ mod tests {
18361866 Some ( "润色失败,已插入原文" . to_string( ) )
18371867 ) ;
18381868 }
1869+
1870+ #[ test]
1871+ fn streaming_insert_batches_queued_deltas_before_flush ( ) {
1872+ let ( tx, rx) = std:: sync:: mpsc:: channel ( ) ;
1873+ tx. send ( "你" . to_string ( ) ) . unwrap ( ) ;
1874+ tx. send ( "好" . to_string ( ) ) . unwrap ( ) ;
1875+ tx. send ( "🙂" . to_string ( ) ) . unwrap ( ) ;
1876+ drop ( tx) ;
1877+
1878+ let mut flushed = Vec :: new ( ) ;
1879+ let ( typed, failure) = drain_streaming_insert_deltas_with (
1880+ rx,
1881+ std:: time:: Duration :: from_millis ( 50 ) ,
1882+ |pending, typed_text| {
1883+ flushed. push ( pending. clone ( ) ) ;
1884+ typed_text. push_str ( pending) ;
1885+ pending. clear ( ) ;
1886+ None
1887+ } ,
1888+ ) ;
1889+
1890+ assert_eq ! ( flushed, vec![ "你好🙂" . to_string( ) ] ) ;
1891+ assert_eq ! ( typed, "你好🙂" ) ;
1892+ assert_eq ! ( failure, None ) ;
1893+ }
1894+
1895+ #[ test]
1896+ fn flush_streaming_insert_buffer_keeps_partial_unicode_prefix ( ) {
1897+ let mut pending = "a你🙂b" . to_string ( ) ;
1898+ let mut typed = String :: new ( ) ;
1899+
1900+ let failure = flush_streaming_insert_buffer_with ( & mut pending, & mut typed, |_| {
1901+ Err ( crate :: unicode_keystroke:: TypeError :: Partial {
1902+ typed_chars : 3 ,
1903+ source : Box :: new ( platform_type_error ( ) ) ,
1904+ } )
1905+ } ) ;
1906+
1907+ assert_eq ! ( typed, "a你🙂" ) ;
1908+ assert ! ( pending. is_empty( ) ) ;
1909+ assert ! ( failure. is_some( ) ) ;
1910+ }
1911+
1912+ #[ cfg( target_os = "macos" ) ]
1913+ fn platform_type_error ( ) -> crate :: unicode_keystroke:: TypeError {
1914+ crate :: unicode_keystroke:: TypeError :: EventAllocFailed
1915+ }
1916+
1917+ #[ cfg( target_os = "windows" ) ]
1918+ fn platform_type_error ( ) -> crate :: unicode_keystroke:: TypeError {
1919+ crate :: unicode_keystroke:: TypeError :: SendInputFailed ( "fail" . into ( ) )
1920+ }
1921+
1922+ #[ cfg( target_os = "linux" ) ]
1923+ fn platform_type_error ( ) -> crate :: unicode_keystroke:: TypeError {
1924+ crate :: unicode_keystroke:: TypeError :: EnigoText ( "fail" . into ( ) )
1925+ }
18391926}
0 commit comments