
Commit dde949d

Merge remote-tracking branch 'origin/master' into log-all-requests

# Conflicts:
#	logging-modules/log-all-requests/src/main/java/com/baeldung/logallrequests/ResponseWrapper.java
#	logging-modules/log-all-requests/src/test/java/com/baeldung/logallrequests/LoggingFilterIntegrationTest.java

2 parents: c7f404c + 8717d70

File tree

1,317 files changed: +16114 / -3720 lines

Note: this is a large commit, so some content is hidden by default; a few file paths below are missing for that reason.

README.md

Lines changed: 15 additions & 13 deletions

@@ -44,19 +44,21 @@ We also have a parents profile to build only parent modules.
 
 Therefore, we have a total of 9 profiles:
 
-| Profile            | Includes                    | Type of test enabled         |
-|--------------------|-----------------------------|------------------------------|
-| default            | JDK21 projects              | *UnitTest                    |
-| integration        | JDK21 projects              | *IntegrationTest             |
-| default-jdk17      | JDK17 projects              | *UnitTest                    |
-| integration-jdk17  | JDK17 projects              | *IntegrationTest             |
-| profile-jdk22      | JDK22 projects              | *UnitTest & *IntegrationTest |
-| profile-jdk23      | JDK23 projects              | *UnitTest & *IntegrationTest |
-| default-heavy      | Heavy/long running projects | *UnitTest                    |
-| integration-heavy  | Heavy/long running projects | *IntegrationTest             |
-| default-jdk8       | JDK8 projects               | *UnitTest                    |
-| integration-jdk8   | JDK8 projects               | *IntegrationTest             |
-| parents            | Set of parent modules       | None                         |
+| Profile           | Includes                    | Type of test enabled |
+|-------------------|-----------------------------|----------------------|
+| default           | JDK21 projects              | *UnitTest            |
+| integration       | JDK21 projects              | *IntegrationTest     |
+| default-jdk17     | JDK17 projects              | *UnitTest            |
+| integration-jdk17 | JDK17 projects              | *IntegrationTest     |
+| default-jdk22     | JDK22 projects              | *UnitTest            |
+| integration-jdk22 | JDK22 projects              | *IntegrationTest     |
+| default-jdk23     | JDK23 projects              | *UnitTest            |
+| integration-jdk23 | JDK23 projects              | *IntegrationTest     |
+| default-heavy     | Heavy/long running projects | *UnitTest            |
+| integration-heavy | Heavy/long running projects | *IntegrationTest     |
+| default-jdk8      | JDK8 projects               | *UnitTest            |
+| integration-jdk8  | JDK8 projects               | *IntegrationTest     |
+| parents           | Set of parent modules       | None                 |
 
 Building the project
 ====================
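For orientation (not part of this commit): profiles like these are activated with Maven's -P flag, so a hypothetical run of the JDK17 integration tests against the new table would be:

    mvn clean install -Pintegration-jdk17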

algorithms-modules/pom.xml

Lines changed: 1 addition & 0 deletions

@@ -24,6 +24,7 @@
         <module>algorithms-miscellaneous-7</module>
         <module>algorithms-miscellaneous-8</module>
         <module>algorithms-miscellaneous-9</module>
+        <module>algorithms-miscellaneous-10</module>
         <module>algorithms-numeric</module>
         <module>algorithms-searching</module>
         <module>algorithms-sorting</module>
Lines changed: 185 additions & 0 deletions (new file)

@@ -0,0 +1,185 @@
package com.baeldung.kafka;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class KafaProducerConsumerAckOptsLiveTest {

    private static final String CONSUMER_GROUP_ID = "ConsumerGroup1";

    private static final long INVALID_OFFSET = -1;
    private static final String TOPIC = "baeldung-kafka-github";
    private static final String MESSAGE_KEY = "message";
    private static final String TEST_MESSAGE = "Kafka Test Message";
    private static final Integer PARTITION_NUMBER = 3;

    static KafkaProducer<String, String> producerack0;
    static KafkaProducer<String, String> producerack1;
    static KafkaProducer<String, String> producerackAll;

    @Container
    private static final KafkaContainer KAFKA_CONTAINER = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka"));

    @BeforeAll
    static void setUp() throws IOException, InterruptedException {

        KAFKA_CONTAINER.addExposedPort(9092);

        KAFKA_CONTAINER.execInContainer("/bin/sh", "-c", "/usr/bin/kafka-topics " + "--bootstrap-server localhost:9092 " + "--create " +
            "--replication-factor 1 " + "--partitions " + PARTITION_NUMBER + " " + "--topic " + TOPIC);

        KAFKA_CONTAINER.start();

        Properties producerProperties = getProducerProperties();
        producerProperties.put(ProducerConfig.ACKS_CONFIG, "0");

        producerack0 = new KafkaProducer<>(producerProperties);

        Properties producerack1Prop = getProducerProperties();
        producerack1Prop.put(ProducerConfig.ACKS_CONFIG, "1");

        producerack1 = new KafkaProducer<>(producerack1Prop);

        Properties producerackAllProp = getProducerProperties();
        producerackAllProp.put(ProducerConfig.ACKS_CONFIG, "all");

        producerackAll = new KafkaProducer<>(producerackAllProp);

    }

    static Properties getProducerProperties() {
        Properties producerProperties = new Properties();
        producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());

        producerProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producerProperties.put(ProducerConfig.RETRIES_CONFIG, 0);
        producerProperties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1000);

        producerProperties.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 10000);
        producerProperties.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 300);

        producerProperties.put(ProducerConfig.LINGER_MS_CONFIG, 10);

        producerProperties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        producerProperties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);

        return producerProperties;
    }

    static Properties getConsumerProperties() {
        Properties consumerProperties = new Properties();
        consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_CONTAINER.getBootstrapServers());
        consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_ID);

        return consumerProperties;
    }

    @AfterAll
    static void destroy() {
        KAFKA_CONTAINER.stop();
    }

    @Test
    @Order(1)
    void givenProducerAck0_whenProducerSendsRecord_thenDoesNotReturnOffset() throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 0, MESSAGE_KEY, TEST_MESSAGE + "_0");
        for (int i = 0; i < 50; i++) {
            RecordMetadata metadata = producerack0.send(record)
                .get();
            assertEquals(INVALID_OFFSET, metadata.offset());
        }
    }

    @Test
    @Order(2)
    void givenProducerAck1_whenProducerSendsRecord_thenReturnsValidOffset() throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 1, MESSAGE_KEY, TEST_MESSAGE + "_1");
        for (int i = 0; i < 50; i++) {
            RecordMetadata metadata = producerack1.send(record)
                .get();
            assertNotEquals(INVALID_OFFSET, metadata.offset());
        }
    }

    @Test
    @Order(3)
    void givenProducerAckAll_whenProducerSendsRecord_thenReturnsValidOffset() throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 2, MESSAGE_KEY, TEST_MESSAGE + "_ALL");
        for (int i = 0; i < 50; i++) {
            RecordMetadata metadata = producerackAll.send(record)
                .get();
            assertNotEquals(INVALID_OFFSET, metadata.offset());
        }
    }

    @Test
    @Order(4)
    void whenSeekingKafkaResetConfigLatest_thenConsumerOffsetSetToLatestRecordOffset() {
        Properties consumerProperties = getConsumerProperties();
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        long expectedStartOffset = 50;
        long actualStartOffset;

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
            TopicPartition partition1 = new TopicPartition(TOPIC, 1);
            List<TopicPartition> partitions = new ArrayList<>();
            partitions.add(partition1);
            consumer.assign(partitions);
            actualStartOffset = consumer.position(partition1);
        }

        assertEquals(expectedStartOffset, actualStartOffset);
    }

    @Test
    @Order(5)
    void whenSeekingKafkaResetConfigEarliest_thenConsumerOffsetSetToZero() {
        Properties consumerProperties = getConsumerProperties();
        consumerProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        long expectedStartOffset = 0;
        long actualStartOffset;

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) {
            TopicPartition partition2 = new TopicPartition(TOPIC, 2);
            List<TopicPartition> partitions = new ArrayList<>();
            partitions.add(partition2);
            consumer.assign(partitions);
            actualStartOffset = consumer.position(partition2);
        }

        assertEquals(expectedStartOffset, actualStartOffset);
    }
}
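A note on what the new test asserts (context added here, not part of the commit): with acks=0 the producer considers a send complete without waiting for any broker response, so the returned RecordMetadata carries the -1 offset sentinel; with acks=1 the partition leader's acknowledgement includes the record's real offset; and acks=all additionally waits for the in-sync replicas. A minimal sketch of the three settings, with the class and method names hypothetical:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

class AcksSketch {
    static Properties withAcks(String acks) {
        Properties props = new Properties();
        // "0"   - fire-and-forget: no broker response, so RecordMetadata.offset() stays -1
        // "1"   - the partition leader acknowledges and its response reports the offset
        // "all" - leader plus all in-sync replicas acknowledge (strongest durability)
        props.put(ProducerConfig.ACKS_CONFIG, acks);
        return props;
    }
}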

apache-kafka/pom.xml

Lines changed: 2 additions & 2 deletions

@@ -124,12 +124,12 @@
 
     <properties>
         <jackson.version>2.15.4</jackson.version> <!-- Upgrading to version >= 2.16 breaks tests -->
-        <kafka.version>3.4.0</kafka.version>
+        <kafka.version>3.9.0</kafka.version>
         <testcontainers-kafka.version>1.19.3</testcontainers-kafka.version>
         <testcontainers-jupiter.version>1.15.3</testcontainers-jupiter.version>
         <org.apache.spark.spark-core.version>2.4.8</org.apache.spark.spark-core.version>
         <com.datastax.spark.spark-cassandra-connector.version>2.5.2</com.datastax.spark.spark-cassandra-connector.version>
         <com.datastax.spark.spark-cassandra-connector-java.version>1.6.0-M1</com.datastax.spark.spark-cassandra-connector-java.version>
     </properties>
 
-</project>
+</project>

apache-kafka/src/main/java/com/baeldung/kafka/exactlyonce/TransactionalWordCount.java

Lines changed: 2 additions & 1 deletion

@@ -2,6 +2,7 @@
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.KafkaProducer;

@@ -63,7 +64,7 @@ public static void main(String[] args) {
                 offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
             }
 
-            producer.sendOffsetsToTransaction(offsetsToCommit, CONSUMER_GROUP_ID);
+            producer.sendOffsetsToTransaction(offsetsToCommit, new ConsumerGroupMetadata(CONSUMER_GROUP_ID));
            producer.commitTransaction();
 
        }
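Context for this change (not stated in the commit message): KIP-447 deprecated the String-based sendOffsetsToTransaction overload in Kafka 2.5 in favor of the ConsumerGroupMetadata variant, which lets brokers fence zombie producers by consumer-group generation, and the kafka.version bump to 3.9.0 above is the likely trigger for moving off the deprecated API. A minimal sketch of the surrounding exactly-once commit pattern, assuming a transactional producer and a live consumer (the class and method names here are hypothetical):

import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;

class ExactlyOnceCommitSketch {
    // Assumes producer.initTransactions() was called once beforehand.
    static void commitWithinTransaction(KafkaProducer<String, String> producer,
        KafkaConsumer<String, String> consumer, Map<TopicPartition, OffsetAndMetadata> offsets) {
        producer.beginTransaction();
        try {
            // consumer.groupMetadata() carries the group generation used for zombie fencing
            producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
            producer.commitTransaction();
        } catch (KafkaException e) {
            producer.abortTransaction();
            throw e;
        }
    }
}

Note that consumer.groupMetadata() is the preferred source when the consumer is in scope; the committed code instead constructs ConsumerGroupMetadata from the group id directly, which preserves the old behavior.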

apache-libraries-2/README.md

Lines changed: 1 addition & 1 deletion

@@ -6,7 +6,7 @@
 - [Introduction to Apache Beam](https://www.baeldung.com/apache-beam)
 - [Introduction to Apache Pulsar](https://www.baeldung.com/apache-pulsar)
 - [Introduction to Apache Curator](https://www.baeldung.com/apache-curator)
-- [Introduction to Apache BVal](https://www.baeldung.com/apache-bval)
+- [Intro to Apache BVal](https://www.baeldung.com/apache-bval)
 - [Building a Microservice with Apache Meecrowave](https://www.baeldung.com/apache-meecrowave)
 - [A Quick Guide to Apache Geode](https://www.baeldung.com/apache-geode)
 - [Convert Avro File to JSON File in Java](https://www.baeldung.com/java-avro-json)

apache-libraries/pom.xml

Lines changed: 8 additions & 0 deletions

@@ -78,6 +78,13 @@
             <artifactId>mesos</artifactId>
             <version>${mesos.library.version}</version>
         </dependency>
+
+        <dependency>
+            <groupId>net.javacrumbs.json-unit</groupId>
+            <artifactId>json-unit-assertj</artifactId>
+            <version>${json-unit-assertj.version}</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>

@@ -118,6 +125,7 @@
         <solr.solr-solrj.version>6.4.0</solr.solr-solrj.version>
         <apache.avro.version>1.12.0</apache.avro.version>
         <mesos.library.version>1.11.0</mesos.library.version>
+        <json-unit-assertj.version>3.5.0</json-unit-assertj.version>
     </properties>
 
 </project>
Lines changed: 30 additions & 11 deletions

@@ -1,24 +1,43 @@
 package com.baeldung.apache.avro.util;
 
+import static java.util.Collections.emptyList;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 
 public class AvroSchemaBuilder {
 
-    public Schema createAvroHttpRequestSchema(){
-
-        Schema clientIdentifier = SchemaBuilder.record("ClientIdentifier").namespace("com.baeldung.avro.model")
-            .fields().requiredString("hostName").requiredString("ipAddress").endRecord();
+    //@formatter:off
+    static Schema clientIdentifierSchema() {
+        return SchemaBuilder.record("ClientIdentifier")
+                .namespace("com.baeldung.avro.model")
+                .fields()
+                .requiredString("hostName")
+                .requiredString("ipAddress")
+                .endRecord();
+    }
 
-        Schema avroHttpRequest = SchemaBuilder.record("AvroHttpRequest").namespace("com.baeldung.avro.model").fields()
-            .requiredLong("requestTime")
-            .name("clientIdentifier").type(clientIdentifier).noDefault()
-            .name("employeeNames").type().array().items().stringType().arrayDefault(null)
-            .name("active").type().enumeration("Active").symbols("YES", "NO").noDefault()
-            .endRecord();
-        return avroHttpRequest;
+    static Schema avroHttpRequestSchema() {
+        return SchemaBuilder.record("AvroHttpRequest")
+                .namespace("com.baeldung.avro.model").fields()
+                .requiredLong("requestTime")
+                .name("clientIdentifier")
+                .type(clientIdentifierSchema())
+                .noDefault()
+                .name("employeeNames")
+                .type()
+                .array()
+                .items()
+                .stringType()
+                .arrayDefault(emptyList())
+                .name("active")
+                .type()
+                .enumeration("Active")
+                .symbols("YES", "NO")
+                .noDefault()
+                .endRecord();
     }
+    //@formatter:on
 }
 
 
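Not part of the commit, but for illustration: a record built against the refactored clientIdentifierSchema() might look like the following sketch (field values and class name hypothetical):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;

class ClientIdentifierSketch {
    static GenericRecord sampleRecord(Schema clientIdentifierSchema) {
        return new GenericRecordBuilder(clientIdentifierSchema)
            .set("hostName", "localhost")    // requiredString field
            .set("ipAddress", "127.0.0.1")   // requiredString field
            .build();
    }
}

The refactor also swaps arrayDefault(null) for arrayDefault(emptyList()), so the employeeNames field now declares a well-formed empty-list default rather than a null one.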
Lines changed: 34 additions & 0 deletions (new file)

@@ -0,0 +1,34 @@
package com.baeldung.apache.avro.util;

import static com.baeldung.apache.avro.util.AvroSchemaBuilder.clientIdentifierSchema;
import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson;

import org.apache.avro.Schema;
import org.junit.jupiter.api.Test;

class AvroSchemaBuilderUnitTest {

    @Test
    void whenCallingSchemaToString_thenReturnJsonAvroSchema() {
        Schema clientIdSchema = clientIdentifierSchema();

        assertThatJson(clientIdSchema.toString()).isEqualTo("""
            {
                "type":"record",
                "name":"ClientIdentifier",
                "namespace":"com.baeldung.avro.model",
                "fields":[
                    {
                        "name":"hostName",
                        "type":"string"
                    },
                    {
                        "name":"ipAddress",
                        "type":"string"
                    }
                ]
            }
            """);
    }

}
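The json-unit-assertj dependency added to apache-libraries/pom.xml above is what provides assertThatJson; it compares the expected and actual values as parsed JSON rather than as raw strings, so the pretty-printed text block can differ in whitespace and key order from the compact output of Schema.toString() without failing the test.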
