16
16
17
17
use std:: net:: SocketAddr ;
18
18
use std:: sync:: Arc ;
19
+ use std:: time:: Duration ;
19
20
20
21
use anyhow:: anyhow;
21
22
use anyhow:: Result ;
22
23
use clap:: Parser ;
24
+ use futures:: stream:: StreamExt ;
25
+ use rpc:: rpc:: StreamBasicClientInstruction ;
26
+ use rpc:: rpc:: StreamBasicClientTestResult ;
27
+ use rpc:: rpc:: StreamChunkTimeoutClientInstruction ;
28
+ use rpc:: rpc:: StreamChunkTimeoutClientTestResult ;
29
+ use rpc:: rpc:: StreamCreditTimeoutClientInstruction ;
30
+ use rpc:: rpc:: StreamCreditTimeoutClientTestResult ;
31
+ use rpc:: rpc:: StreamDeclaredExceptionClientInstruction ;
32
+ use rpc:: rpc:: StreamDeclaredExceptionClientTestResult ;
33
+ use rpc:: rpc:: StreamInitialDeclaredExceptionClientInstruction ;
34
+ use rpc:: rpc:: StreamInitialDeclaredExceptionClientTestResult ;
35
+ use rpc:: rpc:: StreamInitialResponseClientInstruction ;
36
+ use rpc:: rpc:: StreamInitialResponseClientTestResult ;
37
+ use rpc:: rpc:: StreamInitialTimeoutClientInstruction ;
38
+ use rpc:: rpc:: StreamInitialTimeoutClientTestResult ;
39
+ use rpc:: rpc:: StreamInitialUndeclaredExceptionClientInstruction ;
40
+ use rpc:: rpc:: StreamInitialUndeclaredExceptionClientTestResult ;
41
+ use rpc:: rpc:: StreamUndeclaredExceptionClientInstruction ;
42
+ use rpc:: rpc:: StreamUndeclaredExceptionClientTestResult ;
43
+ use rpc_clients:: rpc:: errors:: r_p_c_conformance_service:: StreamDeclaredExceptionStreamError ;
44
+ use rpc_clients:: rpc:: errors:: r_p_c_conformance_service:: StreamInitialDeclaredExceptionError ;
23
45
use rpc_clients:: rpc:: make_RPCConformanceService;
24
46
use rpc_clients:: rpc:: RPCConformanceService ;
25
47
use tracing_glog:: Glog ;
@@ -50,6 +72,10 @@ async fn get_client(
50
72
fb : fbinit:: FacebookInit ,
51
73
port : u16 ,
52
74
) -> Result < Arc < dyn RPCConformanceService + Send + Sync + ' static > > {
75
+ // TODO: Why aren't we using the SRclient builder here? It has more functionality it seems
76
+ // over the bareclient (specifically the ability to configure the RPC options without specying
77
+ // a serialization protocol)
78
+
53
79
thriftclient:: ThriftChannelBuilder :: from_sock_addr (
54
80
fb,
55
81
SocketAddr :: new ( std:: net:: Ipv6Addr :: LOCALHOST . into ( ) , port) ,
@@ -102,6 +128,7 @@ use rpc_clients::rpc::BasicInteraction;
102
128
103
129
async fn test ( client : & dyn RPCConformanceService ) -> Result < ( ) > {
104
130
use ClientInstruction :: * ;
131
+ #[ allow( unused) ]
105
132
match & client. getTestCase ( ) . await ?. clientInstruction {
106
133
requestResponseBasic( i) => request_response_basic ( client, i) . await ,
107
134
requestResponseDeclaredException( i) => request_response_declared_exn ( client, i) . await ,
@@ -112,16 +139,21 @@ async fn test(client: &dyn RPCConformanceService) -> Result<()> {
112
139
interactionFactoryFunction( i) => interaction_factory_function ( client, i) . await ,
113
140
interactionPersistsState( i) => interaction_persists_state ( client, i) . await ,
114
141
interactionTermination( i) => interaction_termination ( client, i) . await ,
115
- streamBasic( _) => not_implemented ( ) ,
116
- streamChunkTimeout( _) => not_implemented ( ) ,
117
- streamInitialResponse( _) => not_implemented ( ) ,
118
- streamCreditTimeout( _) => not_implemented ( ) ,
119
- streamDeclaredException( _) => not_implemented ( ) ,
120
- streamUndeclaredException( _) => not_implemented ( ) ,
121
- streamInitialDeclaredException( _) => not_implemented ( ) ,
122
- streamInitialUndeclaredException( _) => not_implemented ( ) ,
123
- streamInitialTimeout( _) => not_implemented ( ) ,
124
- i => Err ( anyhow ! ( format!( "not supported: {:?}" , i) ) ) ,
142
+ streamBasic( i) => stream_basic ( client, i) . await ,
143
+ streamChunkTimeout( i) => stream_chunk_timeout ( client, i) . await ,
144
+ streamInitialResponse( i) => stream_initial_response ( client, i) . await ,
145
+ streamCreditTimeout( i) => stream_credit_timeout ( client, i) . await ,
146
+ streamDeclaredException( i) => stream_declared_exception ( client, i) . await ,
147
+ streamUndeclaredException( i) => stream_undeclared_exception ( client, i) . await ,
148
+ streamInitialDeclaredException( i) => stream_initial_declared_exception ( client, i) . await ,
149
+ streamInitialUndeclaredException( i) => stream_initial_undeclared_exception ( client, i) . await ,
150
+ streamInitialTimeout( i) => stream_initial_timeout ( client, i) . await ,
151
+ sinkBasic( i) => not_implemented ( ) ,
152
+ sinkChunkTimeout( i) => not_implemented ( ) ,
153
+ sinkInitialResponse( i) => not_implemented ( ) ,
154
+ sinkDeclaredException( i) => not_implemented ( ) ,
155
+ sinkUndeclaredException( i) => not_implemented ( ) ,
156
+ UnknownField ( i) => Err ( anyhow ! ( format!( "not supported: {:?}" , i) ) ) ,
125
157
}
126
158
}
127
159
@@ -309,3 +341,203 @@ async fn interaction_termination(
309
341
client. sendTestResult ( & test_result) . await ?;
310
342
Ok ( ( ) )
311
343
}
344
+
345
+ async fn stream_basic (
346
+ client : & dyn RPCConformanceService ,
347
+ instr : & StreamBasicClientInstruction ,
348
+ ) -> Result < ( ) > {
349
+ // TODO: configure rpc_options buffer?
350
+ let mut stream = client. streamBasic ( & instr. request ) . await ?;
351
+ let mut test_result = StreamBasicClientTestResult {
352
+ streamPayloads : vec ! [ ] ,
353
+ ..Default :: default ( )
354
+ } ;
355
+ while let Some ( response) = stream. next ( ) . await {
356
+ test_result. streamPayloads . push ( response?) ;
357
+ }
358
+ client
359
+ . sendTestResult ( & ClientTestResult :: streamBasic ( test_result) )
360
+ . await ?;
361
+ Ok ( ( ) )
362
+ }
363
+
364
+ async fn stream_chunk_timeout (
365
+ client : & dyn RPCConformanceService ,
366
+ instr : & StreamChunkTimeoutClientInstruction ,
367
+ ) -> Result < ( ) > {
368
+ let mut stream = client. streamChunkTimeout ( & instr. request ) . await ?;
369
+ let mut test_result = StreamChunkTimeoutClientTestResult {
370
+ chunkTimeoutException : false ,
371
+ streamPayloads : vec ! [ ] ,
372
+ ..Default :: default ( )
373
+ } ;
374
+
375
+ while let Some ( response) = stream. next ( ) . await {
376
+ match response {
377
+ Ok ( r) => {
378
+ test_result. streamPayloads . push ( r) ;
379
+ }
380
+ Err ( exn) => {
381
+ test_result. chunkTimeoutException |=
382
+ exn. to_string ( ) . contains ( "TTransportException" ) ;
383
+ }
384
+ }
385
+ }
386
+
387
+ client
388
+ . sendTestResult ( & ClientTestResult :: streamChunkTimeout ( test_result) )
389
+ . await ?;
390
+ Ok ( ( ) )
391
+ }
392
+
393
+ async fn stream_initial_response (
394
+ client : & dyn RPCConformanceService ,
395
+ instr : & StreamInitialResponseClientInstruction ,
396
+ ) -> Result < ( ) > {
397
+ let ( initial_response, mut stream) = client. streamInitialResponse ( & instr. request ) . await ?;
398
+ let mut test_result = StreamInitialResponseClientTestResult {
399
+ initialResponse : initial_response,
400
+ streamPayloads : vec ! [ ] ,
401
+ ..Default :: default ( )
402
+ } ;
403
+
404
+ while let Some ( response) = stream. next ( ) . await {
405
+ test_result. streamPayloads . push ( response?) ;
406
+ }
407
+
408
+ client
409
+ . sendTestResult ( & ClientTestResult :: streamInitialResponse ( test_result) )
410
+ . await ?;
411
+
412
+ Ok ( ( ) )
413
+ }
414
+
415
+ async fn stream_credit_timeout (
416
+ client : & dyn RPCConformanceService ,
417
+ instr : & StreamCreditTimeoutClientInstruction ,
418
+ ) -> Result < ( ) > {
419
+ let mut stream = client. streamCreditTimeout ( & instr. request ) . await ?;
420
+ let mut test_result = StreamCreditTimeoutClientTestResult {
421
+ creditTimeoutException : false ,
422
+ ..Default :: default ( )
423
+ } ;
424
+
425
+ while let Some ( response) = stream. next ( ) . await {
426
+ match response {
427
+ Err ( exn) if exn. to_string ( ) . contains ( "TTransportException" ) => {
428
+ test_result. creditTimeoutException = true ;
429
+ }
430
+ _ => {
431
+ // Sleep longer than the stream expiration time so that the server
432
+ // will run out of credit and throw a credit timeout exception
433
+ tokio:: time:: sleep ( Duration :: from_millis ( instr. creditTimeoutMs as u64 ) ) . await ;
434
+ }
435
+ }
436
+ }
437
+
438
+ client
439
+ . sendTestResult ( & ClientTestResult :: streamCreditTimeout ( test_result) )
440
+ . await ?;
441
+
442
+ Ok ( ( ) )
443
+ }
444
+
445
+ async fn stream_declared_exception (
446
+ client : & dyn RPCConformanceService ,
447
+ instr : & StreamDeclaredExceptionClientInstruction ,
448
+ ) -> Result < ( ) > {
449
+ let mut stream = client. streamDeclaredException ( & instr. request ) . await ?;
450
+ let mut test_result = StreamDeclaredExceptionClientTestResult {
451
+ userException : None ,
452
+ ..Default :: default ( )
453
+ } ;
454
+ while let Some ( result) = stream. next ( ) . await {
455
+ if let Err ( StreamDeclaredExceptionStreamError :: e( user_error) ) = result {
456
+ test_result. userException = Some ( Box :: new ( user_error) ) ;
457
+ } ;
458
+ }
459
+ client
460
+ . sendTestResult ( & ClientTestResult :: streamDeclaredException ( test_result) )
461
+ . await ?;
462
+ Ok ( ( ) )
463
+ }
464
+
465
+ async fn stream_undeclared_exception (
466
+ client : & dyn RPCConformanceService ,
467
+ instr : & StreamUndeclaredExceptionClientInstruction ,
468
+ ) -> Result < ( ) > {
469
+ let mut stream = client. streamUndeclaredException ( & instr. request ) . await ?;
470
+ let mut test_result = StreamUndeclaredExceptionClientTestResult {
471
+ exceptionMessage : "" . to_string ( ) ,
472
+ ..Default :: default ( )
473
+ } ;
474
+ while let Some ( result) = stream. next ( ) . await {
475
+ if let Err ( err) = result {
476
+ test_result. exceptionMessage = err. to_string ( ) ;
477
+ } ;
478
+ }
479
+ client
480
+ . sendTestResult ( & ClientTestResult :: streamUndeclaredException ( test_result) )
481
+ . await ?;
482
+ Ok ( ( ) )
483
+ }
484
+
485
+ async fn stream_initial_declared_exception (
486
+ client : & dyn RPCConformanceService ,
487
+ instr : & StreamInitialDeclaredExceptionClientInstruction ,
488
+ ) -> Result < ( ) > {
489
+ let result = client. streamInitialDeclaredException ( & instr. request ) . await ;
490
+ let mut test_result = StreamInitialDeclaredExceptionClientTestResult {
491
+ userException : None ,
492
+ ..Default :: default ( )
493
+ } ;
494
+ if let Err ( StreamInitialDeclaredExceptionError :: e( user_error) ) = result {
495
+ test_result. userException = Some ( Box :: new ( user_error) ) ;
496
+ } ;
497
+ client
498
+ . sendTestResult ( & ClientTestResult :: streamInitialDeclaredException (
499
+ test_result,
500
+ ) )
501
+ . await ?;
502
+ Ok ( ( ) )
503
+ }
504
+
505
+ async fn stream_initial_undeclared_exception (
506
+ client : & dyn RPCConformanceService ,
507
+ instr : & StreamInitialUndeclaredExceptionClientInstruction ,
508
+ ) -> Result < ( ) > {
509
+ let result = client
510
+ . streamInitialUndeclaredException ( & instr. request )
511
+ . await ;
512
+ let mut test_result = StreamInitialUndeclaredExceptionClientTestResult {
513
+ exceptionMessage : "" . to_string ( ) ,
514
+ ..Default :: default ( )
515
+ } ;
516
+ if let Err ( err) = result {
517
+ test_result. exceptionMessage = err. to_string ( ) ;
518
+ } ;
519
+ client
520
+ . sendTestResult ( & ClientTestResult :: streamInitialUndeclaredException (
521
+ test_result,
522
+ ) )
523
+ . await ?;
524
+ Ok ( ( ) )
525
+ }
526
+
527
+ async fn stream_initial_timeout (
528
+ client : & dyn RPCConformanceService ,
529
+ instr : & StreamInitialTimeoutClientInstruction ,
530
+ ) -> Result < ( ) > {
531
+ let result = client. streamInitialTimeout ( & instr. request ) . await ;
532
+ let mut test_result = StreamInitialTimeoutClientTestResult {
533
+ timeoutException : false ,
534
+ ..Default :: default ( )
535
+ } ;
536
+ if let Err ( _err) = result {
537
+ test_result. timeoutException = true ;
538
+ }
539
+ client
540
+ . sendTestResult ( & ClientTestResult :: streamInitialTimeout ( test_result) )
541
+ . await ?;
542
+ Ok ( ( ) )
543
+ }
0 commit comments