Commit 921e36c

Support sink expire commands (#12)
1 parent 9d84b44 commit 921e36c

16 files changed (+630, -55 lines)


CHANGELOG.md

Lines changed: 13 additions & 0 deletions
```diff
@@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file.
 The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
 
+## [1.0.3] - 2020-11-21
+### Added
+- Added support for Redis EXPIRE commands
+- Added support for Redis EXPIREAT commands
+- Added support for Redis PEXPIRE commands
+
+### Changed
+- Improved source connector partitioning documentation
+
+### Fixed
+- Source connector no longer logs every Redis message at an INFO level
+- Added missing configuration property `topic` to the source connector documentation
+
 ## [1.0.2] - 2020-11-13
 ### Fixed
 - Fixed POM description to include source capability
```

README.md

Lines changed: 1 addition & 1 deletion
```diff
@@ -11,7 +11,7 @@ Kafka Connect Redis Source subscribes to Redis channels/patterns (including [key
 For more information, see the [detailed documentation](/docs/connectors/SOURCE.md).
 
 ### Sink
-Kafka Connect Redis Sink consumes Kafka records in a Redis command format and applies them to Redis. Several write-based commands are supported at this time (`SET`, `SADD`, and `GEOADD`) and more are slated for the near future.
+Kafka Connect Redis Sink consumes Kafka records in a Redis command format (`SET`, `GEOADD`, etc.) and applies them to Redis.
 
 For more information, see the [detailed documentation](/docs/connectors/SINK.md).
 
```

docs/connectors/SINK.md

Lines changed: 123 additions & 0 deletions
````diff
@@ -3,6 +3,9 @@ Consume messages from Kafka and apply them to Redis in the form of commands.
 
 The following commands are supported at this time:
 - [SET](https://redis.io/commands/set)
+- [EXPIRE](https://redis.io/commands/expire)
+- [EXPIREAT](https://redis.io/commands/expireat)
+- [PEXPIRE](https://redis.io/commands/pexpire)
 - [SADD](https://redis.io/commands/sadd)
 - [GEOADD](https://redis.io/commands/geoadd)
 
@@ -127,6 +130,126 @@ Keys are ignored.
 }
 ```
 
+#### EXPIRE
+##### Avro
+```json
+{
+  "namespace": "io.github.jaredpetersen.kafkaconnectredis",
+  "name": "RedisExpireCommand",
+  "type": "record",
+  "fields": [
+    {
+      "name": "key",
+      "type": "string"
+    },
+    {
+      "name": "seconds",
+      "type": "long"
+    }
+  ]
+}
+```
+
+##### Connect JSON
+```json
+{
+  "name": "io.github.jaredpetersen.kafkaconnectredis.RedisExpireCommand",
+  "type": "struct",
+  "fields": [
+    {
+      "field": "key",
+      "type": "string",
+      "optional": false
+    },
+    {
+      "field": "seconds",
+      "type": "int64",
+      "optional": false
+    }
+  ]
+}
+```
+
+#### EXPIREAT
+##### Avro
+```json
+{
+  "namespace": "io.github.jaredpetersen.kafkaconnectredis",
+  "name": "RedisExpireatCommand",
+  "type": "record",
+  "fields": [
+    {
+      "name": "key",
+      "type": "string"
+    },
+    {
+      "name": "timestamp",
+      "type": "long"
+    }
+  ]
+}
+```
+
+##### Connect JSON
+```json
+{
+  "name": "io.github.jaredpetersen.kafkaconnectredis.RedisExpireatCommand",
+  "type": "struct",
+  "fields": [
+    {
+      "field": "key",
+      "type": "string",
+      "optional": false
+    },
+    {
+      "field": "timestamp",
+      "type": "int64",
+      "optional": false
+    }
+  ]
+}
+```
+
+#### PEXPIRE
+##### Avro
+```json
+{
+  "namespace": "io.github.jaredpetersen.kafkaconnectredis",
+  "name": "RedisPexpireCommand",
+  "type": "record",
+  "fields": [
+    {
+      "name": "key",
+      "type": "string"
+    },
+    {
+      "name": "milliseconds",
+      "type": "long"
+    }
+  ]
+}
+```
+
+##### Connect JSON
+```json
+{
+  "name": "io.github.jaredpetersen.kafkaconnectredis.RedisPexpireCommand",
+  "type": "struct",
+  "fields": [
+    {
+      "field": "key",
+      "type": "string",
+      "optional": false
+    },
+    {
+      "field": "milliseconds",
+      "type": "int64",
+      "optional": false
+    }
+  ]
+}
+```
+
 #### SADD
 ##### Avro
 ```json
````
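The three expiration commands in the schemas above express the same idea in different units: `EXPIRE` takes a relative TTL in seconds, `EXPIREAT` an absolute Unix timestamp in seconds, and `PEXPIRE` a relative TTL in milliseconds. A minimal shell sketch of the conversions (the variable names are illustrative, not part of the connector):

```shell
# Relative TTL in seconds (the EXPIRE record's "seconds" field)
ttl_seconds=1800

# Absolute deadline in Unix seconds (the EXPIREAT record's "timestamp" field):
# current time plus the TTL
expireat_timestamp=$(( $(date +%s) + ttl_seconds ))

# Relative TTL in milliseconds (the PEXPIRE record's "milliseconds" field)
pexpire_milliseconds=$(( ttl_seconds * 1000 ))

echo "seconds=$ttl_seconds timestamp=$expireat_timestamp milliseconds=$pexpire_milliseconds"
```

Pick whichever form matches how your producer reasons about expiry: relative TTLs for "keep for N seconds", an absolute timestamp when the deadline is fixed regardless of when the record is consumed.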

docs/connectors/SOURCE.md

Lines changed: 10 additions & 7 deletions
```diff
@@ -77,16 +77,19 @@ Subscribes to Redis channels/patterns (including [keyspace notifications](https:
 ## Partitions
 Records are partitioned using the [`DefaultPartitioner`](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/DefaultPartitioner.java) class. This means that the record key is used to determine which partition the record is assigned to.
 
-If you would prefer a different partitioning strategy, you may implement your own [`Partitioner`](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java) and configure the connector to use it via [`partitioner.class`](https://kafka.apache.org/documentation/#partitioner.class). Alternatively, you may also implement a custom [Single Message Transform (SMT)](https://docs.confluent.io/current/connect/transforms/index.html).
+In the case of subscribing to Redis keyspace notifications, it may be useful to avoid partitioning the data so that multiple event types can arrive in order as a single event stream. This can be accomplished by configuring the connector to publish to a Kafka topic that only contains a single partition, forcing the DefaultPartitioner to only utilize the single partition.
+
+The plugin can be configured to use an alternative partitioning strategy if desired. Set the configuration property `connector.client.config.override.policy` to value `All` on the Kafka Connect worker (the overall Kafka Connect application that runs plugins). This will allow the override of the internal Kafka producer and consumer configurations. To override the partitioner for an individual connector plugin, add the configuration property `producer.override.partitioner.class` to the connector plugin with a value that points to a class implementing the [Partitioner](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/Partitioner.java) interface, e.g. `org.apache.kafka.clients.producer.internals.DefaultPartitioner`.
 
 ## Parallelization
 Splitting the workload between multiple tasks via the configuration property `max.tasks` is not supported at this time. Support for this will be added in the future.
 
 ## Configuration
 ### Connector Properties
-| Name                              | Type    | Default | Importance | Description                                             |
-| --------------------------------- | ------- | ------- | ---------- | ------------------------------------------------------- |
-| `redis.uri`                       | string  |         | High       | Redis connection information provided via a URI string. |
-| `redis.cluster.enabled`           | boolean | false   | High       | Target Redis is running as a cluster.                   |
-| `redis.channels`                  | string  |         | High       | Redis channels to subscribe to separated by commas.     |
-| `redis.channels.patterns.enabled` | boolean |         | High       | Redis channels use patterns (PSUBSCRIBE).               |
+| Name                              | Type    | Default        | Importance | Description                                             |
+| --------------------------------- | ------- | -------------- | ---------- | ------------------------------------------------------- |
+| `topic`                           | string  | `redis.events` | High       | Topic to write to.                                      |
+| `redis.uri`                       | string  |                | High       | Redis connection information provided via a URI string. |
+| `redis.cluster.enabled`           | boolean | false          | High       | Target Redis is running as a cluster.                   |
+| `redis.channels`                  | string  |                | High       | Redis channels to subscribe to separated by commas.     |
+| `redis.channels.patterns.enabled` | boolean |                | High       | Redis channels use patterns (PSUBSCRIBE).               |
```

docs/demo/README.md

Lines changed: 1 addition & 1 deletion
````diff
@@ -15,7 +15,7 @@ Now that we have Kubernetes set up locally, we'll need a Docker image that conta
 
 Navigate to `demo/docker/` in this repository and run the following commands **in a separate terminal** to download the plugin and build the image for minikube:
 ```bash
-curl -O https://oss.sonatype.org/service/local/repositories/releases/content/io/github/jaredpetersen/kafka-connect-redis/1.0.2/kafka-connect-redis-1.0.1.jar
+curl -O https://oss.sonatype.org/service/local/repositories/releases/content/io/github/jaredpetersen/kafka-connect-redis/1.0.3/kafka-connect-redis-1.0.3.jar
 eval $(minikube docker-env)
 docker build -t jaredpetersen/kafka-connect-redis:latest .
 ```
````

docs/demo/SINK.md

Lines changed: 49 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
Send a request to the Kafka Connect REST API to configure it to use Kafka Connect Redis:
44

55
### Avro
6+
**IMPORTANT:** The Avro demo utilizes multiple topics in order to work around [a bug in the Avro console producer](https://github.com/confluentinc/schema-registry/issues/898). A fix has been merged but Confluent has not published a new Docker image for it yet (6.1.0+). Kafka Connect Redis works with Avro on a single topic; this is just a problem with the console producer provided by Confluent.
7+
68
```bash
79
curl --request POST \
810
--url "$(minikube -n kcr-demo service kafka-connect --url)/connectors" \
@@ -16,7 +18,7 @@ curl --request POST \
1618
"value.converter": "io.confluent.connect.avro.AvroConverter",
1719
"value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
1820
"tasks.max": "1",
19-
"topics": "redis.commands",
21+
"topics": "redis.commands.set,redis.commands.expire,redis.commands.expireat,redis.commands.pexpire,redis.commands.sadd,redis.commands.geoadd",
2022
"redis.uri": "redis://IEPfIr0eLF7UsfwrIlzy80yUaBG258j9@redis-cluster",
2123
"redis.cluster.enabled": true
2224
}
@@ -49,28 +51,54 @@ Create an interactive ephemeral query pod:
4951
kubectl -n kcr-demo run -it --rm kafka-write-records --image confluentinc/cp-schema-registry:6.0.0 --command /bin/bash
5052
```
5153

52-
Write records to the `redis.commands` topic:
53-
54-
**IMPORTANT:** The following Avro example does not completely work [due to a bug in the Avro console producer](https://github.com/confluentinc/schema-registry/issues/898). A fix has been merged but Confluent has not published a new Docker image for it yet (6.1.0+). In the meantime, you may only issue one Redis command type per topic if you are using the Avro console producer.
54+
Write records to the `redis.commands` topics:
5555

5656
```bash
5757
kafka-avro-console-producer \
5858
--broker-list kafka-broker-0.kafka-broker:9092 \
5959
--property schema.registry.url='http://kafka-schema-registry:8081' \
6060
--property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisSetCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"value","type":"string"},{"name":"expiration","type":["null",{"name":"RedisSetCommandExpiration","type":"record","fields":[{"name":"type","type":{"name":"RedisSetCommandExpirationType","type":"enum","symbols":["EX","PX","KEEPTTL"]}},{"name":"time","type":["null","long"]}]}],"default":null},{"name":"condition","type":["null",{"name":"RedisSetCommandCondition","type":"enum","symbols":["NX","XX","KEEPTTL"]}],"default":null}]}' \
61-
--property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
62-
--topic redis.commands
61+
--topic redis.commands.set
6362
>{"key":"{user.1}.username","value":"jetpackmelon22","expiration":null,"condition":null}
6463
>{"key":"{user.2}.username","value":"anchorgoat74","expiration":{"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommandExpiration":{"type":"EX","time":{"long":2100}}},"condition":{"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommandCondition":"NX"}}
64+
>{"key":"product.milk","value":"$2.29","expiration":null,"condition":null}
65+
>{"key":"product.bread","value":"$5.49","expiration":null,"condition":null}
66+
>{"key":"product.waffles","value":"$2.59","expiration":null,"condition":null}
67+
```
68+
69+
```bash
70+
kafka-avro-console-producer \
71+
--broker-list kafka-broker-0.kafka-broker:9092 \
72+
--property schema.registry.url='http://kafka-schema-registry:8081' \
73+
--property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisExpireCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"seconds","type":"long"}]}' \
74+
--topic redis.commands.expire
75+
>{"key":"product.milk","seconds":1800}
76+
```
77+
78+
```bash
79+
kafka-avro-console-producer \
80+
--broker-list kafka-broker-0.kafka-broker:9092 \
81+
--property schema.registry.url='http://kafka-schema-registry:8081' \
82+
--property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisExpireatCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"timestamp","type":"long"}]}' \
83+
--topic redis.commands.expireat
84+
>{"key":"product.bread","timestamp":4130464553}
85+
```
86+
87+
```bash
88+
kafka-avro-console-producer \
89+
--broker-list kafka-broker-0.kafka-broker:9092 \
90+
--property schema.registry.url='http://kafka-schema-registry:8081' \
91+
--property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisPexpireCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"milliseconds","type":"long"}]}' \
92+
--topic redis.commands.pexpire
93+
>{"key":"product.waffles","milliseconds":1800000}
6594
```
6695

6796
```bash
6897
kafka-avro-console-producer \
6998
--broker-list kafka-broker-0.kafka-broker:9092 \
7099
--property schema.registry.url='http://kafka-schema-registry:8081' \
71100
--property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisSaddCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"values","type":{"type":"array","items":"string"}}]}' \
72-
--property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
73-
--topic redis.commands
101+
--topic redis.commands.sadd
74102
>{"key":"{user.1}.interests","values":["reading"]}
75103
>{"key":"{user.2}.interests","values":["sailing","woodworking","programming"]}
76104
```
@@ -80,8 +108,7 @@ kafka-avro-console-producer \
80108
--broker-list kafka-broker-0.kafka-broker:9092 \
81109
--property schema.registry.url='http://kafka-schema-registry:8081' \
82110
--property value.schema='{"namespace":"io.github.jaredpetersen.kafkaconnectredis","name":"RedisGeoaddCommand","type":"record","fields":[{"name":"key","type":"string"},{"name":"values","type":{"type":"array","items":{"name":"RedisGeoaddCommandGeolocation","type":"record","fields":[{"name":"longitude","type":"double"},{"name":"latitude","type":"double"},{"name":"member","type":"string"}]}}}]}' \
83-
--property value.subject.name.strategy='io.confluent.kafka.serializers.subject.TopicRecordNameStrategy' \
84-
--topic redis.commands
111+
--topic redis.commands.geoadd
85112
>{"key":"Sicily","values":[{"longitude":13.361389,"latitude":13.361389,"member":"Palermo"},{"longitude":15.087269,"latitude":37.502669,"member":"Catania"}]}
86113
```
87114

@@ -98,6 +125,12 @@ kafka-console-producer \
98125
--topic redis.commands
99126
>{"payload":{"key":"{user.1}.username","value":"jetpackmelon22"},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"value","type":"string","optional":false},{"field":"expiration","type":"struct","fields":[{"field":"type","type":"string","optional":false},{"field":"time","type":"int64","optional":true}],"optional":true},{"field":"condition","type":"string","optional":true}]}}
100127
>{"payload":{"key":"{user.2}.username","value":"anchorgoat74","expiration":{"type":"EX","time":2100},"condition":"NX"},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"value","type":"string","optional":false},{"field":"expiration","type":"struct","fields":[{"field":"type","type":"string","optional":false},{"field":"time","type":"int64","optional":true}],"optional":true},{"field":"condition","type":"string","optional":true}]}}
128+
>{"payload":{"key":"product.milk","value":"$2.29"},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"value","type":"string","optional":false},{"field":"expiration","type":"struct","fields":[{"field":"type","type":"string","optional":false},{"field":"time","type":"int64","optional":true}],"optional":true},{"field":"condition","type":"string","optional":true}]}}
129+
>{"payload":{"key":"product.milk","seconds":1800},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisExpireCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"seconds","type":"int64","optional":false}]}}
130+
>{"payload":{"key":"product.bread","value":"$5.49"},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"value","type":"string","optional":false},{"field":"expiration","type":"struct","fields":[{"field":"type","type":"string","optional":false},{"field":"time","type":"int64","optional":true}],"optional":true},{"field":"condition","type":"string","optional":true}]}}
131+
>{"payload":{"key":"product.bread","timestamp":4130464553},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisExpireatCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"timestamp","type":"int64","optional":false}]}}
132+
>{"payload":{"key":"product.waffles","value":"$2.59"},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisSetCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"value","type":"string","optional":false},{"field":"expiration","type":"struct","fields":[{"field":"type","type":"string","optional":false},{"field":"time","type":"int64","optional":true}],"optional":true},{"field":"condition","type":"string","optional":true}]}}
133+
>{"payload":{"key":"product.waffles","milliseconds":1800000},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisPexpireCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"milliseconds","type":"int64","optional":false}]}}
101134
>{"payload":{"key":"{user.1}.interests","values":["reading"]},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisSaddCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"values","type":"array","items":{"type":"string"},"optional":false}]}}
102135
>{"payload":{"key":"{user.2}.interests","values":["sailing","woodworking","programming"]},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisSaddCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"values","type":"array","items":{"type":"string"},"optional":false}]}}
103136
>{"payload":{"key":"Sicily","values":[{"longitude":13.361389,"latitude":13.361389,"member":"Palermo"},{"longitude":15.087269,"latitude":37.502669,"member":"Catania"}]},"schema":{"name":"io.github.jaredpetersen.kafkaconnectredis.RedisGeoaddCommand","type":"struct","fields":[{"field":"key","type":"string","optional":false},{"field":"values","type":"array","items":{"type":"struct","fields":[{"field":"longitude","type":"double","optional":false},{"field":"latitude","type":"double","optional":false},{"field":"member","type":"string","optional":false}]},"optional":false}]}}
@@ -118,6 +151,12 @@ Run commands to confirm the commands were applied correctly:
118151
```bash
119152
GET {user.1}.username
120153
GET {user.2}.username
154+
GET product.bread
155+
TTL product.bread
156+
GET product.milk
157+
TTL product.milk
158+
GET product.waffles
159+
PTTL product.waffles
121160
SMEMBERS {user.1}.interests
122161
SMEMBERS {user.2}.interests
123162
GEOPOS Sicily Catania
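The demo's `EXPIREAT` record uses the absolute Unix timestamp `4130464553`, chosen so the key does not expire while you are still poking at it. A quick sanity check with GNU `date` (macOS `date` uses different flags), plus how to mint your own future timestamp:

```shell
# What calendar date does the demo's EXPIREAT timestamp fall on?
# It resolves to a date in the year 2100, i.e. far in the future.
date -u -d @4130464553 +%Y-%m-%d

# Mint an absolute expiry one hour from now for your own EXPIREAT records
expireat=$(( $(date +%s) + 3600 ))
echo "$expireat"
```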

pom.xml

Lines changed: 1 addition & 1 deletion
```diff
@@ -5,7 +5,7 @@
 
   <groupId>io.github.jaredpetersen</groupId>
   <artifactId>kafka-connect-redis</artifactId>
-  <version>1.0.2</version>
+  <version>1.0.3</version>
   <packaging>jar</packaging>
 
   <name>kafka-connect-redis</name>
```
