@@ -181,9 +181,78 @@ public Response send(final Request request){
181
181
return responses .get (0 );
182
182
}
183
183
184
+ @ Override
185
+ public Future <Response > sendAsync (final Request request ) {
186
+ final List <Request > requests = new ArrayList <Request >();
187
+ requests .add (request );
188
+
189
+ final List <Future <Response >> responses = sendAsync (requests );
190
+ return responses .get (0 );
191
+ }
192
+
184
193
@ Override
185
194
public List <Response > send (final List <Request > requests ){
186
- final List <FutureCountDownLatch <Response >> resultList = new ArrayList <FutureCountDownLatch <Response >>();
195
+ final List <Future <Response >> resultList = sendAsync (requests );
196
+
197
+ try {
198
+ final List <Response > responses = new ArrayList <Response >();
199
+
200
+ for (final Future <Response > resultLatches : resultList ){
201
+ final Response response = resultLatches .get ();
202
+ responses .add (response );
203
+ messages .remove (response .getUuid ());
204
+ }
205
+
206
+ return responses ;
207
+ }
208
+ catch (final Exception e ){
209
+ throw new JSR356SwaggerSocketException ("Error Receiving Swagger Socket Response(s)" , e );
210
+ }
211
+ }
212
+
213
+ @ Override
214
+ public List <Future <Response >> sendAsync (final List <Request > requests ){
215
+ return sendRequests (requests , Response .class );
216
+ }
217
+
218
+ @ Override
219
+ public <T > T send (final Request request , final Class <T > resultClass ) {
220
+ final List <Request > requests = new ArrayList <Request >();
221
+ requests .add (request );
222
+ return send (requests , resultClass ).get (0 );
223
+ }
224
+
225
+ @ Override
226
+ public <T > Future <T > sendAsync (final Request request , final Class <T > resultClass ) {
227
+ final List <Request > requests = new ArrayList <Request >();
228
+ requests .add (request );
229
+ return sendAsync (requests , resultClass ).get (0 );
230
+ }
231
+
232
+ @ Override
233
+ public <T > List <T > send (final List <Request > requests , final Class <T > resultClass ) {
234
+ serializeRequestBodies (requests );
235
+
236
+ final List <Response > responses = send (requests );
237
+
238
+ final List <T > resultValues = new ArrayList <T >();
239
+
240
+ for (final Response response : responses ) {
241
+ final T resultValue = deserializeResponse (response , resultClass );
242
+ resultValues .add (resultValue );
243
+ }
244
+
245
+ return resultValues ;
246
+ }
247
+
248
+ @ Override
249
+ public <T > List <Future <T >> sendAsync (final List <Request > requests , final Class <T > resultClass ) {
250
+ serializeRequestBodies (requests );
251
+ return sendRequests (requests , resultClass );
252
+ }
253
+
254
+ private <T > List <Future <T >> sendRequests (final List <Request > requests , final Class <T > resultClass ){
255
+ final List <Future <T >> resultList = new ArrayList <Future <T >>();
187
256
188
257
try {
189
258
reentrantLock .lock ();
@@ -199,7 +268,16 @@ public List<Response> send(final List<Request> requests){
199
268
for (final Request thisRequest : requestMessage .getRequests ()){
200
269
final String uuid = UUID .randomUUID ().toString ();
201
270
thisRequest .setUuid (uuid );
202
- final FutureCountDownLatch <Response > result = new FutureCountDownLatch <Response >();
271
+
272
+ final FutureCountDownLatch <T > result ;
273
+
274
+ if (resultClass == Response .class ){
275
+ result = new FutureCountDownLatch <T >();
276
+ }
277
+ else {
278
+ result = new FutureCountDownLatchWithAutoDeserialize <T >(resultClass );
279
+ }
280
+
203
281
messages .put (uuid , result );
204
282
resultList .add (result );
205
283
}
@@ -212,62 +290,31 @@ public List<Response> send(final List<Request> requests){
212
290
reentrantLock .unlock ();
213
291
}
214
292
215
- try {
216
- final List <Response > responses = new ArrayList <Response >();
217
-
218
- for (final FutureCountDownLatch <Response > resultLatches : resultList ){
219
- final Response response = resultLatches .get ();
220
- responses .add (response );
221
- messages .remove (response .getUuid ());
222
- }
223
-
224
- return responses ;
225
- }
226
- catch (final Exception e ){
227
- throw new JSR356SwaggerSocketException ("Error Receiving Swagger Socket Response(s)" , e );
228
- }
293
+ return resultList ;
229
294
}
230
295
231
- @ Override
232
- public <T > List <T > send (final List <Request > requests , final Class <T > resultClass ) {
233
-
234
- for (final Request request : requests ){
296
+ private void serializeRequestBodies (final List <Request > requests ){
297
+ for (final Request request : requests ) {
235
298
final Object messageBody = request .getMessageBody ();
236
299
237
- if (!(messageBody instanceof String )) {
300
+ if (!(messageBody instanceof String )) {
238
301
try {
239
302
request .setMessageBody (objectMapper .writeValueAsString (messageBody ));
240
303
} catch (JsonProcessingException e ) {
241
304
throw new JSR356SwaggerSocketException ("Error Serializing Swagger Socket Request(s)" , e );
242
305
}
243
306
}
244
307
}
308
+ }
245
309
246
- final List <Response > responses = send (requests );
247
-
248
- final List <T > resultValues = new ArrayList <T >();
249
-
250
- for (final Response response : responses ) {
251
- final String responseBody = (String ) response .getMessageBody ();
252
- final T resultValue ;
253
-
254
- try {
255
- resultValue = objectMapper .readValue (responseBody , resultClass );
256
- } catch (IOException e ) {
257
- throw new JSR356SwaggerSocketException ("Error Deserializing Swagger Socket Response(s)" , e );
258
- }
310
+ private <T > T deserializeResponse (final Response response , final Class <T > resultClass ){
311
+ final String responseBody = (String ) response .getMessageBody ();
259
312
260
- resultValues .add (resultValue );
313
+ try {
314
+ return objectMapper .readValue (responseBody , resultClass );
315
+ } catch (final IOException e ) {
316
+ throw new JSR356SwaggerSocketException ("Error Deserializing Swagger Socket Response(s)" , e );
261
317
}
262
-
263
- return resultValues ;
264
- }
265
-
266
- @ Override
267
- public <T > T send (final Request request , final Class <T > resultClass ) {
268
- final List <Request > requests = new ArrayList <Request >();
269
- requests .add (request );
270
- return send (requests , resultClass ).get (0 );
271
318
}
272
319
273
320
@ Override
@@ -315,7 +362,13 @@ private void handleResponses(final String responses) throws IOException {
315
362
for (int i = 0 ; i < responseMessageList .size (); i ++){
316
363
final Response thisResponse = responseMessageList .get (i );
317
364
final FutureCountDownLatch <Response > responseLatch = messages .get (thisResponse .getUuid ());
318
- responseLatch .set (thisResponse );
365
+
366
+ if (responseLatch instanceof FutureCountDownLatchWithAutoDeserialize ){
367
+ ((FutureCountDownLatchWithAutoDeserialize )responseLatch ).setResponse (thisResponse );
368
+ }
369
+ else {
370
+ responseLatch .set (thisResponse );
371
+ }
319
372
}
320
373
}
321
374
@@ -332,7 +385,7 @@ private void performHandshake() {
332
385
writeMessage (handshakeJson );
333
386
} catch (final JsonProcessingException e ) {
334
387
close ();
335
- throw new JSR356SwaggerSocketException ("Error Performing Handhsake !" , e );
388
+ throw new JSR356SwaggerSocketException ("Error Performing Handshake !" , e );
336
389
}
337
390
}
338
391
@@ -348,8 +401,8 @@ private boolean writeMessage(final String message) {
348
401
349
402
private class FutureCountDownLatch <T > implements Future <T > {
350
403
351
- private CountDownLatch countDownLatch = new CountDownLatch (1 );
352
- private T result = null ;
404
+ protected CountDownLatch countDownLatch = new CountDownLatch (1 );
405
+ protected T result = null ;
353
406
354
407
@ Override
355
408
public boolean cancel (boolean mayInterruptIfRunning ) {
@@ -374,7 +427,7 @@ public T get() throws InterruptedException, ExecutionException {
374
427
}
375
428
376
429
@ Override
377
- public T get (long timeout , TimeUnit unit ) throws InterruptedException , ExecutionException , TimeoutException {
430
+ public T get (final long timeout , final TimeUnit unit ) throws InterruptedException , ExecutionException , TimeoutException {
378
431
countDownLatch .await (timeout , unit );
379
432
return result ;
380
433
}
@@ -386,4 +439,42 @@ public void set(final T result) {
386
439
387
440
}
388
441
442
+ private class FutureCountDownLatchWithAutoDeserialize <T > extends FutureCountDownLatch <T > {
443
+ private Class <T > targetClass ;
444
+ private JSR356SwaggerSocketException deserializationException ;
445
+
446
+ public FutureCountDownLatchWithAutoDeserialize (final Class <T > targetClass ) {
447
+ this .targetClass = targetClass ;
448
+ }
449
+
450
+ @ Override
451
+ public T get (final long timeout , final TimeUnit unit ) throws InterruptedException , ExecutionException , TimeoutException {
452
+ countDownLatch .await (timeout , unit );
453
+
454
+ // If we had a deserialization exception it will be stored - rethrow it here.
455
+ if (deserializationException == null ){
456
+ return result ;
457
+ }
458
+ else {
459
+ throw deserializationException ;
460
+ }
461
+ }
462
+
463
+ public void setResponse (final Response response ) {
464
+ try {
465
+ this .result = deserializeResponse (response , targetClass );
466
+ }
467
+ catch (final JSR356SwaggerSocketException e ){
468
+ // Since this will be done async, capture a deserialization
469
+ // exception and re-throw it when get() is called to make sure
470
+ // the caller is actually notified instead of it being swallowed
471
+ // by the message handling thread.
472
+ deserializationException = e ;
473
+ }
474
+
475
+ countDownLatch .countDown ();
476
+ }
477
+
478
+ }
479
+
389
480
}
0 commit comments