Skip to content

Commit 68acce2

Browse files
authored
feat: allow custom kafka connect storage topics (#26)
1 parent 68b2961 commit 68acce2

13 files changed

+687
-18
lines changed

docs/services.md

+25-9
Original file line numberDiff line numberDiff line change
@@ -101,15 +101,6 @@ This would allow your streams application to access kafka using `my-stream-appli
101101

102102
A basic example which defines one Kafka Connect cluster that has one connector running.
103103

104-
**Connect Topics**
105-
Currently, kafka connect's internal topics must be defined in the following format:
106-
107-
- `connect-configs-{service-name}`
108-
- `connect-offsets-{service-name}`
109-
- `connect-status-{service-name}`
110-
111-
In this example, the connect cluster `group.id` should be `my-connect-cluster`.
112-
113104
```yaml
114105
topics:
115106
connect-configs-my-connect-cluster:
@@ -151,6 +142,31 @@ Behind the scenes, this generates ACLs such as:
151142
- `READ` and `WRITE` for the internal kafka connect topics
152143
- `READ` for the consumer group `my-connect-cluster`
153144

145+
#### Storage Topics
146+
147+
By default, `kafka-gitops` generates ACLs for the internal storage topics following this format:
148+
149+
- `connect-configs-{service-name}`
150+
- `connect-offsets-{service-name}`
151+
- `connect-status-{service-name}`
152+
153+
You can specify custom internal storage topics using the `storage-topics` property:
154+
155+
```yaml
156+
services:
157+
my-connect-cluster:
158+
type: kafka-connect
159+
principal: User:myconnectcluster
160+
storage-topics:
161+
config: custom-config-topic
162+
offset: custom-offset-topic
163+
status: custom-status-topic
164+
connectors:
165+
rabbitmq-sink:
166+
consumes:
167+
- rabbitmq-data
168+
```
169+
154170
#### Group ID
155171

156172
The `group.id` used for the connect cluster ACLs defaults to the service name. You can override this by specifying the `group-id` property, as shown below:

src/main/java/com/devshawn/kafka/gitops/domain/state/service/KafkaConnectService.java

+34-6
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ public abstract class KafkaConnectService extends ServiceDetails {
2121

2222
public abstract Optional<String> getPrincipal();
2323

24+
@JsonProperty("storage-topics")
25+
public abstract Optional<KafkaConnectStorageTopics> getStorageTopics();
26+
2427
public abstract List<String> getProduces();
2528

2629
public abstract Map<String, KafkaConnectorDetails> getConnectors();
@@ -35,18 +38,43 @@ public List<AclDetails.Builder> getAcls(String serviceName) {
3538

3639
private List<AclDetails.Builder> getConnectWorkerAcls(String serviceName) {
3740
String groupId = getGroupId().isPresent() ? getGroupId().get() : serviceName;
41+
String configTopic = getConfigTopic(serviceName);
42+
String offsetTopic = getOffsetTopic(serviceName);
43+
String statusTopic = getStatusTopic(serviceName);
44+
3845
List<AclDetails.Builder> acls = new ArrayList<>();
39-
acls.add(generateReadAcl(String.format("connect-configs-%s", serviceName), getPrincipal()));
40-
acls.add(generateReadAcl(String.format("connect-offsets-%s", serviceName), getPrincipal()));
41-
acls.add(generateReadAcl(String.format("connect-status-%s", serviceName), getPrincipal()));
42-
acls.add(generateWriteACL(String.format("connect-configs-%s", serviceName), getPrincipal()));
43-
acls.add(generateWriteACL(String.format("connect-offsets-%s", serviceName), getPrincipal()));
44-
acls.add(generateWriteACL(String.format("connect-status-%s", serviceName), getPrincipal()));
46+
acls.add(generateReadAcl(configTopic, getPrincipal()));
47+
acls.add(generateReadAcl(offsetTopic, getPrincipal()));
48+
acls.add(generateReadAcl(statusTopic, getPrincipal()));
49+
acls.add(generateWriteACL(configTopic, getPrincipal()));
50+
acls.add(generateWriteACL(offsetTopic, getPrincipal()));
51+
acls.add(generateWriteACL(statusTopic, getPrincipal()));
4552
acls.add(generateConsumerGroupAcl(groupId, getPrincipal(), "READ"));
4653
getConnectors().forEach((connectorName, connector) -> acls.addAll(connector.getAcls(connectorName, getPrincipal())));
4754
return acls;
4855
}
4956

57+
private String getConfigTopic(String serviceName) {
58+
if (getStorageTopics().isPresent() && getStorageTopics().get().getConfig().isPresent()) {
59+
return getStorageTopics().get().getConfig().get();
60+
}
61+
return String.format("connect-configs-%s", serviceName);
62+
}
63+
64+
private String getOffsetTopic(String serviceName) {
65+
if (getStorageTopics().isPresent() && getStorageTopics().get().getOffset().isPresent()) {
66+
return getStorageTopics().get().getOffset().get();
67+
}
68+
return String.format("connect-offsets-%s", serviceName);
69+
}
70+
71+
private String getStatusTopic(String serviceName) {
72+
if (getStorageTopics().isPresent() && getStorageTopics().get().getStatus().isPresent()) {
73+
return getStorageTopics().get().getStatus().get();
74+
}
75+
return String.format("connect-status-%s", serviceName);
76+
}
77+
5078
public static class Builder extends KafkaConnectService_Builder {
5179

5280
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package com.devshawn.kafka.gitops.domain.state.service;
2+
3+
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
4+
import org.inferred.freebuilder.FreeBuilder;
5+
6+
import java.util.Optional;
7+
8+
@FreeBuilder
9+
@JsonDeserialize(builder = KafkaConnectStorageTopics.Builder.class)
10+
public interface KafkaConnectStorageTopics {
11+
12+
Optional<String> getConfig();
13+
14+
Optional<String> getOffset();
15+
16+
Optional<String> getStatus();
17+
18+
class Builder extends KafkaConnectStorageTopics_Builder {
19+
}
20+
}

src/test/groovy/com/devshawn/kafka/gitops/ApplyCommandIntegrationSpec.groovy

+3-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,9 @@ class ApplyCommandIntegrationSpec extends Specification {
5656
"custom-user-acls",
5757
"custom-group-id-application",
5858
"custom-group-id-connect",
59-
"custom-application-id-streams"
59+
"custom-application-id-streams",
60+
"custom-storage-topic",
61+
"custom-storage-topics"
6062
]
6163
}
6264

src/test/groovy/com/devshawn/kafka/gitops/PlanCommandIntegrationSpec.groovy

+5-2
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ class PlanCommandIntegrationSpec extends Specification {
6161
"custom-user-acls",
6262
"custom-group-id-application",
6363
"custom-group-id-connect",
64-
"custom-application-id-streams"
64+
"custom-application-id-streams",
65+
"custom-storage-topic",
66+
"custom-storage-topics"
6567
]
6668
}
6769

@@ -130,7 +132,8 @@ class PlanCommandIntegrationSpec extends Specification {
130132
"invalid-topic",
131133
"unrecognized-property",
132134
"invalid-format",
133-
"invalid-missing-user-principal"
135+
"invalid-missing-user-principal",
136+
"invalid-storage-topics"
134137
]
135138
}
136139

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
Executing apply...
2+
3+
Applying: [CREATE]
4+
5+
+ [ACL] test-connect-cluster-0
6+
+ resource_name: test-connect-cluster-configs
7+
+ resource_type: TOPIC
8+
+ resource_pattern: LITERAL
9+
+ resource_principal: User:test
10+
+ host: *
11+
+ operation: READ
12+
+ permission: ALLOW
13+
14+
15+
Successfully applied.
16+
17+
Applying: [CREATE]
18+
19+
+ [ACL] test-connect-cluster-1
20+
+ resource_name: connect-offsets-test-connect-cluster
21+
+ resource_type: TOPIC
22+
+ resource_pattern: LITERAL
23+
+ resource_principal: User:test
24+
+ host: *
25+
+ operation: READ
26+
+ permission: ALLOW
27+
28+
29+
Successfully applied.
30+
31+
Applying: [CREATE]
32+
33+
+ [ACL] test-connect-cluster-2
34+
+ resource_name: connect-status-test-connect-cluster
35+
+ resource_type: TOPIC
36+
+ resource_pattern: LITERAL
37+
+ resource_principal: User:test
38+
+ host: *
39+
+ operation: READ
40+
+ permission: ALLOW
41+
42+
43+
Successfully applied.
44+
45+
Applying: [CREATE]
46+
47+
+ [ACL] test-connect-cluster-3
48+
+ resource_name: test-connect-cluster-configs
49+
+ resource_type: TOPIC
50+
+ resource_pattern: LITERAL
51+
+ resource_principal: User:test
52+
+ host: *
53+
+ operation: WRITE
54+
+ permission: ALLOW
55+
56+
57+
Successfully applied.
58+
59+
Applying: [CREATE]
60+
61+
+ [ACL] test-connect-cluster-4
62+
+ resource_name: connect-offsets-test-connect-cluster
63+
+ resource_type: TOPIC
64+
+ resource_pattern: LITERAL
65+
+ resource_principal: User:test
66+
+ host: *
67+
+ operation: WRITE
68+
+ permission: ALLOW
69+
70+
71+
Successfully applied.
72+
73+
Applying: [CREATE]
74+
75+
+ [ACL] test-connect-cluster-5
76+
+ resource_name: connect-status-test-connect-cluster
77+
+ resource_type: TOPIC
78+
+ resource_pattern: LITERAL
79+
+ resource_principal: User:test
80+
+ host: *
81+
+ operation: WRITE
82+
+ permission: ALLOW
83+
84+
85+
Successfully applied.
86+
87+
Applying: [CREATE]
88+
89+
+ [ACL] test-connect-cluster-6
90+
+ resource_name: test-connect-cluster
91+
+ resource_type: GROUP
92+
+ resource_pattern: LITERAL
93+
+ resource_principal: User:test
94+
+ host: *
95+
+ operation: READ
96+
+ permission: ALLOW
97+
98+
99+
Successfully applied.
100+
101+
Applying: [CREATE]
102+
103+
+ [ACL] test-connect-cluster-7
104+
+ resource_name: production-topic
105+
+ resource_type: TOPIC
106+
+ resource_pattern: LITERAL
107+
+ resource_principal: User:test
108+
+ host: *
109+
+ operation: WRITE
110+
+ permission: ALLOW
111+
112+
113+
Successfully applied.
114+
115+
Applying: [CREATE]
116+
117+
+ [ACL] test-connect-cluster-8
118+
+ resource_name: consumption-topic
119+
+ resource_type: TOPIC
120+
+ resource_pattern: LITERAL
121+
+ resource_principal: User:test
122+
+ host: *
123+
+ operation: READ
124+
+ permission: ALLOW
125+
126+
127+
Successfully applied.
128+
129+
Applying: [CREATE]
130+
131+
+ [ACL] test-connect-cluster-9
132+
+ resource_name: connect-test-sink
133+
+ resource_type: GROUP
134+
+ resource_pattern: LITERAL
135+
+ resource_principal: User:test
136+
+ host: *
137+
+ operation: READ
138+
+ permission: ALLOW
139+
140+
141+
Successfully applied.
142+
143+
[SUCCESS] Apply complete! Resources: 10 created, 0 updated, 0 deleted.

0 commit comments

Comments
 (0)