Skip to content

Commit bc11969

Browse files
committed
Added simple integration test
1 parent 5f7959f commit bc11969

File tree

4 files changed

+258
-8
lines changed

4 files changed

+258
-8
lines changed

topic/pom.xml

+4
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@
4545
<artifactId>junit</artifactId>
4646
<scope>test</scope>
4747
</dependency>
48+
<dependency>
49+
<groupId>tech.ydb.test</groupId>
50+
<artifactId>ydb-junit4-support</artifactId>
51+
</dependency>
4852
<dependency>
4953
<groupId>org.apache.logging.log4j</groupId>
5054
<artifactId>log4j-slf4j-impl</artifactId>

topic/src/test/java/tech/ydb/topic/impl/DisjointOffsetRangeSetTest.java

+14-6
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import org.junit.Assert;
66
import org.junit.Test;
7+
import org.junit.function.ThrowingRunnable;
78

89
import tech.ydb.topic.description.OffsetsRange;
910
import tech.ydb.topic.read.impl.DisjointOffsetRangeSet;
@@ -28,19 +29,26 @@ public void testRangesSimple() {
2829
Assert.assertEquals(4, rangesResult.get(1).getEnd());
2930
}
3031

32+
private void assertException(ThrowingRunnable runnable, String addedRange, String existingRange) {
33+
RuntimeException ex = Assert.assertThrows(RuntimeException.class, runnable);
34+
String message = "Error adding new offset range. Added range " + addedRange +
35+
" clashes with existing range " + existingRange;
36+
Assert.assertEquals(message, ex.getMessage());
37+
}
38+
3139
@Test
3240
public void testReuseRangeSet() {
3341
DisjointOffsetRangeSet ranges = new DisjointOffsetRangeSet();
3442

3543
ranges.add(new OffsetsRangeImpl(30, 40));
3644
ranges.add(new OffsetsRangeImpl(10, 20));
3745
ranges.add(new OffsetsRangeImpl(0, 9));
38-
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 11)));
39-
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(8, 10)));
40-
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(9, 11)));
41-
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 31)));
42-
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(31, 100)));
43-
Assert.assertThrows(RuntimeException.class, () -> ranges.add(new OffsetsRangeImpl(25, 100)));
46+
assertException(() -> ranges.add(new OffsetsRangeImpl(8, 11)), "[8,11)", "[0,9)");
47+
assertException(() -> ranges.add(new OffsetsRangeImpl(8, 10)), "[8,10)", "[0,9)");
48+
assertException(() -> ranges.add(new OffsetsRangeImpl(9, 11)), "[9,11)", "[10,20)");
49+
assertException(() -> ranges.add(new OffsetsRangeImpl(25, 31)), "[25,31)", "[30,40)");
50+
assertException(() -> ranges.add(new OffsetsRangeImpl(31, 100)), "[31,100)", "[30,40)");
51+
assertException(() -> ranges.add(new OffsetsRangeImpl(25, 100)), "[25,100)", "[30,40)");
4452
ranges.add(new OffsetsRangeImpl(9, 10));
4553
List<OffsetsRange> firstResult = ranges.getRangesAndClear();
4654
Assert.assertEquals(2, firstResult.size());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
package tech.ydb.topic.impl;
2+
3+
import java.util.List;
4+
import java.util.concurrent.CompletableFuture;
5+
import java.util.concurrent.ExecutionException;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.TimeoutException;
8+
import java.util.concurrent.atomic.AtomicInteger;
9+
10+
import org.junit.AfterClass;
11+
import org.junit.Assert;
12+
import org.junit.BeforeClass;
13+
import org.junit.ClassRule;
14+
import org.junit.FixMethodOrder;
15+
import org.junit.Test;
16+
import org.junit.runners.MethodSorters;
17+
import org.slf4j.Logger;
18+
import org.slf4j.LoggerFactory;
19+
20+
import tech.ydb.core.Status;
21+
import tech.ydb.test.junit4.GrpcTransportRule;
22+
import tech.ydb.topic.TopicClient;
23+
import tech.ydb.topic.description.Consumer;
24+
import tech.ydb.topic.description.TopicDescription;
25+
import tech.ydb.topic.read.AsyncReader;
26+
import tech.ydb.topic.read.DeferredCommitter;
27+
import tech.ydb.topic.read.SyncReader;
28+
import tech.ydb.topic.read.events.AbstractReadEventHandler;
29+
import tech.ydb.topic.read.events.DataReceivedEvent;
30+
import tech.ydb.topic.settings.CreateTopicSettings;
31+
import tech.ydb.topic.settings.ReadEventHandlersSettings;
32+
import tech.ydb.topic.settings.ReaderSettings;
33+
import tech.ydb.topic.settings.TopicReadSettings;
34+
import tech.ydb.topic.settings.WriterSettings;
35+
import tech.ydb.topic.write.Message;
36+
import tech.ydb.topic.write.SyncWriter;
37+
38+
/**
39+
*
40+
* @author Aleksandr Gorshenin
41+
*/
42+
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
43+
public class YdbTopicsIntegrationTest {
44+
private final static Logger logger = LoggerFactory.getLogger(YdbTopicsIntegrationTest.class);
45+
46+
@ClassRule
47+
public final static GrpcTransportRule ydbTransport = new GrpcTransportRule();
48+
49+
private final static String TEST_TOPIC = "integration_test_topic";
50+
private final static String TEST_CONSUMER1 = "consumer";
51+
private final static String TEST_CONSUMER2 = "other_consumer";
52+
53+
private static TopicClient client;
54+
55+
private final static byte[][] TEST_MESSAGES = new byte[][] {
56+
"Test message".getBytes(),
57+
"".getBytes(),
58+
" ".getBytes(),
59+
"Other message".getBytes(),
60+
"Last message".getBytes(),
61+
};
62+
63+
@BeforeClass
64+
public static void initTopic() {
65+
logger.info("Create test topic {} ...", TEST_TOPIC);
66+
67+
client = TopicClient.newClient(ydbTransport).build();
68+
client.createTopic(TEST_TOPIC, CreateTopicSettings.newBuilder()
69+
.addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER1).build())
70+
.addConsumer(Consumer.newBuilder().setName(TEST_CONSUMER2).build())
71+
.build()
72+
).join().expectSuccess("can't create a new topic");
73+
}
74+
75+
@AfterClass
76+
public static void dropTopic() {
77+
logger.info("Drop test topic {} ...", TEST_TOPIC);
78+
Status dropStatus = client.dropTopic(TEST_TOPIC).join();
79+
client.close();
80+
dropStatus.expectSuccess("can't drop test topic");
81+
}
82+
83+
@Test
84+
public void step01_writeWithoutDeduplication() throws InterruptedException, ExecutionException, TimeoutException {
85+
WriterSettings settings = WriterSettings.newBuilder()
86+
.setTopicPath(TEST_TOPIC)
87+
.build();
88+
SyncWriter writer = client.createSyncWriter(settings);
89+
writer.init();
90+
91+
for (int idx = 0; idx < TEST_MESSAGES.length; idx += 1) {
92+
writer.send(Message.newBuilder().setData(TEST_MESSAGES[idx]).build());
93+
}
94+
95+
for (int idx = TEST_MESSAGES.length - 1; idx >= 0; idx -= 1) {
96+
writer.send(Message.newBuilder().setData(TEST_MESSAGES[idx]).build());
97+
}
98+
99+
writer.flush();
100+
writer.shutdown(1, TimeUnit.MINUTES);
101+
}
102+
103+
@Test
104+
public void step02_readHalfWithoutCommit() throws InterruptedException {
105+
ReaderSettings settings = ReaderSettings.newBuilder()
106+
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
107+
.setConsumerName(TEST_CONSUMER1)
108+
.build();
109+
110+
SyncReader reader = client.createSyncReader(settings);
111+
reader.initAndWait();
112+
113+
for (byte[] bytes: TEST_MESSAGES) {
114+
tech.ydb.topic.read.Message msg = reader.receive();
115+
Assert.assertArrayEquals(bytes, msg.getData());
116+
}
117+
118+
reader.shutdown();
119+
}
120+
121+
@Test
122+
public void step03_readHalfWithCommit() throws InterruptedException {
123+
ReaderSettings settings = ReaderSettings.newBuilder()
124+
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
125+
.setConsumerName(TEST_CONSUMER1)
126+
.build();
127+
128+
SyncReader reader = client.createSyncReader(settings);
129+
reader.initAndWait();
130+
131+
for (byte[] bytes: TEST_MESSAGES) {
132+
tech.ydb.topic.read.Message msg = reader.receive();
133+
Assert.assertArrayEquals(bytes, msg.getData());
134+
msg.commit();
135+
}
136+
137+
reader.shutdown();
138+
}
139+
140+
@Test
141+
public void step03_readNextHalfWithoutCommit() throws InterruptedException {
142+
ReaderSettings settings = ReaderSettings.newBuilder()
143+
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
144+
.setConsumerName(TEST_CONSUMER1)
145+
.build();
146+
147+
SyncReader reader = client.createSyncReader(settings);
148+
reader.initAndWait();
149+
150+
for (int idx = TEST_MESSAGES.length - 1; idx >= 0; idx -= 1) {
151+
tech.ydb.topic.read.Message msg = reader.receive();
152+
Assert.assertArrayEquals(TEST_MESSAGES[idx], msg.getData());
153+
}
154+
155+
reader.shutdown();
156+
}
157+
158+
@Test
159+
public void step04_readNextHalfWithCommit() throws InterruptedException {
160+
ReaderSettings settings = ReaderSettings.newBuilder()
161+
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
162+
.setConsumerName(TEST_CONSUMER1)
163+
.build();
164+
165+
SyncReader reader = client.createSyncReader(settings);
166+
reader.initAndWait();
167+
168+
DeferredCommitter committer = DeferredCommitter.newInstance();
169+
for (int idx = TEST_MESSAGES.length - 1; idx >= 0; idx -= 1) {
170+
tech.ydb.topic.read.Message msg = reader.receive();
171+
Assert.assertArrayEquals(TEST_MESSAGES[idx], msg.getData());
172+
committer.add(msg);
173+
}
174+
175+
committer.commit();
176+
reader.shutdown();
177+
}
178+
179+
@Test
180+
public void step05_describeTopic() throws InterruptedException {
181+
TopicDescription description = client.describeTopic(TEST_TOPIC).join().getValue();
182+
183+
Assert.assertNull(description.getTopicStats());
184+
List<Consumer> consumers = description.getConsumers();
185+
Assert.assertEquals(2, consumers.size());
186+
187+
Assert.assertEquals(TEST_CONSUMER1, consumers.get(0).getName());
188+
Assert.assertEquals(TEST_CONSUMER2, consumers.get(1).getName());
189+
}
190+
191+
@Test
192+
public void step06_readAllByAsyncReader() throws InterruptedException {
193+
ReaderSettings settings = ReaderSettings.newBuilder()
194+
.addTopic(TopicReadSettings.newBuilder().setPath(TEST_TOPIC).build())
195+
.setConsumerName(TEST_CONSUMER2)
196+
.build();
197+
198+
final AtomicInteger atomicIdx = new AtomicInteger(0);
199+
final CompletableFuture<Void> wait = new CompletableFuture<>();
200+
final byte[][] results = new byte[TEST_MESSAGES.length * 2][];
201+
202+
AsyncReader reader = client.createAsyncReader(settings, ReadEventHandlersSettings.newBuilder()
203+
.setEventHandler(new AbstractReadEventHandler() {
204+
@Override
205+
public void onMessages(DataReceivedEvent dre) {
206+
for (tech.ydb.topic.read.Message msg: dre.getMessages()) {
207+
int idx = atomicIdx.getAndIncrement();
208+
if (idx < results.length) {
209+
results[idx] = msg.getData();
210+
}
211+
if (idx >= results.length - 1) {
212+
wait.complete(null);
213+
return;
214+
}
215+
}
216+
}
217+
})
218+
.build());
219+
220+
reader.init();
221+
wait.join();
222+
223+
reader.shutdown().join();
224+
225+
for (int idx = 0; idx < TEST_MESSAGES.length; idx += 1) {
226+
Assert.assertArrayEquals(TEST_MESSAGES[idx], results[idx]);
227+
Assert.assertArrayEquals(TEST_MESSAGES[idx], results[results.length - idx - 1]);
228+
}
229+
}
230+
}

topic/src/test/resources/log4j2.xml

+10-2
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,20 @@
77
</Appenders>
88

99
<Loggers>
10-
<Logger name="io.netty" level="warn" additivity="false">
10+
<Logger name="io.grpc" level="warn" additivity="false">
1111
<AppenderRef ref="Console"/>
1212
</Logger>
13-
<Logger name="tech.ydb.core.grpc" level="error" additivity="false">
13+
<Logger name="tech.ydb" level="debug" additivity="false">
1414
<AppenderRef ref="Console"/>
1515
</Logger>
16+
<Logger name="tech.ydb.topic" level="debug" additivity="false">
17+
<AppenderRef ref="Console"/>
18+
</Logger>
19+
20+
<!-- https://www.testcontainers.org/supported_docker_environment/logging_config/ -->
21+
<Logger name="org.testcontainers" level="warn" />
22+
<Logger name="com.github.dockerjava" level="warn"/>
23+
<Logger name="com.github.dockerjava.zerodep.shaded.org.apache.hc.client5.http.wire" level="off"/>
1624

1725
<Root level="debug" >
1826
<AppenderRef ref="Console"/>

0 commit comments

Comments
 (0)