feat: add rolling bloom filter, reliability utils and protobuf #4

Open
wants to merge 3 commits into
base: master
from

Conversation

shash256
Collaborator

@shash256 shash256 commented Jan 13, 2025

This PR adds a rolling bloom filter, built on bloom.nim, to the SDS API. It also adds protobuf serialization and deserialization for messages and bloom filters, plus definitions and utility functions for the ReliabilityManager. The core functions and unit tests for ReliabilityManager will follow in a subsequent PR.

Would greatly appreciate Nim-related review and comments from the nwaku team!

This PR is part of the work towards the nim-sds API deliverable.

@Ivansete-status Ivansete-status left a comment

Thanks for it! Some comments so far :) Ping me if any comment is unclear or you need me to review again

src/protobuf.nim (outdated, resolved)
src/protobuf.nim (outdated, resolved)
src/protobuf.nim Outdated
Comment on lines 109 to 112
  var intArray = newSeq[int](bytes.len div sizeof(int))
  for i in 0 ..< intArray.len:
    let start = i * sizeof(int)
    copyMem(addr intArray[i], unsafeAddr bytes[start], sizeof(int))

That looks beautiful but a couple of doubts:

  1. Do we have a Nim module that already does that?
  2. Should we care about the desired endianness?

wdyt @arnetheduck ?

Collaborator Author

Thanks for pointing this out.

  1. For this use case, from what I saw, this seems to be an efficient built-in option, but I'm curious to know if there's anything else.
  2. Updated to handle endianness. Does it look good now?
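
For reference, one way to make the byte layout independent of the host is to fix both the integer width and the byte order. The sketch below is only an illustration under assumptions: it is not the updated code from this PR, the names `int64sToBytesLE` and `bytesToInt64sLE` are hypothetical, and it assumes 64-bit little-endian values on the wire. It uses shifts rather than `copyMem`, so the result does not depend on `sizeof(int)` or on the platform's endianness.

```nim
# Sketch only: fixed-width, explicitly little-endian (de)serialization.
# Both proc names are hypothetical and not part of the PR.

proc int64sToBytesLE(values: openArray[int64]): seq[byte] =
  ## Writes each value as 8 bytes, least significant byte first.
  result = newSeq[byte](values.len * 8)
  for i, v in values:
    let u = cast[uint64](v)
    for j in 0 ..< 8:
      result[i * 8 + j] = byte((u shr (8 * j)) and 0xff'u64)

proc bytesToInt64sLE(bytes: openArray[byte]): seq[int64] =
  ## Reads 8 bytes per value, least significant byte first.
  ## Trailing bytes that do not fill a whole value are ignored.
  result = newSeq[int64](bytes.len div 8)
  for i in 0 ..< result.len:
    var u = 0'u64
    for j in 0 ..< 8:
      u = u or (uint64(bytes[i * 8 + j]) shl (8 * j))
    result[i] = cast[int64](u)

# Round trip: decoding the encoded bytes yields the original values.
let original = @[1'i64, -2, 258]
doAssert bytesToInt64sLE(int64sToBytesLE(original)) == original
```

The trade-off is a few more instructions per value than a raw memory copy, in exchange for a layout that is the same on every platform.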

src/reliability_utils.nim (outdated, resolved)
src/reliability_utils.nim (outdated, resolved)
src/reliability_utils.nim (outdated, resolved)
src/rolling_bloom_filter.nim (outdated, resolved)

@jm-clius jm-clius left a comment

Generally looks good! I've requested a change wrt the rolling bloom filter design being based on duration - I think we'll get more predictable behaviour if we simply design it based on a maximum capacity with some variability.

Comment on lines 87 to 90
  for msg in rbf.messages:
    if msg.timestamp > cutoff:
      newMessages.add(msg)
      newFilter.insert(msg.id)

What happens if the number of messages within the cutoff window is much higher than our capacity? From glancing at https://github.com/waku-org/nim-sds/blob/master/src/bloom.nim I don't think there will be any errors, but our false positive rate design will be way off.

To me it seems unnecessary to bring a time window consideration into the design here at all - we want SDS to work the same for a group communication at any rate. Perhaps therefore the rolling filter can operate on a "min" and "max" number of entries around the configured capacity? E.g. we can allow up to capacity + 20% of messages before triggering a clean, after which we add back the last capacity - 20% of messages? Something like that, but in any case avoiding the highly variable time element.

Collaborator Author

I agree with what you said. The time-based approach might introduce unnecessary variability. I've updated the implementation to use a capacity-based rolling filter with configurable min and max thresholds (currently set to 20%).
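
The capacity-based policy discussed in this thread could look roughly like the sketch below. It is an illustration under assumptions, not the PR's implementation: `RollingFilterSketch` and its procs are hypothetical, a `HashSet` stands in for the real bloom filter from bloom.nim, message IDs are plain strings, and the thresholds use integer arithmetic instead of the float multiplication shown elsewhere in this review.

```nim
import std/sets

type
  RollingFilterSketch = object
    capacity, minCapacity, maxCapacity: int
    messages: seq[string]     # message IDs in insertion order
    filter: HashSet[string]   # stand-in for the real bloom filter

proc initRollingFilterSketch(capacity: int, marginPercent = 20): RollingFilterSketch =
  ## Derives the min/max thresholds around the configured capacity.
  RollingFilterSketch(
    capacity: capacity,
    minCapacity: capacity * (100 - marginPercent) div 100,
    maxCapacity: capacity * (100 + marginPercent) div 100,
    messages: @[],
    filter: initHashSet[string]()
  )

proc clean(rf: var RollingFilterSketch) =
  ## Once maxCapacity is exceeded, keep only the newest minCapacity
  ## IDs and rebuild the filter from them.
  if rf.messages.len <= rf.maxCapacity:
    return
  let keepFrom = rf.messages.len - rf.minCapacity
  rf.messages = rf.messages[keepFrom .. ^1]
  rf.filter = toHashSet(rf.messages)

proc add(rf: var RollingFilterSketch, id: string) =
  rf.messages.add(id)
  rf.filter.incl(id)
  rf.clean()

proc contains(rf: RollingFilterSketch, id: string): bool =
  id in rf.filter

# With capacity 10 the filter rolls when the 13th ID arrives.
var rf = initRollingFilterSketch(10)
for i in 0 ..< 13:
  rf.add("m" & $i)
doAssert rf.messages.len == 8   # trimmed back to minCapacity
doAssert "m12" in rf
doAssert "m4" notin rf
```

Because the trigger depends only on the number of stored IDs, the filter is never loaded beyond `maxCapacity` entries regardless of how fast messages arrive, which is the predictability argument made above.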

src/reliability_utils.nim (outdated, resolved)
@jm-clius

Side note: I think it's a good idea to mention the deliverable that this forms part of in the PR description. :) waku-org/pm#194

@shash256 shash256 linked an issue Jan 27, 2025 that may be closed by this pull request
9 tasks

@jm-clius jm-clius left a comment

Thanks! Minor comments below, but no need to rerequest approval once addressed. :)

src/message.nim (outdated, resolved)
src/rolling_bloom_filter.nim (outdated, resolved)

@Ivansete-status Ivansete-status left a comment

Thanks for it! Some more comments :)

src/private/probabilities.nim (outdated, resolved)
src/protobuf.nim (outdated, resolved)
src/protobuf.nim (outdated, resolved)
src/protobuf.nim (outdated, resolved)
src/protobuf.nim (outdated, resolved)
src/rolling_bloom_filter.nim (outdated, resolved)
src/rolling_bloom_filter.nim (outdated, resolved)
Comment on lines 26 to 46
    var filterResult: Result[BloomFilter, string]
    {.gcsafe.}:
      filterResult = initializeBloomFilter(capacity, errorRate)

    if filterResult.isOk:
      logInfo("Successfully initialized bloom filter")
      let targetCapacity = capacity
      let minCapacity = (capacity.float * 0.8).int
      let maxCapacity = (capacity.float * 1.2).int
      return RollingBloomFilter(
        filter: filterResult.get(),
        capacity: targetCapacity,
        minCapacity: minCapacity,
        maxCapacity: maxCapacity,
        messages: @[]
      )
    else:
      logError("Failed to initialize bloom filter: " & filterResult.error)

  except Exception:
    logError("Failed to initialize bloom filter: " & getCurrentExceptionMsg())

Suggested change
-     var filterResult: Result[BloomFilter, string]
-     {.gcsafe.}:
-       filterResult = initializeBloomFilter(capacity, errorRate)
-     if filterResult.isOk:
-       logInfo("Successfully initialized bloom filter")
-       let targetCapacity = capacity
-       let minCapacity = (capacity.float * 0.8).int
-       let maxCapacity = (capacity.float * 1.2).int
-       return RollingBloomFilter(
-         filter: filterResult.get(),
-         capacity: targetCapacity,
-         minCapacity: minCapacity,
-         maxCapacity: maxCapacity,
-         messages: @[]
-       )
-     else:
-       logError("Failed to initialize bloom filter: " & filterResult.error)
-   except Exception:
-     logError("Failed to initialize bloom filter: " & getCurrentExceptionMsg())
+   let filter = BloomFilter.init(capacity, errorRate).valueOr:
+     return err("could not create bloom filter: " & $error)
+   let targetCapacity = capacity
+   let minCapacity = (capacity.float * 0.8).int
+   let maxCapacity = (capacity.float * 1.2).int
+   info "Successfully initialized bloom filter", targetCapacity, minCapacity, maxCapacity
+   return RollingBloomFilter(
+     filter: filter,
+     capacity: targetCapacity,
+     minCapacity: minCapacity,
+     maxCapacity: maxCapacity,
+     messages: @[]
+   )

Collaborator Author

Added a minor variation to this to initialize with the default parameters if initialization with given parameters fails
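
The fallback described here might be shaped like the sketch below. It is a guess at the general idea, not the code that was pushed: the PR itself works with `Result` values, while this sketch uses `Option` from the standard library so that it stays self-contained. `FilterSketch`, `tryInitFilter`, `initFilterOrDefault`, both default constants, and the validation rule are all hypothetical.

```nim
import std/options

type
  FilterSketch = object
    capacity: int
    errorRate: float

const
  DefaultCapacity = 10_000    # hypothetical default
  DefaultErrorRate = 0.001    # hypothetical default

proc tryInitFilter(capacity: int, errorRate: float): Option[FilterSketch] =
  ## Stand-in for the real initializer: rejects obviously invalid parameters.
  if capacity <= 0 or errorRate <= 0.0 or errorRate >= 1.0:
    return none(FilterSketch)
  some(FilterSketch(capacity: capacity, errorRate: errorRate))

proc initFilterOrDefault(capacity: int, errorRate: float): FilterSketch =
  ## Tries the caller's parameters first, then falls back to the defaults.
  let attempt = tryInitFilter(capacity, errorRate)
  if attempt.isSome:
    return attempt.get()
  # The defaults are compile-time constants assumed to be valid.
  tryInitFilter(DefaultCapacity, DefaultErrorRate).get()

doAssert initFilterOrDefault(500, 0.01).capacity == 500
doAssert initFilterOrDefault(-1, 0.01).capacity == DefaultCapacity
```

One consequence of this design is that callers never see the failure of their own parameters unless it is logged, so a silent fallback is worth pairing with a warning-level log line.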

@Ivansete-status Ivansete-status left a comment

Thanks for it! Super insightful changes 🥳
Just adding some more nitpick comments
It's important to start using nph. Ping me anytime and I can explain how we do it in nwaku.
Besides, I think it would be good to start separating modules and avoid overly generic modules such as reliability_utils.nim. I also encourage using private attributes as much as possible.

Thanks again for the outstanding and very enriching work!

      rm.incomingBuffer.setLen(0)
      rm.messageHistory.setLen(0)
    except Exception:
      error "Error during cleanup", msg = getCurrentExceptionMsg()

nitpick comment

Suggested change
-       error "Error during cleanup", msg = getCurrentExceptionMsg()
+       error "Error during cleanup", error = getCurrentExceptionMsg()

  try:
    rm.bloomFilter.clean()
  except Exception:
    error "Failed to clean bloom filter", msg = getCurrentExceptionMsg()

Suggested change
-     error "Failed to clean bloom filter", msg = getCurrentExceptionMsg()
+     error "Failed to clean bloom filter", error = getCurrentExceptionMsg()

Comment on lines +72 to +77
  let newFilterResult = initializeBloomFilter(rbf.maxCapacity, rbf.filter.errorRate)
  if newFilterResult.isErr:
    error "Failed to create new bloom filter", error = newFilterResult.error
    return

  var newFilter = newFilterResult.get()

Suggested change
-   let newFilterResult = initializeBloomFilter(rbf.maxCapacity, rbf.filter.errorRate)
-   if newFilterResult.isErr:
-     error "Failed to create new bloom filter", error = newFilterResult.error
-     return
-   var newFilter = newFilterResult.get()
+   var newFilter = initializeBloomFilter(rbf.maxCapacity, rbf.filter.errorRate).valueOr:
+     error "Failed to create new bloom filter", error = $error
+     return

Comment on lines +15 to +16
  UnacknowledgedMessage* = object
    message*: Message

Other possible option

Suggested change
-   UnacknowledgedMessage* = object
-     message*: Message
+   UnacknowledgedMessage* = object of Message
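
To compare the two options side by side, here is a small sketch. It relies on one Nim rule worth keeping in mind: `object of Message` only compiles if `Message` is itself inheritable, for example declared as `object of RootObj`. The types below are simplified stand-ins for the ones in the PR, and the `resendAttempts` field is hypothetical.

```nim
type
  Message = object of RootObj   # inheritable, so it can be a parent type
    content: seq[byte]

  # Option A (current PR): composition, the message is a field.
  UnackComposed = object
    message: Message
    resendAttempts: int         # hypothetical extra field

  # Option B (suggested): inheritance, the message fields are inlined.
  UnackInherited = object of Message
    resendAttempts: int         # hypothetical extra field

let composed = UnackComposed(
  message: Message(content: @[1'u8, 2, 3]),
  resendAttempts: 0
)
let inherited = UnackInherited(content: @[1'u8, 2, 3], resendAttempts: 0)

# Composition needs one extra hop to reach the payload.
doAssert composed.message.content == inherited.content
```

Composition keeps `Message` a plain object and makes the wrapping explicit, while inheritance shortens field access at the cost of making `Message` part of a type hierarchy.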

    lamportTimestamp*: int64
    causalHistory*: seq[MessageID]
    channelId*: ChannelID
    content*: seq[byte]

What is the expected content and how are we going to link the WakuMessage with that Message type?

Comment on lines +4 to +7
  MessageID* = seq[byte]
  ChannelID* = seq[byte]

  Message* = object

I think we need more explicit type names so that each object's purpose can be identified quickly.

Suggested change
-   MessageID* = seq[byte]
-   ChannelID* = seq[byte]
-   Message* = object
+   SdsMessageID* = seq[byte]
+   SdsChannelID* = seq[byte]
+   SdsMessage* = object


proc cleanup*(rm: ReliabilityManager) {.raises: [].} =
  if not rm.isNil():
    {.gcsafe.}:

Out of curiosity, why is this needed?

Successfully merging this pull request may close these issues.

Create a nim library for SDS implementation from the API specification
3 participants