Skip to content

Commit adcf870

Browse files
hylkevdsandsel
andauthored
Add test for segment persistent queues (#728)
Added tests for SegmentPersistentQueues a corruption and problems Co-authored-by: Andrea Selva <[email protected]>
1 parent 99f13c6 commit adcf870

File tree

1 file changed

+266
-0
lines changed

1 file changed

+266
-0
lines changed
Lines changed: 266 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,266 @@
1+
/*
2+
* Copyright (c) 2012-2023 The original author or authors
3+
* ------------------------------------------------------
4+
* All rights reserved. This program and the accompanying materials
5+
* are made available under the terms of the Eclipse Public License v1.0
6+
* and Apache License v2.0 which accompanies this distribution.
7+
*
8+
* The Eclipse Public License is available at
9+
* http://www.eclipse.org/legal/epl-v10.html
10+
*
11+
* The Apache License v2.0 is available at
12+
* http://www.opensource.org/licenses/apache2.0.php
13+
*
14+
* You may elect to redistribute this code under either of these licenses.
15+
*/
16+
package io.moquette.persistence;
17+
18+
import io.moquette.broker.SessionMessageQueue;
19+
import io.moquette.broker.SessionRegistry;
20+
import io.moquette.broker.SessionRegistry.EnqueuedMessage;
21+
import io.moquette.broker.SessionRegistry.PublishedMessage;
22+
import io.moquette.broker.subscriptions.Topic;
23+
import io.moquette.broker.unsafequeues.QueueException;
24+
import io.netty.buffer.ByteBuf;
25+
import io.netty.buffer.Unpooled;
26+
import io.netty.handler.codec.mqtt.MqttQoS;
27+
import java.io.IOException;
28+
import java.nio.charset.StandardCharsets;
29+
import static java.nio.charset.StandardCharsets.UTF_8;
30+
import java.nio.file.Path;
31+
import java.util.ArrayList;
32+
import java.util.List;
33+
import org.junit.jupiter.api.AfterAll;
34+
import org.junit.jupiter.api.AfterEach;
35+
import org.junit.jupiter.api.Assertions;
36+
import static org.junit.jupiter.api.Assertions.*;
37+
import org.junit.jupiter.api.BeforeAll;
38+
import org.junit.jupiter.api.Test;
39+
import org.junit.jupiter.api.io.TempDir;
40+
import org.slf4j.Logger;
41+
import org.slf4j.LoggerFactory;
42+
43+
public class SegmentPersistentQueueTest {
44+
45+
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPersistentQueueTest.class.getName());
46+
47+
private static final int PAGE_SIZE = 5000;
48+
private static final int SEGMENT_SIZE = 1000;
49+
private static SegmentQueueRepository queueRepository;
50+
List<SessionMessageQueue<EnqueuedMessage>> queues = new ArrayList<>();
51+
52+
private int queueIndex = 0;
53+
private static final String QUEUE_NAME = "test";
54+
55+
@TempDir
56+
static Path tempQueueFolder;
57+
58+
@BeforeAll
59+
public static void beforeAll() throws IOException, QueueException {
60+
System.setProperty("moquette.queue.debug", "false");
61+
queueRepository = new SegmentQueueRepository(tempQueueFolder.toFile().getAbsolutePath(), PAGE_SIZE, SEGMENT_SIZE);
62+
}
63+
64+
@AfterEach
65+
public void tearDown() {
66+
for (SessionMessageQueue<EnqueuedMessage> queue : queues) {
67+
queue.closeAndPurge();
68+
}
69+
queues.clear();
70+
}
71+
72+
@AfterAll
73+
public static void afterAll() {
74+
queueRepository.close();
75+
76+
}
77+
78+
private SessionMessageQueue<EnqueuedMessage> createQueue() {
79+
SessionMessageQueue<EnqueuedMessage> queue = queueRepository.getOrCreateQueue(QUEUE_NAME + (queueIndex++));
80+
queues.add(queue);
81+
return queue;
82+
}
83+
84+
private static void createAndAddToQueue(SessionMessageQueue<EnqueuedMessage> queue, String topic, int totalSize) {
85+
final PublishedMessage msg1 = createMessage(topic, totalSize);
86+
msg1.retain();
87+
queue.enqueue(msg1);
88+
msg1.release();
89+
}
90+
91+
private void createAndAddToQueues(String topic, int totalSize) {
92+
final PublishedMessage msg = createMessage(topic, totalSize);
93+
for (SessionMessageQueue<EnqueuedMessage> queue : queues) {
94+
msg.retain();
95+
queue.enqueue(msg);
96+
}
97+
msg.release();
98+
}
99+
100+
private void assertAllEmpty(String message) {
101+
for (SessionMessageQueue<EnqueuedMessage> queue : queues) {
102+
assertTrue(queue.isEmpty(), message);
103+
}
104+
}
105+
106+
private void assertAllNonEmpty(String message) {
107+
for (SessionMessageQueue<EnqueuedMessage> queue : queues) {
108+
assertFalse(queue.isEmpty(), message);
109+
}
110+
}
111+
112+
private void dequeueFromAll(String expected) {
113+
for (SessionMessageQueue<EnqueuedMessage> queue : queues) {
114+
final SessionRegistry.PublishedMessage mesg = (PublishedMessage) queue.dequeue();
115+
assertEquals(expected, mesg.getTopic().toString());
116+
}
117+
}
118+
119+
@Test
120+
public void testAdd() {
121+
LOGGER.info("testAdd");
122+
SessionMessageQueue<EnqueuedMessage> queue = queueRepository.getOrCreateQueue("testAdd");
123+
queues.add(queue);
124+
assertTrue(queue.isEmpty(), "Queue must start empty.");
125+
createAndAddToQueue(queue, "Hello", 100);
126+
assertFalse(queue.isEmpty(), "Queue must not be empty after adding.");
127+
createAndAddToQueue(queue, "world", 100);
128+
assertFalse(queue.isEmpty(), "Queue must not be empty after adding.");
129+
130+
assertEquals("Hello", ((PublishedMessage) queue.dequeue()).getTopic().toString());
131+
assertEquals("world", ((PublishedMessage) queue.dequeue()).getTopic().toString());
132+
assertAllEmpty("After dequeueing all, queue must be empty");
133+
134+
createAndAddToQueue(queue, "Hello", 100);
135+
assertFalse(queue.isEmpty(), "Queue must not be empty after adding.");
136+
assertEquals("Hello", ((PublishedMessage) queue.dequeue()).getTopic().toString());
137+
assertAllEmpty("After dequeueing all, queue must be empty");
138+
}
139+
140+
@Test
141+
public void testAdd2() {
142+
LOGGER.info("testAdd2");
143+
testAddX(2);
144+
}
145+
146+
@Test
147+
public void testAdd10() {
148+
LOGGER.info("testAdd10");
149+
testAddX(10);
150+
}
151+
152+
public void testAddX(int x) {
153+
for (int i = 0; i < x; i++) {
154+
createQueue();
155+
}
156+
assertAllEmpty("Queue must start empty.");
157+
158+
createAndAddToQueues("Hello", 100);
159+
assertAllNonEmpty("Queue must not be empty after adding.");
160+
161+
createAndAddToQueues("world", 100);
162+
assertAllNonEmpty("Queue must not be empty after adding.");
163+
164+
dequeueFromAll("Hello");
165+
assertAllNonEmpty("After dequeueing one, queue must not be empty");
166+
167+
createAndAddToQueues("crazy", 100);
168+
assertAllNonEmpty("Queue must not be empty after adding.");
169+
170+
dequeueFromAll("world");
171+
assertAllNonEmpty("Queue must not be empty after adding.");
172+
173+
dequeueFromAll("crazy");
174+
assertAllEmpty("After dequeueing all, queue must be empty");
175+
}
176+
private static String body;
177+
178+
private static String getBody(int bodySize) {
179+
if (body == null || body.length() != bodySize) {
180+
char a = 'A';
181+
char z = 'Z';
182+
char curChar = a;
183+
StringBuilder bodyString = new StringBuilder();
184+
for (int i = 0; i < 104 + 81; i++) {
185+
bodyString.append(curChar);
186+
if (curChar == z) {
187+
curChar = a;
188+
} else {
189+
curChar++;
190+
}
191+
}
192+
body = bodyString.toString();
193+
}
194+
return body;
195+
}
196+
197+
private static void checkMessage(PublishedMessage message, String expTopic) {
198+
assertEquals(expTopic, message.getTopic().toString());
199+
final String receivedBody = message.getPayload().toString(UTF_8);
200+
final String expectedBody = getBody(receivedBody.length());
201+
assertEquals(expectedBody, receivedBody);
202+
}
203+
204+
private static PublishedMessage createMessage(String topic, int totalMessageSize) {
205+
// 4 totalSize + 1 msgType + 1 qos + 4 topicSize + 4 bodySize = 14
206+
int bodySize = totalMessageSize - 14 - topic.getBytes(UTF_8).length;
207+
final ByteBuf payload = Unpooled.wrappedBuffer(getBody(bodySize).getBytes(StandardCharsets.UTF_8));
208+
return new PublishedMessage(Topic.asTopic(topic), MqttQoS.AT_LEAST_ONCE, payload, false);
209+
}
210+
211+
@Test
212+
public void testPerformance() {
213+
LOGGER.info("testPerformance");
214+
SessionMessageQueue<EnqueuedMessage> queue = queueRepository.getOrCreateQueue("testPerformance");
215+
queues.add(queue);
216+
final String topic = "Hello";
217+
final int numIterations = 10_000;
218+
final int perIteration = 3;
219+
// With a total (in-queue) message size of 201, and a segment size of 1000
220+
// we can be sure we hit all corner cases.
221+
final int totalMessageSize = 201;
222+
int countPush = 0;
223+
int countPull = 0;
224+
int j = 0;
225+
final String message = "Queue should have contained " + perIteration + " items";
226+
try {
227+
for (int i = 0; i < numIterations; i++) {
228+
for (j = 0; j < perIteration; j++) {
229+
countPush++;
230+
LOGGER.debug("push {}, {}", countPush, j);
231+
createAndAddToQueue(queue, topic, totalMessageSize);
232+
}
233+
j = 0;
234+
while (!queue.isEmpty()) {
235+
countPull++;
236+
j++;
237+
LOGGER.debug("pull {}, {}", countPull, j);
238+
final PublishedMessage msg = (PublishedMessage) queue.dequeue();
239+
checkMessage(msg, topic);
240+
}
241+
assertEquals(perIteration, j, message);
242+
}
243+
} catch (Exception ex) {
244+
LOGGER.error("", ex);
245+
Assertions.fail("Failed on push count " + countPush + ", pull count " + countPull + ", j " + j, ex);
246+
}
247+
assertTrue(queue.isEmpty(), "should be empty");
248+
}
249+
250+
@Test
251+
public void testReloadFromPersistedState() {
252+
LOGGER.info("testReloadFromPersistedState");
253+
SessionMessageQueue<EnqueuedMessage> queue = queueRepository.getOrCreateQueue("testReloadFromPersistedState");
254+
queues.add(queue);
255+
createAndAddToQueue(queue, "Hello", 100);
256+
createAndAddToQueue(queue, "crazy", 100);
257+
createAndAddToQueue(queue, "world", 100);
258+
assertEquals("Hello", ((PublishedMessage) queue.dequeue()).getTopic().toString());
259+
260+
queue = queueRepository.getOrCreateQueue("testReloadFromPersistedState");
261+
262+
assertEquals("crazy", ((PublishedMessage) queue.dequeue()).getTopic().toString());
263+
assertEquals("world", ((PublishedMessage) queue.dequeue()).getTopic().toString());
264+
assertTrue(queue.isEmpty(), "should be empty");
265+
}
266+
}

0 commit comments

Comments
 (0)