18
18
//! and parsing responses
19
19
//!
20
20
21
- use std:: { error, io} ;
22
21
use std:: collections:: HashMap ;
23
22
use std:: sync:: { Arc , Mutex } ;
23
+ use std:: { error, io} ;
24
24
25
- use serde;
26
25
use base64;
27
26
use http;
27
+ use serde;
28
28
use serde_json;
29
29
30
30
use super :: { Request , Response } ;
31
- use util :: HashableValue ;
32
- use error :: Error ;
31
+ use crate :: error :: Error ;
32
+ use crate :: util :: HashableValue ;
33
33
34
34
/// An interface for an HTTP roundtripper that handles HTTP requests.
35
35
pub trait HttpRoundTripper {
@@ -38,11 +38,27 @@ pub trait HttpRoundTripper {
38
38
/// The type for errors generated by the roundtripper.
39
39
type Err : error:: Error ;
40
40
41
- /// Make an HTTP request. In practice only POST request will be made.
41
+ /// Make a synchronous HTTP request. In practice only POST request will be made.
42
+ #[ cfg( not( feature = "async" ) ) ]
42
43
fn request (
43
44
& self ,
44
- http:: Request < & [ u8 ] > ,
45
+ _request : http:: Request < & [ u8 ] > ,
45
46
) -> Result < http:: Response < Self :: ResponseBody > , Self :: Err > ;
47
+
48
+ /// Make an asynchronous HTTP request. In practice only POST request will be made.
49
+ #[ cfg( feature = "async" ) ]
50
+ fn request < ' life > (
51
+ & ' life self ,
52
+ _request : http:: Request < & ' life [ u8 ] > ,
53
+ ) -> std:: pin:: Pin <
54
+ Box <
55
+ dyn std:: future:: Future < Output = Result < http:: Response < Self :: ResponseBody > , Self :: Err > >
56
+ + Send
57
+ + ' life ,
58
+ > ,
59
+ >
60
+ where
61
+ Self : Sync + ' life ;
46
62
}
47
63
48
64
/// A handle to a remote JSONRPC server
@@ -54,7 +70,21 @@ pub struct Client<R: HttpRoundTripper> {
54
70
nonce : Arc < Mutex < u64 > > ,
55
71
}
56
72
57
- impl < Rt : HttpRoundTripper + ' static > Client < Rt > {
73
+ #[ cfg( not( feature = "async" ) ) ]
74
+ macro_rules! maybe_async_fn {
75
+ ( $( $tokens: tt) * ) => {
76
+ $( $tokens) *
77
+ } ;
78
+ }
79
+
80
+ #[ cfg( feature = "async" ) ]
81
+ macro_rules! maybe_async_fn {
82
+ ( $( #[ $( $meta: meta) * ] ) * $vis: vis $ident: ident $( $tokens: tt) * ) => {
83
+ $( #[ $( $meta) * ] ) * $vis async $ident $( $tokens) *
84
+ } ;
85
+ }
86
+
87
+ impl < Rt : HttpRoundTripper + ' static + Sync > Client < Rt > {
58
88
/// Creates a new client
59
89
pub fn new (
60
90
roundtripper : Rt ,
@@ -74,104 +104,139 @@ impl<Rt: HttpRoundTripper + 'static> Client<Rt> {
74
104
}
75
105
}
76
106
77
- /// Make a request and deserialize the response
78
- pub fn do_rpc < T : for < ' a > serde:: de:: Deserialize < ' a > > (
79
- & self ,
80
- rpc_name : & str ,
81
- args : & [ serde_json:: value:: Value ] ,
82
- ) -> Result < T , Error > {
83
- let request = self . build_request ( rpc_name, args) ;
84
- let response = self . send_request ( & request) ?;
107
+ maybe_async_fn ! {
108
+ /// Make a request and deserialize the response
109
+ pub fn do_rpc<T : for <' a> serde:: de:: Deserialize <' a>>(
110
+ & self ,
111
+ rpc_name: & str ,
112
+ args: & [ serde_json:: value:: Value ] ,
113
+ ) -> Result <T , Error > {
114
+ let request = self . build_request( rpc_name, args) ;
115
+
116
+ #[ cfg( not( feature = "async" ) ) ]
117
+ let response = self . send_request( & request) ?;
85
118
86
- Ok ( response. into_result ( ) ?)
119
+ #[ cfg( feature = "async" ) ]
120
+ let response = self . send_request( & request) . await ?;
121
+
122
+ Ok ( response. into_result( ) ?)
123
+ }
87
124
}
88
125
89
- /// The actual send logic used by both [send_request] and [send_batch].
90
- fn send_raw < B , R > ( & self , body : & B ) -> Result < R , Error >
91
- where
92
- B : serde:: ser:: Serialize ,
93
- R : for < ' de > serde:: de:: Deserialize < ' de > ,
94
- {
95
- // Build request
96
- let request_raw = serde_json:: to_vec ( body) ?;
97
-
98
- // Send request
99
- let mut request_builder = http:: Request :: post ( & self . url ) ;
100
- request_builder. header ( "Content-Type" , "application/json-rpc" ) ;
101
-
102
- // Set Authorization header
103
- if let Some ( ref user) = self . user {
104
- let mut auth = user. clone ( ) ;
105
- auth. push ( ':' ) ;
106
- if let Some ( ref pass) = self . pass {
107
- auth. push_str ( & pass[ ..] ) ;
126
+ maybe_async_fn ! {
127
+ /// The actual send logic used by both [send_request] and [send_batch].
128
+ fn send_raw<B , R >( & self , body: & B ) -> Result <R , Error >
129
+ where
130
+ B : serde:: ser:: Serialize ,
131
+ R : for <' de> serde:: de:: Deserialize <' de>,
132
+ {
133
+ // Build request
134
+ let request_raw = serde_json:: to_vec( body) ?;
135
+
136
+ // Send request
137
+ let mut request_builder = http:: Request :: post( & self . url) ;
138
+ request_builder. header( "Content-Type" , "application/json-rpc" ) ;
139
+
140
+ // Set Authorization header
141
+ if let Some ( ref user) = self . user {
142
+ let mut auth = user. clone( ) ;
143
+ auth. push( ':' ) ;
144
+ if let Some ( ref pass) = self . pass {
145
+ auth. push_str( & pass[ ..] ) ;
146
+ }
147
+ let value = format!( "Basic {}" , & base64:: encode( auth. as_bytes( ) ) ) ;
148
+ request_builder. header( "Authorization" , value) ;
108
149
}
109
- let value = format ! ( "Basic {}" , & base64:: encode( auth. as_bytes( ) ) ) ;
110
- request_builder. header ( "Authorization" , value) ;
111
- }
112
150
113
- // Errors only on invalid header or builder reuse.
114
- let http_request = request_builder. body ( & request_raw[ ..] ) . unwrap ( ) ;
151
+ // Errors only on invalid header or builder reuse.
152
+ let http_request = request_builder. body( & request_raw[ ..] ) . unwrap( ) ;
115
153
116
- let http_response =
117
- self . roundtripper . request ( http_request) . map_err ( |e| Error :: Http ( Box :: new ( e) ) ) ?;
154
+ #[ cfg( not( feature = "async" ) ) ]
155
+ let http_response = self
156
+ . roundtripper
157
+ . request( http_request)
158
+ . map_err( |e| Error :: Http ( Box :: new( e) ) ) ?;
118
159
119
- // nb we ignore stream.status since we expect the body
120
- // to contain information about any error
121
- Ok ( serde_json:: from_reader ( http_response. into_body ( ) ) ?)
122
- }
160
+ #[ cfg( feature = "async" ) ]
161
+ let http_response = self
162
+ . roundtripper
163
+ . request( http_request) . await
164
+ . map_err( |e| Error :: Http ( Box :: new( e) ) ) ?;
123
165
124
- /// Sends a request to a client
125
- pub fn send_request ( & self , request : & Request ) -> Result < Response , Error > {
126
- let response: Response = self . send_raw ( & request) ?;
127
- if response. jsonrpc != None && response. jsonrpc != Some ( From :: from ( "2.0" ) ) {
128
- return Err ( Error :: VersionMismatch ) ;
129
- }
130
- if response. id != request. id {
131
- return Err ( Error :: NonceMismatch ) ;
166
+
167
+ // nb we ignore stream.status since we expect the body
168
+ // to contain information about any error
169
+ Ok ( serde_json:: from_reader( http_response. into_body( ) ) ?)
132
170
}
133
- Ok ( response)
134
171
}
135
172
136
- /// Sends a batch of requests to the client. The return vector holds the response
137
- /// for the request at the corresponding index. If no response was provided, it's [None].
138
- ///
139
- /// Note that the requests need to have valid IDs, so it is advised to create the requests
140
- /// with [build_request].
141
- pub fn send_batch ( & self , requests : & [ Request ] ) -> Result < Vec < Option < Response > > , Error > {
142
- if requests. len ( ) < 1 {
143
- return Err ( Error :: EmptyBatch ) ;
144
- }
173
+ maybe_async_fn ! {
174
+ /// Sends a request to a client
175
+ pub fn send_request<' a, ' b>( & self , request: & Request <' a, ' b>) -> Result <Response , Error > {
176
+ #[ cfg( not( feature = "async" ) ) ]
177
+ let response: Response = self . send_raw( & request) ?;
145
178
146
- // If the request body is invalid JSON, the response is a single response object.
147
- // We ignore this case since we are confident we are producing valid JSON.
148
- let responses: Vec < Response > = self . send_raw ( & requests) ?;
149
- if responses. len ( ) > requests. len ( ) {
150
- return Err ( Error :: WrongBatchResponseSize ) ;
179
+ #[ cfg( feature = "async" ) ]
180
+ let response: Response = self . send_raw( & request) . await ?;
181
+
182
+ if response. jsonrpc != None && response. jsonrpc != Some ( From :: from( "2.0" ) ) {
183
+ return Err ( Error :: VersionMismatch ) ;
184
+ }
185
+ if response. id != request. id {
186
+ return Err ( Error :: NonceMismatch ) ;
187
+ }
188
+ Ok ( response)
151
189
}
190
+ }
152
191
153
- // To prevent having to clone responses, we first copy all the IDs so we can reference
154
- // them easily. IDs can only be of JSON type String or Number (or Null), so cloning
155
- // should be inexpensive and require no allocations as Numbers are more common .
156
- let ids : Vec < serde_json :: Value > = responses . iter ( ) . map ( |r| r . id . clone ( ) ) . collect ( ) ;
157
- // First index responses by ID and catch duplicate IDs.
158
- let mut resp_by_id = HashMap :: new ( ) ;
159
- for ( id , resp ) in ids . iter ( ) . zip ( responses . into_iter ( ) ) {
160
- if let Some ( dup ) = resp_by_id . insert ( HashableValue ( & id ) , resp ) {
161
- return Err ( Error :: BatchDuplicateResponseId ( dup . id ) ) ;
192
+ maybe_async_fn ! {
193
+ /// Sends a batch of requests to the client. The return vector holds the response
194
+ /// for the request at the corresponding index. If no response was provided, it's [None] .
195
+ ///
196
+ /// Note that the requests need to have valid IDs, so it is advised to create the requests
197
+ /// with [build_request].
198
+ pub fn send_batch< ' a , ' b> ( & self , requests : & [ Request < ' a , ' b> ] ) -> Result < Vec < Option < Response >> , Error > {
199
+ if requests . len ( ) < 1 {
200
+ return Err ( Error :: EmptyBatch ) ;
162
201
}
163
- }
164
- // Match responses to the requests.
165
- let results =
166
- requests. into_iter ( ) . map ( |r| resp_by_id. remove ( & HashableValue ( & r. id ) ) ) . collect ( ) ;
167
-
168
- // Since we're also just producing the first duplicate ID, we can also just produce the
169
- // first incorrect ID in case there are multiple.
170
- if let Some ( incorrect) = resp_by_id. into_iter ( ) . nth ( 0 ) {
171
- return Err ( Error :: WrongBatchResponseId ( incorrect. 1 . id ) ) ;
172
- }
173
202
174
- Ok ( results)
203
+ // If the request body is invalid JSON, the response is a single response object.
204
+ // We ignore this case since we are confident we are producing valid JSON.
205
+ #[ cfg( not( feature = "async" ) ) ]
206
+ let responses: Vec <Response > = self . send_raw( & requests) ?;
207
+
208
+ #[ cfg( feature = "async" ) ]
209
+ let responses: Vec <Response > = self . send_raw( & requests) . await ?;
210
+
211
+ if responses. len( ) > requests. len( ) {
212
+ return Err ( Error :: WrongBatchResponseSize ) ;
213
+ }
214
+
215
+ // To prevent having to clone responses, we first copy all the IDs so we can reference
216
+ // them easily. IDs can only be of JSON type String or Number (or Null), so cloning
217
+ // should be inexpensive and require no allocations as Numbers are more common.
218
+ let ids: Vec <serde_json:: Value > = responses. iter( ) . map( |r| r. id. clone( ) ) . collect( ) ;
219
+ // First index responses by ID and catch duplicate IDs.
220
+ let mut resp_by_id = HashMap :: new( ) ;
221
+ for ( id, resp) in ids. iter( ) . zip( responses. into_iter( ) ) {
222
+ if let Some ( dup) = resp_by_id. insert( HashableValue ( & id) , resp) {
223
+ return Err ( Error :: BatchDuplicateResponseId ( dup. id) ) ;
224
+ }
225
+ }
226
+ // Match responses to the requests.
227
+ let results = requests
228
+ . into_iter( )
229
+ . map( |r| resp_by_id. remove( & HashableValue ( & r. id) ) )
230
+ . collect( ) ;
231
+
232
+ // Since we're also just producing the first duplicate ID, we can also just produce the
233
+ // first incorrect ID in case there are multiple.
234
+ if let Some ( incorrect) = resp_by_id. into_iter( ) . nth( 0 ) {
235
+ return Err ( Error :: WrongBatchResponseId ( incorrect. 1 . id) ) ;
236
+ }
237
+
238
+ Ok ( results)
239
+ }
175
240
}
176
241
177
242
/// Builds a request
@@ -206,12 +271,30 @@ mod tests {
206
271
type ResponseBody = io:: Empty ;
207
272
type Err = io:: Error ;
208
273
274
+ #[ cfg( not( feature = "async" ) ) ]
209
275
fn request (
210
276
& self ,
211
277
_: http:: Request < & [ u8 ] > ,
212
278
) -> Result < http:: Response < Self :: ResponseBody > , Self :: Err > {
213
279
Err ( io:: ErrorKind :: Other . into ( ) )
214
280
}
281
+
282
+ #[ cfg( feature = "async" ) ]
283
+ fn request < ' life > (
284
+ & ' life self ,
285
+ request : http:: Request < & [ u8 ] > ,
286
+ ) -> std:: pin:: Pin <
287
+ Box <
288
+ dyn std:: future:: Future <
289
+ Output = Result < http:: Response < Self :: ResponseBody > , Self :: Err > ,
290
+ > + Send
291
+ + ' life ,
292
+ > ,
293
+ >
294
+ where
295
+ Self : Sync + ' life {
296
+ Box :: pin ( async { Err ( io:: ErrorKind :: Other . into ( ) ) } )
297
+ }
215
298
}
216
299
217
300
#[ test]
0 commit comments