Skip to content

Commit 451acea

Browse files
Release 1.2.1 (#22)
1 parent 2fd30a0 commit 451acea

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1465
-1795
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## [1.2.1] - 2021-07-17
8+
### Changed
9+
- Modified Confluent archive to follow new standards
10+
- Stopped using reactive Lettuce
11+
- Upgraded various dependencies
12+
713
## [1.2.0] - 2021-02-13
814
### Added
915
- Handle Redis cluster topology changes on the fly

lombok.config

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
config.stopBubbling = true
22
lombok.addLombokGeneratedAnnotation = true
3+
lombok.builder.className = Builder
4+
lombok.log.fieldname = LOG

pom.xml

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@
55

66
<groupId>io.github.jaredpetersen</groupId>
77
<artifactId>kafka-connect-redis</artifactId>
8-
<version>1.2.0</version>
8+
<version>1.2.1</version>
99
<packaging>jar</packaging>
1010

11-
<name>kafka-connect-redis</name>
12-
<description>Kafka Connect source and sink connector for Redis</description>
11+
<name>Kafka Redis Connector (Sink and Source)</name>
12+
<description>Kafka sink and source connector for Redis</description>
1313
<url>https://github.com/jaredpetersen/kafka-connect-redis</url>
1414

1515
<licenses>
@@ -62,69 +62,57 @@
6262
<dependency>
6363
<groupId>org.apache.kafka</groupId>
6464
<artifactId>connect-api</artifactId>
65-
<version>2.7.0</version>
65+
<version>2.8.0</version>
6666
<scope>provided</scope>
6767
</dependency>
6868

6969
<dependency>
7070
<groupId>io.lettuce</groupId>
7171
<artifactId>lettuce-core</artifactId>
72-
<version>6.0.2.RELEASE</version>
73-
</dependency>
74-
75-
<dependency>
76-
<groupId>io.projectreactor</groupId>
77-
<artifactId>reactor-core</artifactId>
78-
<version>3.4.2</version>
72+
<version>6.1.4.RELEASE</version>
7973
</dependency>
8074

8175
<dependency>
8276
<groupId>org.projectlombok</groupId>
8377
<artifactId>lombok</artifactId>
84-
<version>1.18.18</version>
78+
<version>1.18.20</version>
8579
<scope>provided</scope>
8680
</dependency>
8781

8882
<dependency>
8983
<groupId>org.slf4j</groupId>
90-
<artifactId>slf4j-nop</artifactId>
91-
<version>1.7.30</version>
84+
<artifactId>slf4j-simple</artifactId>
85+
<version>1.7.31</version>
9286
<scope>test</scope>
9387
</dependency>
9488

9589
<dependency>
9690
<groupId>org.junit.jupiter</groupId>
9791
<artifactId>junit-jupiter</artifactId>
98-
<version>5.7.1</version>
92+
<version>5.7.2</version>
9993
<scope>test</scope>
10094
</dependency>
10195

10296
<dependency>
10397
<groupId>org.mockito</groupId>
10498
<artifactId>mockito-core</artifactId>
105-
<version>3.7.7</version>
99+
<version>3.11.2</version>
106100
<scope>test</scope>
107101
</dependency>
108102

109103
<dependency>
110104
<groupId>org.testcontainers</groupId>
111105
<artifactId>testcontainers</artifactId>
112-
<version>1.15.2</version>
106+
<version>1.15.3</version>
113107
<scope>test</scope>
114108
</dependency>
115109
<dependency>
116110
<groupId>org.testcontainers</groupId>
117111
<artifactId>junit-jupiter</artifactId>
118-
<version>1.15.2</version>
112+
<version>1.15.3</version>
119113
<scope>test</scope>
120114
</dependency>
121115

122-
<dependency>
123-
<groupId>io.projectreactor</groupId>
124-
<artifactId>reactor-test</artifactId>
125-
<version>3.4.2</version>
126-
<scope>test</scope>
127-
</dependency>
128116
</dependencies>
129117

130118
<build>
@@ -218,8 +206,9 @@
218206
<goal>kafka-connect</goal>
219207
</goals>
220208
<configuration>
221-
<title>Kafka Connect Redis</title>
222-
<documentationUrl>${project.url}/blob/main/README.md</documentationUrl>
209+
<name>redis-connector</name>
210+
<title>Redis Connector (Sink and Source)</title>
211+
<documentationUrl>${project.url}</documentationUrl>
223212
<ownerLogo>docs/logos/jaredpetersen-logo.png</ownerLogo>
224213
<ownerUsername>jaredpetersen</ownerUsername>
225214
<ownerType>user</ownerType>
@@ -228,6 +217,17 @@
228217
<supportProviderName>Open Source Community</supportProviderName>
229218
<supportSummary>Support provided through community involvement.</supportSummary>
230219
<supportUrl>${project.issueManagement.url}</supportUrl>
220+
<fileSets>
221+
<fileSet>
222+
<directory>${project.basedir}</directory>
223+
<outputDirectory>doc</outputDirectory>
224+
<includes>
225+
<include>README.md</include>
226+
<include>LICENSE.md</include>
227+
<include>CHANGELOG.md</include>
228+
</includes>
229+
</fileSet>
230+
</fileSets>
231231
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
232232
<componentTypes>
233233
<componentType>source</componentType>
@@ -252,7 +252,7 @@
252252
<descriptorRefs>
253253
<descriptorRef>jar-with-dependencies</descriptorRef>
254254
</descriptorRefs>
255-
<finalName>${project.name}-${project.version}</finalName>
255+
<finalName>${project.artifactId}-${project.version}</finalName>
256256
<appendAssemblyId>false</appendAssemblyId>
257257
</configuration>
258258
<executions>

src/main/java/io/github/jaredpetersen/kafkaconnectredis/sink/RedisSinkTask.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,31 @@
33
import io.github.jaredpetersen.kafkaconnectredis.sink.config.RedisSinkConfig;
44
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.RecordConverter;
55
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.Writer;
6+
import io.github.jaredpetersen.kafkaconnectredis.sink.writer.record.RedisCommand;
67
import io.github.jaredpetersen.kafkaconnectredis.util.VersionUtil;
78
import io.lettuce.core.RedisClient;
89
import io.lettuce.core.api.StatefulRedisConnection;
9-
import io.lettuce.core.api.reactive.RedisReactiveCommands;
10+
import io.lettuce.core.api.sync.RedisCommands;
1011
import io.lettuce.core.cluster.ClusterClientOptions;
1112
import io.lettuce.core.cluster.ClusterTopologyRefreshOptions;
1213
import io.lettuce.core.cluster.RedisClusterClient;
1314
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
14-
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
15+
import io.lettuce.core.cluster.api.sync.RedisClusterCommands;
1516
import java.util.Collection;
1617
import java.util.Map;
18+
import lombok.extern.slf4j.Slf4j;
1719
import org.apache.kafka.common.config.ConfigException;
1820
import org.apache.kafka.connect.errors.ConnectException;
1921
import org.apache.kafka.connect.sink.SinkRecord;
2022
import org.apache.kafka.connect.sink.SinkTask;
21-
import org.slf4j.Logger;
22-
import org.slf4j.LoggerFactory;
23-
import reactor.core.publisher.Flux;
2423

2524
/**
2625
* Kafka Connect Task for Kafka Connect Redis Sink.
2726
*/
27+
@Slf4j
2828
public class RedisSinkTask extends SinkTask {
29+
private static final RecordConverter RECORD_CONVERTER = new RecordConverter();
30+
2931
private RedisClient redisStandaloneClient;
3032
private StatefulRedisConnection<String, String> redisStandaloneConnection;
3133

@@ -34,10 +36,6 @@ public class RedisSinkTask extends SinkTask {
3436

3537
private Writer writer;
3638

37-
private static final RecordConverter RECORD_CONVERTER = new RecordConverter();
38-
39-
private static final Logger LOG = LoggerFactory.getLogger(RedisSinkTask.class);
40-
4139
@Override
4240
public String version() {
4341
return VersionUtil.getVersion();
@@ -67,14 +65,14 @@ public void start(final Map<String, String> props) {
6765

6866
this.redisClusterConnection = this.redisClusterClient.connect();
6967

70-
final RedisClusterReactiveCommands<String, String> redisClusterCommands = this.redisClusterConnection.reactive();
68+
final RedisClusterCommands<String, String> redisClusterCommands = this.redisClusterConnection.sync();
7169
this.writer = new Writer(redisClusterCommands);
7270
}
7371
else {
7472
this.redisStandaloneClient = RedisClient.create(config.getRedisUri());
7573
this.redisStandaloneConnection = this.redisStandaloneClient.connect();
7674

77-
final RedisReactiveCommands<String, String> redisStandaloneCommands = this.redisStandaloneConnection.reactive();
75+
final RedisCommands<String, String> redisStandaloneCommands = this.redisStandaloneConnection.sync();
7876
this.writer = new Writer(redisStandaloneCommands);
7977
}
8078
}
@@ -88,14 +86,27 @@ public void put(final Collection<SinkRecord> records) {
8886
LOG.info("writing {} record(s) to redis", records.size());
8987
LOG.debug("records: {}", records);
9088

91-
Flux
92-
.fromIterable(records)
93-
.flatMapSequential(RECORD_CONVERTER::convert)
94-
.onErrorMap(error -> new ConnectException("failed to convert record", error))
95-
.flatMapSequential(redisCommand -> this.writer.write(redisCommand))
96-
.onErrorMap(error -> new ConnectException("failed to write record", error))
97-
.then()
98-
.block();
89+
for (SinkRecord record : records) {
90+
put(record);
91+
}
92+
}
93+
94+
private void put(SinkRecord record) {
95+
final RedisCommand redisCommand;
96+
97+
try {
98+
redisCommand = RECORD_CONVERTER.convert(record);
99+
}
100+
catch (Exception exception) {
101+
throw new ConnectException("failed to convert record", exception);
102+
}
103+
104+
try {
105+
writer.write(redisCommand);
106+
}
107+
catch (Exception exception) {
108+
throw new ConnectException("failed to write record", exception);
109+
}
99110
}
100111

101112
@Override

0 commit comments

Comments
 (0)