Use a copy of the buffer when reading for storing. #716
Conversation
I've added a test file for the SegmentPersistentQueues that reliably fails in a single-threaded test after 2621450 messages:
@andsel does that help you in tracking down the problem?
I'll check this in the next few days.
After debugging with messages of 81 bytes (so they take exactly 100 bytes in the queue) and segments of 500 bytes, it seems things go wrong when the queue is completely consumed with a message ending exactly at the end of a segment. Because of the checks at Queue.java#L264 the segment is not released, even though it is empty, to ensure we don't end up with a queue with 0 segments. However, the next read does not move on to the next segment, but instead starts over at the beginning of the segment it just completed. This is (of course) wrong: it will just read random bytes, and anything can happen from there on. I think the cleaner solution is to allow a queue to have 0 segments, and to open a segment at the first write if that is the case...
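The "allow zero segments" idea could be sketched roughly as below. All names here (SegmentQueue, releaseHead, write) are illustrative, not the actual moquette API: release the head segment when it is fully consumed, even if that leaves the queue with 0 segments, and lazily open a fresh segment on the next write.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch only; the real Queue/Segment classes differ.
class SegmentQueue {
    static final int SEGMENT_SIZE = 500;
    private final Deque<byte[]> segments = new ArrayDeque<>();

    int segmentCount() {
        return segments.size();
    }

    // Release the fully consumed head segment; the queue may
    // legitimately drop to 0 segments here.
    void releaseHead() {
        segments.pollFirst();
    }

    // Lazily open a fresh segment on the first write after the
    // queue became empty.
    void write(byte[] payload) {
        if (segments.isEmpty()) {
            segments.addLast(new byte[SEGMENT_SIZE]);
        }
        // ... append the 4-byte length header and the payload here ...
    }
}
```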
A quick hack to allow a queue to have 0 segments seems to work, but may cause issues when loading the queues from disk. Then the next issue popped up: things also go awry when reading a message where only the 4 header bytes fit in the segment and the rest is in the next segment. In that case segment.bytesAfter(scan) returns the full segment size instead of 0 (after reading the header, the segment is empty). The problem is caused by:
When the logicalOffset hits the boundary (i.e. the Segment has 0 space left), the returned SegmentOffset becomes 0 instead of Segment.SIZE.
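The boundary behaviour can be illustrated with plain modular arithmetic. The names and the 500-byte size below are made up for the sketch, not taken from the real Segment class: when a logical offset lands exactly on a segment boundary, a naive modulo wraps to 0, so the "bytes remaining after this offset" computation reports a whole empty segment's worth of data.

```java
// Illustrative sketch of the boundary bug; names and sizes are hypothetical.
class SegmentMath {
    static final int SEGMENT_SIZE = 500;

    // Buggy variant: at an exact segment boundary the offset wraps to 0,
    // so bytesAfter() reports a full segment of remaining data.
    static int segmentOffsetBuggy(long logicalOffset) {
        return (int) (logicalOffset % SEGMENT_SIZE);
    }

    // Fixed variant: an offset landing exactly on a boundary means the
    // current segment is full, so return SEGMENT_SIZE instead of 0.
    static int segmentOffsetFixed(long logicalOffset) {
        int off = (int) (logicalOffset % SEGMENT_SIZE);
        return (off == 0 && logicalOffset > 0) ? SEGMENT_SIZE : off;
    }

    static int bytesAfter(int segmentOffset) {
        return SEGMENT_SIZE - segmentOffset;
    }
}
```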
I managed to fix the last corner case too. I've made the page size and segment size configurable, which makes it easier to set up specific scenarios for testing.
Other threads may be reading the same buffer, so we must ensure we have our own reading position and mark.
ByteBuffer.put(int index, char value) writes at index from the start of the buffer, not at index from the mark. Thus setting the position does nothing.
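Both points above are standard java.nio.ByteBuffer behaviour and can be demonstrated in a few lines: duplicate() creates a view with its own position, limit and mark over the same underlying bytes, and the absolute put(int, byte) overload writes at an index from the start of the buffer, ignoring the current position. The helper names below are just for the demo.

```java
import java.nio.ByteBuffer;

// Demo of per-reader positions via duplicate() and absolute put().
class BufferDemo {
    // Each reader works on its own duplicate, so concurrent readers
    // don't disturb the shared buffer's position or mark.
    static int readAt(ByteBuffer shared, int pos) {
        ByteBuffer reader = shared.duplicate(); // independent position/mark
        reader.position(pos);
        return reader.get();
    }

    static ByteBuffer sample() {
        return ByteBuffer.wrap(new byte[]{10, 20, 30, 40});
    }
}
```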
Force-pushed from 19a1d40 to da88b35.
The segmented queues now seem to work correctly. Next up is the issue of the …
Hi @hylkevds, I have a question.
Is 81 bytes the payload to enqueue? How can that take 100 bytes in the queue? If a payload message takes 100 bytes in the queue, considering 4 bytes for the length header, the available payload size is 96, so I don't understand where the 81 comes from. I think we should split this PR into 2 smaller PRs:
Yeah, that was not formulated the best way :)
Yes, I agree.
The last problem was in the Segment implementation. The quick solution is to not use …
I'm wondering if it isn't better to give each Segment a … I'm also wondering about the efficiency of … I'll rebuild this PR into two PRs: one for the Queue part, and one for the Session part.
The purge method can be called both from the disconnect message handler and from the connectionLost handler. If the disconnect happened normally, the connectionLost handler doesn't need to purge again.
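One way to make that safe is an atomic "purge once" guard: whichever handler runs first wins, and the other becomes a no-op. This is only a sketch of the idea; Session and purgeOnce are hypothetical names, not the actual moquette code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: make purge idempotent across the two handlers.
class Session {
    private final AtomicBoolean purged = new AtomicBoolean(false);

    boolean purgeOnce() {
        // compareAndSet ensures exactly one caller performs the purge,
        // even if disconnect and connectionLost race each other.
        if (!purged.compareAndSet(false, true)) {
            return false; // already purged by the other handler
        }
        // ... release queued messages and session state here ...
        return true;
    }
}
```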
There may be multiple queue pools; a static callback breaks this.
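The fix amounts to storing the callback per pool instance rather than in a static field, where every pool would overwrite the same single slot. A minimal sketch with made-up names (QueuePool, notifyListeners):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: one callback per pool instance.
class QueuePool {
    // Broken form would be: `static Runnable callback;` — every pool
    // would share and overwrite the same slot.
    private final Runnable callback; // fixed: instance field

    QueuePool(Runnable callback) {
        this.callback = callback;
    }

    void notifyListeners() {
        callback.run();
    }
}
```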
Thanks @hylkevds for all of this and for pushing it forward.
An alternative solution would be to have PostOffice.publish2Subscribers() somehow create a duplicate for each thread.