Skip to content

Commit 7719b5f

Browse files
authored
KAFKA-18644: improve generic type names for internal FK-join classes (apache#18700)
Reviewers: Lucas Brutschy <[email protected]>
1 parent 9f78771 commit 7719b5f

13 files changed

+183
-181
lines changed

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKey.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,26 +14,25 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
1817
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
1918

2019
import java.util.Objects;
2120

22-
public class CombinedKey<KF, KP> {
23-
private final KF foreignKey;
24-
private final KP primaryKey;
21+
public class CombinedKey<KRight, KLeft> {
22+
private final KRight foreignKey;
23+
private final KLeft primaryKey;
2524

26-
CombinedKey(final KF foreignKey, final KP primaryKey) {
25+
CombinedKey(final KRight foreignKey, final KLeft primaryKey) {
2726
Objects.requireNonNull(primaryKey, "primaryKey can't be null");
2827
this.foreignKey = foreignKey;
2928
this.primaryKey = primaryKey;
3029
}
3130

32-
public KF foreignKey() {
31+
public KRight foreignKey() {
3332
return foreignKey;
3433
}
3534

36-
public KP primaryKey() {
35+
public KLeft primaryKey() {
3736
return primaryKey;
3837
}
3938

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/CombinedKeySchema.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -28,20 +28,20 @@
2828
/**
2929
* Factory for creating CombinedKey serializers / deserializers.
3030
*/
31-
public class CombinedKeySchema<KO, K> {
31+
public class CombinedKeySchema<KRight, KLeft> {
3232
private final Supplier<String> undecoratedPrimaryKeySerdeTopicSupplier;
3333
private final Supplier<String> undecoratedForeignKeySerdeTopicSupplier;
3434
private String primaryKeySerdeTopic;
3535
private String foreignKeySerdeTopic;
36-
private Serializer<K> primaryKeySerializer;
37-
private Deserializer<K> primaryKeyDeserializer;
38-
private Serializer<KO> foreignKeySerializer;
39-
private Deserializer<KO> foreignKeyDeserializer;
36+
private Serializer<KLeft> primaryKeySerializer;
37+
private Deserializer<KLeft> primaryKeyDeserializer;
38+
private Serializer<KRight> foreignKeySerializer;
39+
private Deserializer<KRight> foreignKeyDeserializer;
4040

4141
public CombinedKeySchema(final Supplier<String> foreignKeySerdeTopicSupplier,
42-
final Serde<KO> foreignKeySerde,
42+
final Serde<KRight> foreignKeySerde,
4343
final Supplier<String> primaryKeySerdeTopicSupplier,
44-
final Serde<K> primaryKeySerde) {
44+
final Serde<KLeft> primaryKeySerde) {
4545
undecoratedPrimaryKeySerdeTopicSupplier = primaryKeySerdeTopicSupplier;
4646
undecoratedForeignKeySerdeTopicSupplier = foreignKeySerdeTopicSupplier;
4747
primaryKeySerializer = primaryKeySerde == null ? null : primaryKeySerde.serializer();
@@ -50,17 +50,17 @@ public CombinedKeySchema(final Supplier<String> foreignKeySerdeTopicSupplier,
5050
foreignKeySerializer = foreignKeySerde == null ? null : foreignKeySerde.serializer();
5151
}
5252

53-
@SuppressWarnings("unchecked")
53+
@SuppressWarnings({"unchecked", "resource"})
5454
public void init(final ProcessorContext<?, ?> context) {
5555
primaryKeySerdeTopic = undecoratedPrimaryKeySerdeTopicSupplier.get();
5656
foreignKeySerdeTopic = undecoratedForeignKeySerdeTopicSupplier.get();
57-
primaryKeySerializer = primaryKeySerializer == null ? (Serializer<K>) context.keySerde().serializer() : primaryKeySerializer;
58-
primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<K>) context.keySerde().deserializer() : primaryKeyDeserializer;
59-
foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KO>) context.keySerde().serializer() : foreignKeySerializer;
60-
foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer<KO>) context.keySerde().deserializer() : foreignKeyDeserializer;
57+
primaryKeySerializer = primaryKeySerializer == null ? (Serializer<KLeft>) context.keySerde().serializer() : primaryKeySerializer;
58+
primaryKeyDeserializer = primaryKeyDeserializer == null ? (Deserializer<KLeft>) context.keySerde().deserializer() : primaryKeyDeserializer;
59+
foreignKeySerializer = foreignKeySerializer == null ? (Serializer<KRight>) context.keySerde().serializer() : foreignKeySerializer;
60+
foreignKeyDeserializer = foreignKeyDeserializer == null ? (Deserializer<KRight>) context.keySerde().deserializer() : foreignKeyDeserializer;
6161
}
6262

63-
Bytes toBytes(final KO foreignKey, final K primaryKey) {
63+
Bytes toBytes(final KRight foreignKey, final KLeft primaryKey) {
6464
//The serialization format - note that primaryKeySerialized may be null, such as when a prefixScan
6565
//key is being created.
6666
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
@@ -79,22 +79,22 @@ Bytes toBytes(final KO foreignKey, final K primaryKey) {
7979
}
8080

8181

82-
public CombinedKey<KO, K> fromBytes(final Bytes data) {
82+
public CombinedKey<KRight, KLeft> fromBytes(final Bytes data) {
8383
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
8484
final byte[] dataArray = data.get();
8585
final ByteBuffer dataBuffer = ByteBuffer.wrap(dataArray);
8686
final int foreignKeyLength = dataBuffer.getInt();
8787
final byte[] foreignKeyRaw = new byte[foreignKeyLength];
8888
dataBuffer.get(foreignKeyRaw, 0, foreignKeyLength);
89-
final KO foreignKey = foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw);
89+
final KRight foreignKey = foreignKeyDeserializer.deserialize(foreignKeySerdeTopic, foreignKeyRaw);
9090

9191
final byte[] primaryKeyRaw = new byte[dataArray.length - foreignKeyLength - Integer.BYTES];
9292
dataBuffer.get(primaryKeyRaw, 0, primaryKeyRaw.length);
93-
final K primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw);
93+
final KLeft primaryKey = primaryKeyDeserializer.deserialize(primaryKeySerdeTopic, primaryKeyRaw);
9494
return new CombinedKey<>(foreignKey, primaryKey);
9595
}
9696

97-
Bytes prefixBytes(final KO key) {
97+
Bytes prefixBytes(final KRight key) {
9898
//The serialization format. Note that primaryKeySerialized is not required/used in this function.
9999
//{Integer.BYTES foreignKeyLength}{foreignKeySerialized}{Optional-primaryKeySerialized}
100100

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignKeyExtractor.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,19 +30,19 @@
3030
* <li>{@link #fromBiFunction(BiFunction)} - when the foreign key depends on both key and value</li>
3131
* </ul>
3232
*
33-
* @param <K> Type of primary table's key
34-
* @param <V> Type of primary table's value
35-
* @param <KO> Type of the foreign key to extract
33+
* @param <KLeft> Type of primary table's key
34+
* @param <VLeft> Type of primary table's value
35+
* @param <KRight> Type of the foreign key to extract
3636
*/
3737
@FunctionalInterface
38-
public interface ForeignKeyExtractor<K, V, KO> {
39-
KO extract(K key, V value);
38+
public interface ForeignKeyExtractor<KLeft, VLeft, KRight> {
39+
KRight extract(KLeft key, VLeft value);
4040

41-
static <K, V, KO> ForeignKeyExtractor<? super K, ? super V, ? extends KO> fromFunction(Function<? super V, ? extends KO> function) {
41+
static <KLeft, VLeft, KRight> ForeignKeyExtractor<? super KLeft, ? super VLeft, ? extends KRight> fromFunction(Function<? super VLeft, ? extends KRight> function) {
4242
return (key, value) -> function.apply(value);
4343
}
4444

45-
static <K, V, KO> ForeignKeyExtractor<? super K, ? super V, ? extends KO> fromBiFunction(BiFunction<? super K, ? super V, ? extends KO> biFunction) {
45+
static <KLeft, VLeft, KRight> ForeignKeyExtractor<? super KLeft, ? super VLeft, ? extends KRight> fromBiFunction(BiFunction<? super KLeft, ? super VLeft, ? extends KRight> biFunction) {
4646
return biFunction::apply;
4747
}
4848
}

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
1817
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
1918

2019
import org.apache.kafka.common.metrics.Sensor;
@@ -43,15 +42,16 @@
4342
import java.util.Collections;
4443
import java.util.Set;
4544

46-
public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
47-
ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
45+
public class ForeignTableJoinProcessorSupplier<KLeft, KRight, VRight>
46+
implements ProcessorSupplier<KRight, Change<VRight>, KLeft, SubscriptionResponseWrapper<VRight>> {
47+
4848
private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class);
4949
private final StoreFactory subscriptionStoreFactory;
50-
private final CombinedKeySchema<KO, K> keySchema;
50+
private final CombinedKeySchema<KRight, KLeft> keySchema;
5151
private boolean useVersionedSemantics = false;
5252

5353
public ForeignTableJoinProcessorSupplier(final StoreFactory subscriptionStoreFactory,
54-
final CombinedKeySchema<KO, K> keySchema) {
54+
final CombinedKeySchema<KRight, KLeft> keySchema) {
5555
this.subscriptionStoreFactory = subscriptionStoreFactory;
5656
this.keySchema = keySchema;
5757
}
@@ -62,7 +62,7 @@ public Set<StoreBuilder<?>> stores() {
6262
}
6363

6464
@Override
65-
public Processor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> get() {
65+
public Processor<KRight, Change<VRight>, KLeft, SubscriptionResponseWrapper<VRight>> get() {
6666
return new KTableKTableJoinProcessor();
6767
}
6868

@@ -75,12 +75,12 @@ public boolean isUseVersionedSemantics() {
7575
return useVersionedSemantics;
7676
}
7777

78-
private final class KTableKTableJoinProcessor extends ContextualProcessor<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
78+
private final class KTableKTableJoinProcessor extends ContextualProcessor<KRight, Change<VRight>, KLeft, SubscriptionResponseWrapper<VRight>> {
7979
private Sensor droppedRecordsSensor;
80-
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>> subscriptionStore;
80+
private TimestampedKeyValueStore<Bytes, SubscriptionWrapper<KLeft>> subscriptionStore;
8181

8282
@Override
83-
public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) {
83+
public void init(final ProcessorContext<KLeft, SubscriptionResponseWrapper<VRight>> context) {
8484
super.init(context);
8585
final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
8686
droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
@@ -92,7 +92,7 @@ public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> cont
9292
}
9393

9494
@Override
95-
public void process(final Record<KO, Change<VO>> record) {
95+
public void process(final Record<KRight, Change<VRight>> record) {
9696
// if the key is null, we do not need to proceed aggregating
9797
// the record with the table
9898
if (record.key() == null) {
@@ -122,14 +122,14 @@ public void process(final Record<KO, Change<VO>> record) {
122122
final Bytes prefixBytes = keySchema.prefixBytes(record.key());
123123

124124
//Perform the prefixScan and propagate the results
125-
try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> prefixScanResults =
125+
try (final KeyValueIterator<Bytes, ValueAndTimestamp<SubscriptionWrapper<KLeft>>> prefixScanResults =
126126
subscriptionStore.range(prefixBytes, Bytes.increment(prefixBytes))) {
127127

128128
while (prefixScanResults.hasNext()) {
129-
final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<K>>> next = prefixScanResults.next();
129+
final KeyValue<Bytes, ValueAndTimestamp<SubscriptionWrapper<KLeft>>> next = prefixScanResults.next();
130130
// have to check the prefix because the range end is inclusive :(
131131
if (prefixEquals(next.key.get(), prefixBytes.get())) {
132-
final CombinedKey<KO, K> combinedKey = keySchema.fromBytes(next.key);
132+
final CombinedKey<KRight, KLeft> combinedKey = keySchema.fromBytes(next.key);
133133
context().forward(
134134
record.withKey(combinedKey.primaryKey())
135135
.withValue(new SubscriptionResponseWrapper<>(

streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
1817
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
1918

2019
import org.apache.kafka.common.errors.UnsupportedVersionException;
@@ -43,23 +42,25 @@
4342
* of the primary key. This eliminates race-condition results for rapidly-changing foreign-keys for a given primary key.
4443
* Applies the join and emits nulls according to LEFT/INNER rules.
4544
*
46-
* @param <K> Type of primary keys
47-
* @param <V> Type of primary values
48-
* @param <VO> Type of foreign values
49-
* @param <VR> Type of joined result of primary and foreign values
45+
* @param <KLeft> Type of primary keys
46+
* @param <VLeft> Type of primary values
47+
* @param <VRight> Type of foreign values
48+
* @param <VOut> Type of joined result of primary and foreign values
5049
*/
51-
public class ResponseJoinProcessorSupplier<K, V, VO, VR> implements ProcessorSupplier<K, SubscriptionResponseWrapper<VO>, K, VR> {
50+
public class ResponseJoinProcessorSupplier<KLeft, VLeft, VRight, VOut>
51+
implements ProcessorSupplier<KLeft, SubscriptionResponseWrapper<VRight>, KLeft, VOut> {
52+
5253
private static final Logger LOG = LoggerFactory.getLogger(ResponseJoinProcessorSupplier.class);
53-
private final KTableValueGetterSupplier<K, V> valueGetterSupplier;
54-
private final Serializer<V> constructionTimeValueSerializer;
54+
private final KTableValueGetterSupplier<KLeft, VLeft> valueGetterSupplier;
55+
private final Serializer<VLeft> constructionTimeValueSerializer;
5556
private final Supplier<String> valueHashSerdePseudoTopicSupplier;
56-
private final ValueJoiner<? super V, ? super VO, ? extends VR> joiner;
57+
private final ValueJoiner<? super VLeft, ? super VRight, ? extends VOut> joiner;
5758
private final boolean leftJoin;
5859

59-
public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> valueGetterSupplier,
60-
final Serializer<V> valueSerializer,
60+
public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<KLeft, VLeft> valueGetterSupplier,
61+
final Serializer<VLeft> valueSerializer,
6162
final Supplier<String> valueHashSerdePseudoTopicSupplier,
62-
final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
63+
final ValueJoiner<? super VLeft, ? super VRight, ? extends VOut> joiner,
6364
final boolean leftJoin) {
6465
this.valueGetterSupplier = valueGetterSupplier;
6566
constructionTimeValueSerializer = valueSerializer;
@@ -69,24 +70,24 @@ public ResponseJoinProcessorSupplier(final KTableValueGetterSupplier<K, V> value
6970
}
7071

7172
@Override
72-
public Processor<K, SubscriptionResponseWrapper<VO>, K, VR> get() {
73+
public Processor<KLeft, SubscriptionResponseWrapper<VRight>, KLeft, VOut> get() {
7374
return new ContextualProcessor<>() {
7475
private String valueHashSerdePseudoTopic;
75-
private Serializer<V> runtimeValueSerializer = constructionTimeValueSerializer;
76+
private Serializer<VLeft> runtimeValueSerializer = constructionTimeValueSerializer;
7677

77-
private KTableValueGetter<K, V> valueGetter;
78+
private KTableValueGetter<KLeft, VLeft> valueGetter;
7879
private Sensor droppedRecordsSensor;
7980

8081

8182
@SuppressWarnings({"unchecked", "resource"})
8283
@Override
83-
public void init(final ProcessorContext<K, VR> context) {
84+
public void init(final ProcessorContext<KLeft, VOut> context) {
8485
super.init(context);
8586
valueHashSerdePseudoTopic = valueHashSerdePseudoTopicSupplier.get();
8687
valueGetter = valueGetterSupplier.get();
8788
valueGetter.init(context);
8889
if (runtimeValueSerializer == null) {
89-
runtimeValueSerializer = (Serializer<V>) context.valueSerde().serializer();
90+
runtimeValueSerializer = (Serializer<VLeft>) context.valueSerde().serializer();
9091
}
9192

9293
final InternalProcessorContext<?, ?> internalProcessorContext = (InternalProcessorContext<?, ?>) context;
@@ -98,14 +99,14 @@ public void init(final ProcessorContext<K, VR> context) {
9899
}
99100

100101
@Override
101-
public void process(final Record<K, SubscriptionResponseWrapper<VO>> record) {
102+
public void process(final Record<KLeft, SubscriptionResponseWrapper<VRight>> record) {
102103
if (record.value().version() != SubscriptionResponseWrapper.CURRENT_VERSION) {
103104
//Guard against modifications to SubscriptionResponseWrapper. Need to ensure that there is
104105
//compatibility with previous versions to enable rolling upgrades. Must develop a strategy for
105106
//upgrading from older SubscriptionWrapper versions to newer versions.
106107
throw new UnsupportedVersionException("SubscriptionResponseWrapper is of an incompatible version.");
107108
}
108-
final ValueAndTimestamp<V> currentValueWithTimestamp = valueGetter.get(record.key());
109+
final ValueAndTimestamp<VLeft> currentValueWithTimestamp = valueGetter.get(record.key());
109110

110111
final long[] currentHash = currentValueWithTimestamp == null ?
111112
null :
@@ -115,7 +116,7 @@ public void process(final Record<K, SubscriptionResponseWrapper<VO>> record) {
115116

116117
//If this value doesn't match the current value from the original table, it is stale and should be discarded.
117118
if (java.util.Arrays.equals(messageHash, currentHash)) {
118-
final VR result;
119+
final VOut result;
119120

120121
if (record.value().foreignValue() == null && (!leftJoin || currentValueWithTimestamp == null)) {
121122
result = null; //Emit tombstone

0 commit comments

Comments
 (0)