Skip to content

Conversation

@callebtc
Copy link
Collaborator

Description

Checklist

@callebtc callebtc changed the title Prioritize messages over fragments and file transfers Message queue: prioritize messages over fragments and file transfers Oct 20, 2025
Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines 124 to 158
private fun startPriorityProcessor() {
broadcasterScope.launch {
Log.d(TAG, "🎭 Created priority packet broadcaster processor")
// Min-heap by (primaryPriority, secondaryPriority, sequence)
val queue = PriorityQueue<QueuedBroadcast>(11) { a, b ->
when {
a.primaryPriority != b.primaryPriority -> a.primaryPriority - b.primaryPriority
a.secondaryPriority != b.secondaryPriority -> a.secondaryPriority - b.secondaryPriority
else -> (a.sequence - b.sequence).coerceIn(Int.MIN_VALUE.toLong(), Int.MAX_VALUE.toLong()).toInt()
}
}

try {
while (isActive) {
// If queue is empty, suspend to receive at least one item
if (queue.isEmpty()) {
val first = broadcasterChannel.receiveCatching().getOrNull() ?: break
queue.offer(computeQueuedBroadcast(first))
}

// Drain any immediately available items without suspending
while (true) {
val received = broadcasterChannel.tryReceive().getOrNull() ?: break
queue.offer(computeQueuedBroadcast(received))
}

// Process one highest-priority item
val next = queue.poll() ?: continue
broadcastSinglePacketInternal(next.request.routed, next.request.gattServer, next.request.characteristic)
}
} catch (e: Exception) {
Log.w(TAG, "Priority processor loop ended: ${'$'}{e.message}")
} finally {
Log.d(TAG, "🎭 Priority packet broadcaster processor terminated")
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Close broadcaster channel when processor fails

The new priority processor swallows any exception and simply exits the loop (startPriorityProcessor), but it never closes broadcasterChannel or cancels the scope. If broadcastSinglePacketInternal ever throws (e.g., due to an unexpected BLE runtime error), the processor terminates and no longer consumes requests, yet broadcastSinglePacket continues to send into the still-open unlimited channel so packets accumulate and are never broadcast. The previous actor implementation propagated the failure, closing the channel and triggering the fallback path. Consider closing the channel or rethrowing so producers fail fast instead of silently dropping all future broadcasts.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants