@@ -190,6 +190,19 @@ impl crate::server::MyService {
190
190
self . check_privilege_in ( request. metadata ( ) , & [ "user" , "guest" ] )
191
191
. await ?;
192
192
let user_id = Self :: get_key_from_metadata ( request. metadata ( ) , "user_id" ) ;
193
+ if request. metadata ( ) . get ( "forwarding-target" ) . is_some ( ) {
194
+ let forwarding_target = request
195
+ . metadata ( )
196
+ . get ( "forwarding-target" )
197
+ . unwrap ( )
198
+ . to_str ( )
199
+ . unwrap ( )
200
+ . to_string ( ) ;
201
+ let task = request. into_inner ( ) ;
202
+ self . forward_inter_core_sync_task ( & user_id, & forwarding_target, & task)
203
+ . await ?;
204
+ return Ok ( Response :: new ( Empty :: default ( ) ) ) ;
205
+ }
193
206
if !self
194
207
. _internal_storage_contains ( & user_id, & format ! ( "tasks:{}" , request. get_ref( ) . task_id) )
195
208
. await ?
@@ -342,14 +355,36 @@ impl crate::server::MyService {
342
355
Err ( e) => return Err ( Status :: internal ( format ! ( "{}" , e) ) ) ,
343
356
}
344
357
} else {
345
- let ( core_addr, guest_jwt) = self . query_user_record ( user_id, target_user_id) . await ?;
346
- let mut client = match self . _grpc_connect ( & core_addr) . await {
347
- Ok ( client) => client,
348
- Err ( e) => return Err ( Status :: internal ( format ! ( "{}" , e) ) ) ,
349
- } ;
350
- client
351
- . inter_core_sync_task ( generate_request ( & guest_jwt, task. clone ( ) ) )
352
- . await ?;
358
+ let ( core_addr, guest_jwt, forwarding_user_id) =
359
+ self . query_user_record ( user_id, target_user_id) . await ?;
360
+ if let Some ( forwarding_user_id) = forwarding_user_id {
361
+ let ( core_addr, guest_jwt, double_forwarding_user_id) =
362
+ self . query_user_record ( user_id, & forwarding_user_id) . await ?;
363
+ if double_forwarding_user_id. is_some ( ) {
364
+ return Err ( Status :: failed_precondition ( format ! (
365
+ "Double forwarding is not allowed (checking user {})." ,
366
+ forwarding_user_id
367
+ ) ) ) ;
368
+ }
369
+ let mut client = match self . _grpc_connect ( & core_addr) . await {
370
+ Ok ( client) => client,
371
+ Err ( e) => return Err ( Status :: internal ( format ! ( "{}" , e) ) ) ,
372
+ } ;
373
+ let mut request = generate_request ( & guest_jwt, task. clone ( ) ) ;
374
+ request. metadata_mut ( ) . insert (
375
+ "forwarding-target" ,
376
+ tonic:: metadata:: MetadataValue :: try_from ( target_user_id) . unwrap ( ) ,
377
+ ) ;
378
+ client. inter_core_sync_task ( request) . await ?;
379
+ } else {
380
+ let mut client = match self . _grpc_connect ( & core_addr) . await {
381
+ Ok ( client) => client,
382
+ Err ( e) => return Err ( Status :: internal ( format ! ( "{}" , e) ) ) ,
383
+ } ;
384
+ client
385
+ . inter_core_sync_task ( generate_request ( & guest_jwt, task. clone ( ) ) )
386
+ . await ?;
387
+ }
353
388
}
354
389
Ok ( ( ) )
355
390
}
@@ -361,7 +396,13 @@ impl crate::server::MyService {
361
396
task : & Task ,
362
397
service : Arc < MyService > ,
363
398
) -> Result < ( ) , Status > {
364
- let ( core_addr, guest_jwt) = self . query_user_record ( user_id, target_user_id) . await ?;
399
+ let ( core_addr, guest_jwt, forwarding_user_id) =
400
+ self . query_user_record ( user_id, target_user_id) . await ?;
401
+ if forwarding_user_id. is_some ( ) {
402
+ return self
403
+ . send_inter_core_sync_task ( user_id, target_user_id, task)
404
+ . await ;
405
+ }
365
406
let mut client = match self . _grpc_connect ( & core_addr) . await {
366
407
Ok ( client) => client,
367
408
Err ( e) => return Err ( Status :: internal ( format ! ( "{}" , e) ) ) ,
@@ -409,6 +450,35 @@ impl crate::server::MyService {
409
450
Ok ( ( ) )
410
451
}
411
452
453
+ async fn forward_inter_core_sync_task (
454
+ & self ,
455
+ user_id : & str ,
456
+ target_user_id : & str ,
457
+ task : & Task ,
458
+ ) -> Result < ( ) , Status > {
459
+ if self
460
+ . inter_core_reverse_senders
461
+ . lock ( )
462
+ . await
463
+ . contains_key ( & ( user_id. to_string ( ) , target_user_id. to_string ( ) ) )
464
+ {
465
+ let inter_core_reverse_senders = self . inter_core_reverse_senders . lock ( ) . await ;
466
+ let tx = inter_core_reverse_senders
467
+ . get ( & ( user_id. to_string ( ) , target_user_id. to_string ( ) ) )
468
+ . unwrap ( ) ;
469
+ match tx. send ( Ok ( task. clone ( ) ) ) . await {
470
+ Ok ( _) => { }
471
+ Err ( e) => return Err ( Status :: internal ( format ! ( "{}" , e) ) ) ,
472
+ }
473
+ Ok ( ( ) )
474
+ } else {
475
+ Err ( Status :: failed_precondition ( format ! (
476
+ "Unable to locate target {}." ,
477
+ target_user_id
478
+ ) ) )
479
+ }
480
+ }
481
+
412
482
async fn remove_task_from_list_in_storage (
413
483
& self ,
414
484
user_id : & str ,
@@ -518,7 +588,7 @@ impl crate::server::MyService {
518
588
& self ,
519
589
user_id : & str ,
520
590
query_user_id : & str ,
521
- ) -> Result < ( String , String ) , Status > {
591
+ ) -> Result < ( String , String , Option < String > ) , Status > {
522
592
let mut counter = 0 ;
523
593
while self
524
594
. _internal_storage_read (
@@ -594,7 +664,17 @@ impl crate::server::MyService {
594
664
)
595
665
. await ?;
596
666
let guest_jwt = String :: from_utf8 ( guest_jwt) . unwrap ( ) ;
597
- Ok ( ( core_addr, guest_jwt) )
667
+ let forwarding_user_id = match self
668
+ . _internal_storage_read (
669
+ user_id,
670
+ & format ! ( "known_users:{}:forwarding_user_id" , & query_user_id) ,
671
+ )
672
+ . await
673
+ {
674
+ Ok ( forwarding_user_id) => Some ( String :: from_utf8 ( forwarding_user_id) . unwrap ( ) ) ,
675
+ Err ( _) => None ,
676
+ } ;
677
+ Ok ( ( core_addr, guest_jwt, forwarding_user_id) )
598
678
}
599
679
600
680
/**
0 commit comments