
Commit 26fbf50

committed: impl
1 parent f20f299 · commit 26fbf50

36 files changed (+28 −85 lines)


checkstyle/checkstyle.xml

Lines changed: 1 addition & 0 deletions
@@ -43,6 +43,7 @@
        <property name="tokens" value="IDENT, NUM_DOUBLE, LAMBDA, TEXT_BLOCK_LITERAL_BEGIN, UNARY_MINUS, UNARY_PLUS, INC, DEC, POST_INC, POST_DEC" />
    </module>
    <module name="SimplifyBooleanReturn"/>
+   <module name="UnusedLocalVariable"/>

    <!-- style -->
    <module name="DefaultComesLast"/>
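
The new module enables Checkstyle's UnusedLocalVariable check, which flags local variables that are written but never read. Most fixes below simply drop the dead local while keeping the call it was assigned from, since the call is still needed for its side effect. A minimal sketch of that before/after shape (names here are illustrative, not from the Kafka sources):

    import java.util.concurrent.CompletableFuture;

    public class UnusedLocalExample {
        // Before: 'result' is assigned but never read, so the new check flags it.
        static void before(CompletableFuture<String> future) throws Exception {
            String result = future.get();
        }

        // After: the blocking call is kept for its side effect; the dead local is gone.
        static void after(CompletableFuture<String> future) throws Exception {
            future.get();
        }
    }

The @SuppressWarnings("UnusedLocalVariable") escapes used in the files below imply the build also wires up Checkstyle's SuppressWarningsHolder and SuppressWarningsFilter, which are what let a Java annotation silence a Checkstyle rule.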

clients/src/main/java/org/apache/kafka/common/requests/DescribeUserScramCredentialsRequest.java

Lines changed: 2 additions & 1 deletion
@@ -68,7 +68,8 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
            .setThrottleTimeMs(throttleTimeMs)
            .setErrorCode(apiError.error().code())
            .setErrorMessage(apiError.message());
-       for (DescribeUserScramCredentialsRequestData.UserName user : data.users()) {
+
+       for (@SuppressWarnings("UnusedLocalVariable") DescribeUserScramCredentialsRequestData.UserName ignored : data.users()) {
            response.results().add(new DescribeUserScramCredentialsResponseData.DescribeUserScramCredentialsResult()
                .setErrorCode(apiError.error().code())
                .setErrorMessage(apiError.message()));
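
Where a loop body never touches the element, as here, where one error result is added per requested user, the fix is a rename to `ignored` plus a suppression on the loop variable, because the iteration count is the point. A self-contained sketch of the pattern (types simplified, not the real request classes):

    import java.util.List;

    public class LoopCountSketch {
        // One placeholder result per input element; the element value itself is never used.
        static int errorResultsFor(List<String> users) {
            int results = 0;
            for (@SuppressWarnings("UnusedLocalVariable") String ignored : users) {
                results++;
            }
            return results;
        }
    }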

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java

Lines changed: 1 addition & 1 deletion
@@ -445,7 +445,7 @@ public void testHeartbeatRequestFailureNotifiedToGroupManagerAfterErrorPropagate
        time.sleep(DEFAULT_HEARTBEAT_INTERVAL_MS);
        NetworkClientDelegate.PollResult result = heartbeatRequestManager.poll(time.milliseconds());
        assertEquals(1, result.unsentRequests.size());
-       ClientResponse response = createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_AUTHORIZATION_FAILED);
+       createHeartbeatResponse(result.unsentRequests.get(0), Errors.GROUP_AUTHORIZATION_FAILED);
        result.unsentRequests.get(0).handler().onFailure(time.milliseconds(), new AuthenticationException("Fatal error in HB"));

        // The error should be propagated before notifying the group manager. This ensures that the app thread is aware

clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java

Lines changed: 1 addition & 1 deletion
@@ -2881,7 +2881,7 @@ private void verifySessionPartitions() {
                field.setAccessible(true);
                LinkedHashMap<?, ?> sessionPartitions =
                        (LinkedHashMap<?, ?>) field.get(handler);
-               for (Map.Entry<?, ?> entry : sessionPartitions.entrySet()) {
+               for (@SuppressWarnings("UnusedLocalVariable") Map.Entry<?, ?> ignored : sessionPartitions.entrySet()) {
                    // If `sessionPartitions` are modified on another thread, Thread.yield will increase the
                    // possibility of ConcurrentModificationException if appropriate synchronization is not used.
                    Thread.yield();

clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java

Lines changed: 1 addition & 1 deletion
@@ -397,7 +397,7 @@ public void testStressfulSituation() throws Exception {
                List<ProducerBatch> batches = accum.drain(metadataCache, nodes, 5 * 1024, 0).get(node1.id());
                if (batches != null) {
                    for (ProducerBatch batch : batches) {
-                       for (Record record : batch.records().records())
+                       for (@SuppressWarnings("UnusedLocalVariable") Record ignored : batch.records().records())
                            read++;
                        accum.deallocate(batch);
                    }

clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java

Lines changed: 1 addition & 1 deletion
@@ -1674,7 +1674,7 @@ public void testForceCloseWithProducerIdReset() throws Exception {
        Sender sender = new Sender(logContext, client, metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 10,
            senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, apiVersions);

-       Future<RecordMetadata> failedResponse = appendToAccumulator(tp0);
+       appendToAccumulator(tp0);
        Future<RecordMetadata> successfulResponse = appendToAccumulator(tp1);
        sender.runOnce(); // connect and send.

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

Lines changed: 0 additions & 5 deletions
@@ -527,11 +527,6 @@ public void produceRequestGetErrorResponseTest() {
    public void fetchResponseVersionTest() {
        Uuid id = Uuid.randomUuid();
        MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
-       FetchResponseData.PartitionData partitionData = new FetchResponseData.PartitionData()
-           .setPartitionIndex(0)
-           .setHighWatermark(1000000)
-           .setLogStartOffset(-1)
-           .setRecords(records);
        LinkedHashMap<TopicIdPartition, FetchResponseData.PartitionData> idResponseData = new LinkedHashMap<>();
        idResponseData.put(new TopicIdPartition(id, new TopicPartition("test", 0)),
            new FetchResponseData.PartitionData()

connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorSourceTaskTest.java

Lines changed: 0 additions & 3 deletions
@@ -20,7 +20,6 @@
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.header.Header;

@@ -283,8 +282,6 @@ public void testCommitRecordWithNullMetadata() {

        @SuppressWarnings("unchecked")
        KafkaConsumer<byte[], byte[]> consumer = mock(KafkaConsumer.class);
-       @SuppressWarnings("unchecked")
-       KafkaProducer<byte[], byte[]> producer = mock(KafkaProducer.class);
        MirrorSourceMetrics metrics = mock(MirrorSourceMetrics.class);

        String sourceClusterName = "cluster1";

connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java

Lines changed: 1 addition & 1 deletion
@@ -60,7 +60,7 @@ public void start(Map<String, String> props) {
    @Override
    public void put(Collection<SinkRecord> records) {
        super.put(records);
-       for (SinkRecord ignore : records) {
+       for (@SuppressWarnings("UnusedLocalVariable") SinkRecord ignore : records) {
            count++;
        }
    }

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractWorkerSourceTaskTest.java

Lines changed: 0 additions & 3 deletions
@@ -751,9 +751,6 @@ public void testSendRecordsFailedTransformationErrorToleranceAll() {

        expectConvertHeadersAndKeyValue(emptyHeaders(), TOPIC);

-       TopicPartitionInfo topicPartitionInfo = new TopicPartitionInfo(0, null, Collections.emptyList(), Collections.emptyList());
-       TopicDescription topicDesc = new TopicDescription(TOPIC, false, Collections.singletonList(topicPartitionInfo));
-
        workerTask.toSend = Arrays.asList(record1);

        // The transformation errored out so the error should be ignored & the record skipped with error tolerance all

connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConnectorTest.java

Lines changed: 1 addition & 0 deletions
@@ -621,6 +621,7 @@ protected void assertInitializedMetric(WorkerConnector workerConnector, String e
        String type = metrics.currentMetricValueAsString(metricGroup, "connector-type");
        String clazz = metrics.currentMetricValueAsString(metricGroup, "connector-class");
        String version = metrics.currentMetricValueAsString(metricGroup, "connector-version");
+       assertEquals("unassigned", status);
        assertEquals(expectedType, type);
        assertNotNull(clazz);
        assertEquals(VERSION, version);

core/src/main/java/kafka/log/remote/RemoteLogManager.java

Lines changed: 1 addition & 0 deletions
@@ -960,6 +960,7 @@ public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException
                // back to the caller. It's important to note that the task being executed is already
                // cancelled before the executing thread is interrupted. The caller is responsible
                // for handling the exception gracefully by checking if the task is already cancelled.
+               @SuppressWarnings("UnusedLocalVariable")
                boolean ignored = copyQuotaManagerLockCondition.await(quotaTimeout().toMillis(), TimeUnit.MILLISECONDS);
                throttleTimeMs = rlmCopyQuotaManager.getThrottleTimeMs();
            }
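
Condition.await(long, TimeUnit) returns false when the waiting time elapsed; this code path re-reads the throttle state afterwards either way, so the result is deliberately discarded into a local named `ignored`, and the annotation keeps the new check quiet about it. A minimal standalone sketch of the idiom, assuming nothing about the surrounding quota logic:

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class AwaitSketch {
        private final Lock lock = new ReentrantLock();
        private final Condition ready = lock.newCondition();

        void waitBriefly() throws InterruptedException {
            lock.lock();
            try {
                // await(...) returns false on timeout; the caller re-checks state
                // afterwards either way, so the result is dropped on purpose.
                @SuppressWarnings("UnusedLocalVariable")
                boolean ignored = ready.await(100, TimeUnit.MILLISECONDS);
            } finally {
                lock.unlock();
            }
        }
    }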

generator/src/main/java/org/apache/kafka/message/checker/EvolutionVerifier.java

Lines changed: 1 addition & 1 deletion
@@ -74,7 +74,7 @@ static void verifyVersionsMatchTopLevelMessage(
            verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field);
        }
        for (StructSpec struct : topLevelMessage.commonStructs()) {
-           for (FieldSpec field : topLevelMessage.fields()) {
+           for (FieldSpec field : struct.fields()) {
                verifyVersionsMatchTopLevelMessage(what, topLevelMessage, field);
            }
        }
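
This is the one change in the commit that alters behavior rather than deleting dead code: the inner loop iterated topLevelMessage.fields() again, which both left `struct` unused and skipped verification of the common structs' own fields. An unused loop variable is often the only visible symptom of this copy-paste bug, which is exactly what the new check surfaces. A distilled sketch (simplified types, not the generator's real API):

    import java.util.List;

    public class NestedLoopBugSketch {
        static class Group {
            final List<String> members;
            Group(List<String> members) { this.members = members; }
        }

        // Buggy shape: 'group' is never read, so every pass re-checks the outer list.
        static void verifyBuggy(List<String> topLevel, List<Group> groups) {
            for (Group group : groups) {
                for (String member : topLevel) { // should have been group.members
                    check(member);
                }
            }
        }

        // Fixed shape: each group's own members are checked.
        static void verifyFixed(List<Group> groups) {
            for (Group group : groups) {
                for (String member : group.members) {
                    check(member);
                }
            }
        }

        static void check(String member) { /* stand-in for the real verifier */ }
    }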

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java

Lines changed: 1 addition & 1 deletion
@@ -2966,7 +2966,7 @@ public void testOffsetCommitsSensor() {
        assertEquals(1, group.generationId());
        group.transitionTo(ClassicGroupState.STABLE);

-       CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(
+       context.commitOffset(
            new OffsetCommitRequestData()
                .setGroupId("foo")
                .setMemberId("member")

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/modern/TopicMetadataTest.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@
2121

2222
import org.junit.jupiter.api.Test;
2323

24-
import java.util.Map;
25-
import java.util.Set;
26-
27-
import static org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
2824
import static org.junit.jupiter.api.Assertions.assertEquals;
2925
import static org.junit.jupiter.api.Assertions.assertNotEquals;
3026
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -49,7 +45,6 @@ public void testTopicIdAndNameCannotBeNull() {
4945
@Test
5046
public void testEquals() {
5147
Uuid topicId = Uuid.randomUuid();
52-
Map<Integer, Set<String>> partitionRacks = mkMapOfPartitionRacks(15);
5348
TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
5449

5550
assertEquals(new TopicMetadata(topicId, "foo", 15), topicMetadata);

metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsIntegrationTest.java

Lines changed: 1 addition & 1 deletion
@@ -198,7 +198,7 @@ public void testEventQueueOperationsStartedMetric() throws Throwable {
            build()
        ) {
            QuorumController active = controlEnv.activeController();
-           Map<Integer, Long> brokerEpochs = registerBrokersAndUnfence(active, 3);
+           registerBrokersAndUnfence(active, 3);

            // Test that a new operation increments operationsStarted. We retry this if needed
            // to handle the case where another operation is performed in between loading

metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java

Lines changed: 0 additions & 1 deletion
@@ -1535,7 +1535,6 @@ public void testDeleteTopicsWithMutationQuotaExceeded() {
            anonymousContextFor(ApiKeys.CREATE_TOPICS);
        ControllerResult<CreateTopicsResponseData> createResult =
            replicationControl.createTopics(createTopicsRequestContext, request, Collections.singleton("foo"));
-       CreateTopicsResponseData expectedResponse = new CreateTopicsResponseData();
        CreatableTopicResult createdTopic = createResult.response().topics().find("foo");
        assertEquals(NONE.code(), createdTopic.errorCode());
        ctx.replay(createResult.records());

raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientReconfigTest.java

Lines changed: 0 additions & 18 deletions
@@ -189,24 +189,6 @@ public void testLeaderDoesNotBootstrapRecordsWithKraftVersion0() throws Exceptio
            .withUnknownLeader(0)
            .build();

-       List<List<ControlRecord>> expectedBootstrapRecords = Arrays.asList(
-           Arrays.asList(
-               new ControlRecord(
-                   ControlRecordType.SNAPSHOT_HEADER,
-                   new SnapshotHeaderRecord()
-                       .setVersion((short) 0)
-                       .setLastContainedLogTimestamp(0)
-               )
-           ),
-           Arrays.asList(
-               new ControlRecord(
-                   ControlRecordType.SNAPSHOT_FOOTER,
-                   new SnapshotFooterRecord()
-                       .setVersion((short) 0)
-               )
-           )
-       );
-
        // check leader does not write bootstrap records to log
        context.unattachedToLeader();

server-common/src/test/java/org/apache/kafka/server/share/persister/PersisterStateManagerTest.java

Lines changed: 1 addition & 2 deletions
@@ -653,9 +653,8 @@ public void testFindCoordinatorSuccess() {

        CompletableFuture<ReadShareGroupStateResponse> resultFuture = handler.result();

-       ReadShareGroupStateResponse result = null;
        try {
-           result = resultFuture.get();
+           resultFuture.get();
        } catch (Exception e) {
            fail("Failed to get result from future", e);
        }

storage/src/main/java/org/apache/kafka/storage/internals/log/LogLoader.java

Lines changed: 2 additions & 0 deletions
@@ -169,6 +169,7 @@ public LoadedLogOffsets load() throws IOException {
                    long offset = LogFileUtils.offsetFromFile(file);
                    if (offset >= minSwapFileOffset && offset < maxSwapFileOffset) {
                        logger.info("Deleting segment files {} that is compacted but has not been deleted yet.", file.getName());
+                       @SuppressWarnings("UnusedLocalVariable")
                        boolean ignore = file.delete();
                    }
                }

@@ -186,6 +187,7 @@ public LoadedLogOffsets load() throws IOException {
            }
            if (file.getName().endsWith(LogFileUtils.SWAP_FILE_SUFFIX)) {
                logger.info("Recovering file {} by renaming from {} files.", file.getName(), LogFileUtils.SWAP_FILE_SUFFIX);
+               @SuppressWarnings("UnusedLocalVariable")
                boolean ignore = file.renameTo(new File(Utils.replaceSuffix(file.getPath(), LogFileUtils.SWAP_FILE_SUFFIX, "")));
            }
        }
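
File.delete() and File.renameTo() report failure through their boolean return value rather than by throwing, so the assignment to `ignore` is a conscious discard and the annotation keeps the new check from flagging it. A minimal sketch of the idiom; if the failure actually mattered, the java.nio.file.Files variants, which throw IOException instead, would be the usual alternative:

    import java.io.File;

    public class IgnoredFileResultSketch {
        // Best-effort cleanup: failure is acceptable, so the boolean
        // result of delete() is dropped on purpose.
        static void bestEffortDelete(File file) {
            @SuppressWarnings("UnusedLocalVariable")
            boolean ignore = file.delete();
        }
    }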

storage/src/test/java/org/apache/kafka/storage/internals/log/LogSegmentTest.java

Lines changed: 0 additions & 1 deletion
@@ -144,7 +144,6 @@ public void teardown() throws IOException {
    })
    public void testAppendForLogSegmentOffsetOverflowException(long baseOffset, long largestOffset) throws IOException {
        try (LogSegment seg = createSegment(baseOffset, 10, Time.SYSTEM)) {
-           long currentTime = Time.SYSTEM.milliseconds();
            MemoryRecords memoryRecords = v1Records(0, "hello");
            assertThrows(LogSegmentOffsetOverflowException.class, () -> seg.append(largestOffset, memoryRecords));
        }

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KTableEfficientRangeQueryTest.java

Lines changed: 1 addition & 3 deletions
@@ -26,7 +26,6 @@
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.TopologyTestDriver;
-import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueIterator;

@@ -117,8 +116,7 @@ public void testStoreConfig(final StoreType storeType, final boolean enableLoggi
        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> stateStoreConfig = getStoreConfig(storeType, TABLE_NAME, enableLogging, enableCaching);
        //Create topology: table from input topic
        final StreamsBuilder builder = new StreamsBuilder();
-       final KTable<String, String> table =
-           builder.table("input", stateStoreConfig);
+       builder.table("input", stateStoreConfig);
        final Topology topology = builder.build();

        try (final TopologyTestDriver driver = new TopologyTestDriver(topology)) {

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java

Lines changed: 1 addition & 1 deletion
@@ -212,7 +212,7 @@ public void testRegexRecordsAreProcessedAfterNewTopicCreatedWithMultipleSubtopol

        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
-       final KStream<String, String> otherStream = builder.stream(Pattern.compile("not-a-match"));
+       builder.stream(Pattern.compile("not-a-match"));

        pattern1Stream
            .selectKey((k, v) -> k)

streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java

Lines changed: 2 additions & 1 deletion
@@ -885,7 +885,8 @@ public Set<StreamTask> drainRestoredActiveTasks(final Duration timeout) {
        restoredActiveTasksLock.lock();
        try {
            while (restoredActiveTasks.isEmpty() && now <= deadline) {
-               final boolean elapsed = restoredActiveTasksCondition.await(deadline - now, TimeUnit.MILLISECONDS);
+               @SuppressWarnings("UnusedLocalVariable")
+               final boolean ignored = restoredActiveTasksCondition.await(deadline - now, TimeUnit.MILLISECONDS);
                now = time.milliseconds();
            }
            result.addAll(restoredActiveTasks);

streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java

Lines changed: 0 additions & 1 deletion
@@ -472,7 +472,6 @@ public void shouldMarkStreamStreamJoinAsSelfJoinSingleStream() {
        final GraphNode join = getNodeByType(builder.root, StreamStreamJoinNode.class, new HashSet<>());
        assertNotNull(join);
        assertTrue(((StreamStreamJoinNode) join).getSelfJoin());
-       final GraphNode parent = join.parentNodes().stream().findFirst().get();
        final AtomicInteger count = new AtomicInteger();
        countJoinWindowNodes(count, builder.root, new HashSet<>());
        assertEquals(count.get(), 1);

streams/src/test/java/org/apache/kafka/streams/kstream/internals/SuppressScenarioTest.java

Lines changed: 3 additions & 4 deletions
@@ -34,7 +34,6 @@
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KGroupedStream;
-import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;

@@ -816,7 +815,7 @@ public void shouldWorkWithCogroupedTimeWindows() {

        final KGroupedStream<String, String> stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        final KGroupedStream<String, String> stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
-       final KStream<Windowed<String>, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
+       stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
            .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(15)))
            .aggregate(() -> "", Named.as("test"), Materialized.as("store"))
            .suppress(Suppressed.untilWindowCloses(unbounded()))

@@ -829,7 +828,7 @@ public void shouldWorkWithCogroupedSlidingWindows() {

        final KGroupedStream<String, String> stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        final KGroupedStream<String, String> stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
-       final KStream<Windowed<String>, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
+       stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
            .windowedBy(SlidingWindows.ofTimeDifferenceWithNoGrace(Duration.ofMinutes(15)))
            .aggregate(() -> "", Named.as("test"), Materialized.as("store"))
            .suppress(Suppressed.untilWindowCloses(unbounded()))

@@ -842,7 +841,7 @@ public void shouldWorkWithCogroupedSessionWindows() {

        final KGroupedStream<String, String> stream1 = builder.stream("one", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
        final KGroupedStream<String, String> stream2 = builder.stream("two", Consumed.with(Serdes.String(), Serdes.String())).groupByKey(Grouped.with(Serdes.String(), Serdes.String()));
-       final KStream<Windowed<String>, Object> cogrouped = stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
+       stream1.cogroup((key, value, aggregate) -> aggregate + value).cogroup(stream2, (key, value, aggregate) -> aggregate + value)
            .windowedBy(SessionWindows.ofInactivityGapAndGrace(Duration.ofMinutes(15), Duration.ofMinutes(5)))
            .aggregate(() -> "", (k, v1, v2) -> "", Named.as("test"), Materialized.as("store"))
            .suppress(Suppressed.untilWindowCloses(unbounded()))

streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorRecordContextTest.java

Lines changed: 0 additions & 1 deletion
@@ -44,7 +44,6 @@ public void shouldNotAllowNullHeaders() {

    @Test
    public void shouldEstimateNullTopicAndEmptyHeadersAsZeroLength() {
-       final Headers headers = new RecordHeaders();
        final ProcessorRecordContext context = new ProcessorRecordContext(
            42L,
            73L,
