Skip to content

Commit 54ebe54

Browse files
Crowaydavsclaus
authored andcommitted
Iggy Client pooling + Header Stream and Topic overrides
1 parent 37337b1 commit 54ebe54

File tree

10 files changed

+219
-54
lines changed

10 files changed

+219
-54
lines changed

components/camel-iggy/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,11 @@
4141
<artifactId>iggy-java-sdk</artifactId>
4242
<version>${iggy-version}</version>
4343
</dependency>
44+
<dependency>
45+
<groupId>org.apache.commons</groupId>
46+
<artifactId>commons-pool2</artifactId>
47+
<version>${commons-pool2-version}</version>
48+
</dependency>
4449

4550
<!-- testing -->
4651
<dependency>

components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyConstants.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ public final class IggyConstants {
2525
public static final String MESSAGE_CHECKSUM = "CamelIggyMessageChecksum";
2626
public static final String MESSAGE_LENGTH = "CamelIggyMessageLength";
2727
public static final String MESSAGE_SIZE = "CamelIggyMessageSize";
28+
public static final String TOPIC_OVERRIDE = "CamelIggyTopicOverride";
29+
public static final String STREAM_OVERRIDE = "CamelIggyStreamOverride";
2830

2931
private IggyConstants() {
3032
// Constants class

components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyConsumer.java

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.TimeUnit;
2323

2424
import org.apache.camel.Processor;
25+
import org.apache.camel.component.iggy.client.IggyClientConnectionPool;
2526
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
2627
import org.apache.camel.support.DefaultConsumer;
2728
import org.apache.iggy.client.blocking.IggyBaseClient;
@@ -32,7 +33,7 @@ public class IggyConsumer extends DefaultConsumer {
3233

3334
private static final Logger LOG = LoggerFactory.getLogger(IggyConsumer.class);
3435
private final IggyEndpoint endpoint;
35-
private IggyBaseClient client;
36+
private IggyClientConnectionPool iggyClientConnectionPool;
3637
private ExecutorService executor;
3738
private final List<IggyFetchRecords> tasks = new ArrayList<>();
3839

@@ -47,17 +48,29 @@ protected void doStart() throws Exception {
4748
LOG.info("Starting Iggy consumer for stream {} and topic {}", endpoint.getConfiguration().getStreamId(),
4849
endpoint.getTopicName());
4950

50-
client = endpoint.getIggyClient();
51+
iggyClientConnectionPool = new IggyClientConnectionPool(
52+
endpoint.getConfiguration().getHost(),
53+
endpoint.getConfiguration().getPort(),
54+
endpoint.getConfiguration().getUsername(),
55+
endpoint.getConfiguration().getPassword(),
56+
endpoint.getConfiguration().getClientTransport());
5157

58+
IggyBaseClient client = iggyClientConnectionPool.borrowObject();
5259
endpoint.initializeTopic(client);
5360
endpoint.initializeConsumerGroup(client);
61+
iggyClientConnectionPool.returnClient(client);
5462

5563
executor = endpoint.createExecutor();
5664
BridgeExceptionHandlerToErrorHandler bridge = new BridgeExceptionHandlerToErrorHandler(this);
5765

5866
// For now, we'll just have one consumer task. This can be extended later if Iggy supports partitioned consumption.
5967
// TODO Handle streams, once tehy will be implemented in the java client
60-
IggyFetchRecords task = new IggyFetchRecords(this, endpoint, endpoint.getConfiguration(), client, bridge);
68+
IggyFetchRecords task = new IggyFetchRecords(
69+
this,
70+
endpoint,
71+
endpoint.getConfiguration(),
72+
iggyClientConnectionPool,
73+
bridge);
6174
executor.submit(task);
6275
tasks.add(task);
6376
}

components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyEndpoint.java

Lines changed: 14 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,12 @@
2525
import org.apache.camel.Consumer;
2626
import org.apache.camel.Processor;
2727
import org.apache.camel.Producer;
28-
import org.apache.camel.RuntimeCamelException;
2928
import org.apache.camel.spi.Metadata;
3029
import org.apache.camel.spi.UriEndpoint;
3130
import org.apache.camel.spi.UriParam;
3231
import org.apache.camel.spi.UriPath;
3332
import org.apache.camel.support.DefaultEndpoint;
3433
import org.apache.iggy.client.blocking.IggyBaseClient;
35-
import org.apache.iggy.client.blocking.IggyClientBuilder;
36-
import org.apache.iggy.client.blocking.http.IggyHttpClient;
37-
import org.apache.iggy.client.blocking.tcp.IggyTcpClient;
3834
import org.apache.iggy.consumergroup.ConsumerGroupDetails;
3935
import org.apache.iggy.identifier.ConsumerId;
4036
import org.apache.iggy.identifier.StreamId;
@@ -74,9 +70,11 @@ public Consumer createConsumer(Processor processor) throws Exception {
7470
return consumer;
7571
}
7672

77-
public void initializeTopic(IggyBaseClient client) {
73+
public void initializeTopic(IggyBaseClient client, String topicName, String streamName) throws Exception {
7874
IggyConfiguration iggyConfiguration = getConfiguration();
7975
Objects.requireNonNull(iggyConfiguration.getStreamName(), "The stream name is required");
76+
String topic = topicName != null ? topicName : getTopicName();
77+
String stream = streamName != null ? streamName : iggyConfiguration.getStreamName();
8078

8179
if (iggyConfiguration.isAutoCreateStream()) {
8280
if (iggyConfiguration.getStreamId() != null) {
@@ -91,9 +89,9 @@ public void initializeTopic(IggyBaseClient client) {
9189
return streamDetails;
9290
});
9391
} else {
94-
client.streams().getStream(StreamId.of(iggyConfiguration.getStreamName())).orElseGet(() -> {
95-
LOG.debug("Creating stream with name {}", iggyConfiguration.getStreamName());
96-
StreamDetails streamDetails = client.streams().createStream(empty(), iggyConfiguration.getStreamName());
92+
client.streams().getStream(StreamId.of(stream)).orElseGet(() -> {
93+
LOG.debug("Creating stream with name {}", stream);
94+
StreamDetails streamDetails = client.streams().createStream(empty(), stream);
9795
LOG.debug("Stream created with details: {}", streamDetails.toString());
9896

9997
return streamDetails;
@@ -104,17 +102,17 @@ public void initializeTopic(IggyBaseClient client) {
104102

105103
if (iggyConfiguration.isAutoCreateTopic()) {
106104
client.topics()
107-
.getTopic(StreamId.of(iggyConfiguration.getStreamName()), TopicId.of(getTopicName()))
105+
.getTopic(StreamId.of(stream), TopicId.of(topic))
108106
.orElseGet(() -> {
109-
LOG.debug("Creating topic with name {}", getTopicName());
110-
TopicDetails topicDetails = client.topics().createTopic(StreamId.of(iggyConfiguration.getStreamName()),
107+
LOG.debug("Creating topic with name {}", topic);
108+
TopicDetails topicDetails = client.topics().createTopic(StreamId.of(stream),
111109
empty(),
112110
iggyConfiguration.getPartitionsCount(),
113111
iggyConfiguration.getCompressionAlgorithm(),
114112
BigInteger.valueOf(iggyConfiguration.getMessageExpiry()),
115113
BigInteger.valueOf(iggyConfiguration.getMaxTopicSize()),
116114
Optional.ofNullable(iggyConfiguration.getReplicationFactor()),
117-
getTopicName());
115+
topic);
118116

119117
LOG.debug("Topic created or retrieved with details: {}", topicDetails.toString());
120118

@@ -123,6 +121,10 @@ public void initializeTopic(IggyBaseClient client) {
123121
}
124122
}
125123

124+
public void initializeTopic(IggyBaseClient client) throws Exception {
125+
initializeTopic(client, null, null);
126+
}
127+
126128
public void initializeConsumerGroup(IggyBaseClient client) {
127129
Objects.requireNonNull(getConfiguration().getConsumerGroupName(), "Consumer group name is required");
128130

@@ -156,29 +158,6 @@ public void initializeConsumerGroup(IggyBaseClient client) {
156158
getConfiguration().getConsumerGroupName());
157159
}
158160

159-
private IggyBaseClient createClient() {
160-
return switch (getConfiguration().getClientTransport()) {
161-
case "HTTP" ->
162-
new IggyHttpClient(String.format("http://%s:%d", getConfiguration().getHost(), getConfiguration().getPort()));
163-
case "TCP" -> new IggyTcpClient(getConfiguration().getHost(), getConfiguration().getPort());
164-
default -> throw new RuntimeCamelException("Only HTTP or TCP transports are supported");
165-
};
166-
}
167-
168-
public IggyBaseClient getIggyClient() {
169-
IggyBaseClient iggyBaseClient = new IggyClientBuilder().withBaseClient(createClient()).build().getBaseClient();
170-
171-
loginIggyClient(iggyBaseClient);
172-
173-
return iggyBaseClient;
174-
}
175-
176-
private void loginIggyClient(IggyBaseClient client) {
177-
if (getConfiguration().getUsername() != null) {
178-
client.users().login(getConfiguration().getUsername(), getConfiguration().getPassword());
179-
}
180-
}
181-
182161
public ExecutorService createExecutor() {
183162
return getCamelContext().getExecutorServiceManager().newFixedThreadPool(this,
184163
"IggyConsumer[" + getTopicName() + "]", configuration.getConsumersCount());

components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyFetchRecords.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.stream.Collectors;
2424

2525
import org.apache.camel.Exchange;
26+
import org.apache.camel.component.iggy.client.IggyClientConnectionPool;
2627
import org.apache.camel.support.BridgeExceptionHandlerToErrorHandler;
2728
import org.apache.iggy.client.blocking.IggyBaseClient;
2829
import org.apache.iggy.consumergroup.Consumer;
@@ -41,19 +42,20 @@ public class IggyFetchRecords implements Runnable {
4142
private final IggyConsumer iggyConsumer;
4243
private final IggyEndpoint endpoint;
4344
private final IggyConfiguration configuration;
44-
private final IggyBaseClient client;
45+
private final IggyClientConnectionPool iggyClientConnectionPool;
4546
private final BridgeExceptionHandlerToErrorHandler bridgeExceptionHandlerToErrorHandler;
4647

4748
private volatile boolean running;
4849
private final AtomicBoolean polling = new AtomicBoolean(false);
4950
private BigInteger offset;
5051

5152
public IggyFetchRecords(IggyConsumer iggyConsumer, IggyEndpoint endpoint, IggyConfiguration configuration,
52-
IggyBaseClient client, BridgeExceptionHandlerToErrorHandler bridgeExceptionHandlerToErrorHandler) {
53+
IggyClientConnectionPool iggyClientConnectionPool,
54+
BridgeExceptionHandlerToErrorHandler bridgeExceptionHandlerToErrorHandler) {
5355
this.iggyConsumer = iggyConsumer;
5456
this.endpoint = endpoint;
5557
this.configuration = configuration;
56-
this.client = client;
58+
this.iggyClientConnectionPool = iggyClientConnectionPool;
5759
this.bridgeExceptionHandlerToErrorHandler = bridgeExceptionHandlerToErrorHandler;
5860
this.offset = configuration.getStartingOffset() == null ? null : BigInteger.valueOf(configuration.getStartingOffset());
5961
}
@@ -91,6 +93,7 @@ private void pollMessages() {
9193
ConsumerId consumerId = ConsumerId.of(configuration.getConsumerGroupName());
9294

9395
PolledMessages polledMessages;
96+
IggyBaseClient client = iggyClientConnectionPool.borrowObject();
9497
if (configuration.isAutoCommit()) {
9598
polledMessages = client.messages()
9699
.pollMessages(streamId,
@@ -114,6 +117,8 @@ private void pollMessages() {
114117
offset = offset.add(BigInteger.valueOf(polledMessages.count()));
115118
}
116119

120+
iggyClientConnectionPool.returnClient(client);
121+
117122
LOG.debug("Fetched {} messages from partition {}, current offset {}",
118123
polledMessages.count(),
119124
polledMessages.partitionId(),

components/camel-iggy/src/main/java/org/apache/camel/component/iggy/IggyProducer.java

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818

1919
import java.util.Collections;
2020
import java.util.List;
21+
import java.util.Optional;
2122
import java.util.stream.Collectors;
2223

2324
import org.apache.camel.AsyncCallback;
2425
import org.apache.camel.Exchange;
2526
import org.apache.camel.RuntimeCamelException;
27+
import org.apache.camel.component.iggy.client.IggyClientConnectionPool;
2628
import org.apache.camel.support.DefaultAsyncProducer;
2729
import org.apache.iggy.client.blocking.IggyBaseClient;
2830
import org.apache.iggy.identifier.StreamId;
@@ -35,7 +37,7 @@ public class IggyProducer extends DefaultAsyncProducer {
3537
private static final Logger LOG = LoggerFactory.getLogger(IggyProducer.class);
3638

3739
private final IggyEndpoint endpoint;
38-
private IggyBaseClient client;
40+
private IggyClientConnectionPool iggyClientConnectionPool;
3941

4042
public IggyProducer(IggyEndpoint endpoint) {
4143
super(endpoint);
@@ -45,14 +47,20 @@ public IggyProducer(IggyEndpoint endpoint) {
4547
@Override
4648
protected void doStart() throws Exception {
4749
super.doStart();
48-
client = endpoint.getIggyClient();
50+
iggyClientConnectionPool = new IggyClientConnectionPool(
51+
endpoint.getConfiguration().getHost(),
52+
endpoint.getConfiguration().getPort(),
53+
endpoint.getConfiguration().getUsername(),
54+
endpoint.getConfiguration().getPassword(),
55+
endpoint.getConfiguration().getClientTransport());
4956

57+
IggyBaseClient client = iggyClientConnectionPool.borrowObject();
5058
endpoint.initializeTopic(client);
59+
iggyClientConnectionPool.returnClient(client);
5160
}
5261

5362
@Override
5463
public boolean process(Exchange exchange, AsyncCallback callback) {
55-
// TODO overrides from headers and reinitialize topic if topic/stream was updated
5664
IggyConfiguration iggyConfiguration = endpoint.getConfiguration();
5765

5866
try {
@@ -88,11 +96,28 @@ public boolean process(Exchange exchange, AsyncCallback callback) {
8896
.unwrap();
8997
*/
9098

99+
IggyBaseClient client = iggyClientConnectionPool.borrowObject();
100+
101+
Optional<String> topicOverride
102+
= Optional.ofNullable(exchange.getMessage().getHeader(IggyConstants.TOPIC_OVERRIDE, String.class));
103+
Optional<String> streamOverride
104+
= Optional.ofNullable(exchange.getMessage().getHeader(IggyConstants.STREAM_OVERRIDE, String.class));
105+
106+
String topic = topicOverride.orElse(endpoint.getTopicName());
107+
String stream = streamOverride.orElse(iggyConfiguration.getStreamName());
108+
if (topicOverride.isPresent() || streamOverride.isPresent()) {
109+
endpoint.initializeTopic(client,
110+
topic,
111+
stream);
112+
}
113+
91114
client.messages().sendMessages(
92-
StreamId.of(iggyConfiguration.getStreamName()),
93-
TopicId.of(endpoint.getTopicName()),
115+
StreamId.of(stream),
116+
TopicId.of(topic),
94117
iggyConfiguration.getPartitioning(),
95118
messages);
119+
120+
iggyClientConnectionPool.returnClient(client);
96121
} catch (Exception e) {
97122
exchange.setException(e);
98123
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.component.iggy.client;
18+
19+
import org.apache.commons.pool2.impl.GenericObjectPool;
20+
import org.apache.iggy.client.blocking.IggyBaseClient;
21+
22+
public class IggyClientConnectionPool {
23+
private final GenericObjectPool<IggyBaseClient> pool;
24+
25+
public IggyClientConnectionPool(String host, int port, String username, String password, String transport) {
26+
IggyClientFactory factory = new IggyClientFactory(host, port, username, password, transport);
27+
this.pool = new GenericObjectPool<>(factory);
28+
}
29+
30+
public IggyBaseClient borrowObject() throws Exception {
31+
return pool.borrowObject();
32+
}
33+
34+
public void returnClient(IggyBaseClient client) {
35+
pool.returnObject(client);
36+
}
37+
38+
public int getNumActive() {
39+
return pool.getNumActive();
40+
}
41+
42+
public int getNumIdle() {
43+
return pool.getNumIdle();
44+
}
45+
}

0 commit comments

Comments
 (0)