45
45
import java .util .concurrent .TimeUnit ;
46
46
47
47
import javax .annotation .CheckForNull ;
48
+ import javax .annotation .Nullable ;
48
49
49
50
/**
50
51
* {@link EventHandler} implementation that queues events and has a separate pool of threads responsible
@@ -67,7 +68,8 @@ public class AsyncEventHandler implements EventHandler, AutoCloseable {
67
68
private static final Logger logger = LoggerFactory .getLogger (AsyncEventHandler .class );
68
69
private static final ProjectConfigResponseHandler EVENT_RESPONSE_HANDLER = new ProjectConfigResponseHandler ();
69
70
70
- private final OptimizelyHttpClient httpClient ;
71
+ @ VisibleForTesting
72
+ public final OptimizelyHttpClient httpClient ;
71
73
private final ExecutorService workerExecutor ;
72
74
73
75
private final long closeTimeout ;
@@ -110,7 +112,15 @@ public AsyncEventHandler(int queueCapacity,
110
112
int validateAfter ,
111
113
long closeTimeout ,
112
114
TimeUnit closeTimeoutUnit ) {
113
- this (queueCapacity , numWorkers , maxConnections , connectionsPerRoute , validateAfter , closeTimeout , closeTimeoutUnit , null );
115
+ this (queueCapacity ,
116
+ numWorkers ,
117
+ maxConnections ,
118
+ connectionsPerRoute ,
119
+ validateAfter ,
120
+ closeTimeout ,
121
+ closeTimeoutUnit ,
122
+ null ,
123
+ null );
114
124
}
115
125
116
126
public AsyncEventHandler (int queueCapacity ,
@@ -120,24 +130,27 @@ public AsyncEventHandler(int queueCapacity,
120
130
int validateAfter ,
121
131
long closeTimeout ,
122
132
TimeUnit closeTimeoutUnit ,
133
+ @ Nullable OptimizelyHttpClient httpClient ,
123
134
@ Nullable ThreadFactory threadFactory ) {
135
+ if (httpClient != null ) {
136
+ this .httpClient = httpClient ;
137
+ } else {
138
+ maxConnections = validateInput ("maxConnections" , maxConnections , DEFAULT_MAX_CONNECTIONS );
139
+ connectionsPerRoute = validateInput ("connectionsPerRoute" , connectionsPerRoute , DEFAULT_MAX_PER_ROUTE );
140
+ validateAfter = validateInput ("validateAfter" , validateAfter , DEFAULT_VALIDATE_AFTER_INACTIVITY );
141
+ this .httpClient = OptimizelyHttpClient .builder ()
142
+ .withMaxTotalConnections (maxConnections )
143
+ .withMaxPerRoute (connectionsPerRoute )
144
+ .withValidateAfterInactivity (validateAfter )
145
+ // infrequent event discards observed. staled connections force-closed after a long idle time.
146
+ .withEvictIdleConnections (1L , TimeUnit .MINUTES )
147
+ .build ();
148
+ }
124
149
125
150
queueCapacity = validateInput ("queueCapacity" , queueCapacity , DEFAULT_QUEUE_CAPACITY );
126
151
numWorkers = validateInput ("numWorkers" , numWorkers , DEFAULT_NUM_WORKERS );
127
- maxConnections = validateInput ("maxConnections" , maxConnections , DEFAULT_MAX_CONNECTIONS );
128
- connectionsPerRoute = validateInput ("connectionsPerRoute" , connectionsPerRoute , DEFAULT_MAX_PER_ROUTE );
129
- validateAfter = validateInput ("validateAfter" , validateAfter , DEFAULT_VALIDATE_AFTER_INACTIVITY );
130
-
131
- this .httpClient = OptimizelyHttpClient .builder ()
132
- .withMaxTotalConnections (maxConnections )
133
- .withMaxPerRoute (connectionsPerRoute )
134
- .withValidateAfterInactivity (validateAfter )
135
- // infrequent event discards observed. staled connections force-closed after a long idle time.
136
- .withEvictIdleConnections (1L , TimeUnit .MINUTES )
137
- .build ();
138
152
139
153
NamedThreadFactory namedThreadFactory = new NamedThreadFactory ("optimizely-event-dispatcher-thread-%s" , true , threadFactory );
140
-
141
154
this .workerExecutor = new ThreadPoolExecutor (numWorkers , numWorkers ,
142
155
0L , TimeUnit .MILLISECONDS ,
143
156
new ArrayBlockingQueue <>(queueCapacity ),
@@ -302,6 +315,7 @@ public static class Builder {
302
315
int validateAfterInactivity = PropertyUtils .getInteger (CONFIG_VALIDATE_AFTER_INACTIVITY , DEFAULT_VALIDATE_AFTER_INACTIVITY );
303
316
private long closeTimeout = Long .MAX_VALUE ;
304
317
private TimeUnit closeTimeoutUnit = TimeUnit .MILLISECONDS ;
318
+ private OptimizelyHttpClient httpClient ;
305
319
306
320
public Builder withQueueCapacity (int queueCapacity ) {
307
321
if (queueCapacity <= 0 ) {
@@ -344,6 +358,11 @@ public Builder withCloseTimeout(long closeTimeout, TimeUnit unit) {
344
358
return this ;
345
359
}
346
360
361
+ public Builder withOptimizelyHttpClient (OptimizelyHttpClient httpClient ) {
362
+ this .httpClient = httpClient ;
363
+ return this ;
364
+ }
365
+
347
366
public AsyncEventHandler build () {
348
367
return new AsyncEventHandler (
349
368
queueCapacity ,
@@ -352,7 +371,9 @@ public AsyncEventHandler build() {
352
371
maxPerRoute ,
353
372
validateAfterInactivity ,
354
373
closeTimeout ,
355
- closeTimeoutUnit
374
+ closeTimeoutUnit ,
375
+ httpClient ,
376
+ null
356
377
);
357
378
}
358
379
}
0 commit comments