Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.Job
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.channels.actor
import java.util.PriorityQueue
import java.util.concurrent.atomic.AtomicLong

/**
* Handles packet broadcasting to connected devices using actor pattern for serialization
Expand Down Expand Up @@ -99,25 +100,90 @@ class BluetoothPacketBroadcaster(
val gattServer: BluetoothGattServer?,
val characteristic: BluetoothGattCharacteristic?
)

// Internal queued item with priority metadata for fair ordering
private data class QueuedBroadcast(
val request: BroadcastRequest,
val primaryPriority: Int,
val secondaryPriority: Int,
val sequence: Long
)

// Actor scope for the broadcaster
private val broadcasterScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val transferJobs = ConcurrentHashMap<String, Job>()

// SERIALIZATION: Actor to serialize all broadcast operations
@OptIn(kotlinx.coroutines.ObsoleteCoroutinesApi::class)
private val broadcasterActor = broadcasterScope.actor<BroadcastRequest>(
capacity = Channel.UNLIMITED
) {
Log.d(TAG, "🎭 Created packet broadcaster actor")
try {
for (request in channel) {
broadcastSinglePacketInternal(request.routed, request.gattServer, request.characteristic)
// Replace simple actor FIFO with a priority-aware channel + processor
private val broadcasterChannel = Channel<BroadcastRequest>(capacity = Channel.UNLIMITED)
private val sequenceCounter = AtomicLong(0)

init {
startPriorityProcessor()
}

private fun startPriorityProcessor() {
broadcasterScope.launch {
Log.d(TAG, "🎭 Created priority packet broadcaster processor")
// Min-heap by (primaryPriority, secondaryPriority, sequence)
val queue = PriorityQueue<QueuedBroadcast>(11) { a, b ->
when {
a.primaryPriority != b.primaryPriority -> a.primaryPriority - b.primaryPriority
a.secondaryPriority != b.secondaryPriority -> a.secondaryPriority - b.secondaryPriority
else -> (a.sequence - b.sequence).coerceIn(Int.MIN_VALUE.toLong(), Int.MAX_VALUE.toLong()).toInt()
}
}

try {
while (isActive) {
// If queue is empty, suspend to receive at least one item
if (queue.isEmpty()) {
val first = broadcasterChannel.receiveCatching().getOrNull() ?: break
queue.offer(computeQueuedBroadcast(first))
}

// Drain any immediately available items without suspending
while (true) {
val received = broadcasterChannel.tryReceive().getOrNull() ?: break
queue.offer(computeQueuedBroadcast(received))
}

// Process one highest-priority item
val next = queue.poll() ?: continue
broadcastSinglePacketInternal(next.request.routed, next.request.gattServer, next.request.characteristic)
}
} catch (e: Exception) {
Log.w(TAG, "Priority processor loop ended: ${'$'}{e.message}")
} finally {
Log.d(TAG, "🎭 Priority packet broadcaster processor terminated")
}
Comment on lines 124 to 171

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Close broadcaster channel when processor fails

The new priority processor swallows any exception and simply exits the loop (startPriorityProcessor), but it never closes broadcasterChannel or cancels the scope. If broadcastSinglePacketInternal ever throws (e.g., due to an unexpected BLE runtime error), the processor terminates and no longer consumes requests, yet broadcastSinglePacket continues to send into the still-open unlimited channel so packets accumulate and are never broadcast. The previous actor implementation propagated the failure, closing the channel and triggering the fallback path. Consider closing the channel or rethrowing so producers fail fast instead of silently dropping all future broadcasts.

Useful? React with 👍 / 👎.

} finally {
Log.d(TAG, "🎭 Packet broadcaster actor terminated")
}
}

private fun computeQueuedBroadcast(req: BroadcastRequest): QueuedBroadcast {
val pkt = req.routed.packet
val type = MessageType.fromValue(pkt.type)

// Primary priority: 0 = normal, 1 = fragments, 2 = file transfer (non-fragment)
val primary = when (type) {
MessageType.FRAGMENT -> 1
MessageType.FILE_TRANSFER -> 2
else -> 0
}

// Secondary priority: for fragments, use total fragment count (smaller = higher priority)
val secondary = if (type == MessageType.FRAGMENT) {
try {
val payload = com.bitchat.android.model.FragmentPayload.decode(pkt.payload)
// If decode fails, deprioritize heavily
payload?.total ?: Int.MAX_VALUE
} catch (_: Exception) {
Int.MAX_VALUE
}
} else 0

val seq = sequenceCounter.getAndIncrement()
return QueuedBroadcast(req, primary, secondary, seq)
}

fun broadcastPacket(
routed: RoutedPacket,
Expand Down Expand Up @@ -267,7 +333,7 @@ class BluetoothPacketBroadcaster(
// Submit broadcast request to actor for serialized processing
broadcasterScope.launch {
try {
broadcasterActor.send(BroadcastRequest(routed, gattServer, characteristic))
broadcasterChannel.send(BroadcastRequest(routed, gattServer, characteristic))
} catch (e: Exception) {
Log.w(TAG, "Failed to send broadcast request to actor: ${e.message}")
// Fallback to direct processing if actor fails
Expand Down Expand Up @@ -427,7 +493,7 @@ class BluetoothPacketBroadcaster(
return buildString {
appendLine("=== Packet Broadcaster Debug Info ===")
appendLine("Broadcaster Scope Active: ${broadcasterScope.isActive}")
appendLine("Actor Channel Closed: ${broadcasterActor.isClosedForSend}")
appendLine("Actor Channel Closed: ${broadcasterChannel.isClosedForSend}")
appendLine("Connection Scope Active: ${connectionScope.isActive}")
}
}
Expand All @@ -439,11 +505,11 @@ class BluetoothPacketBroadcaster(
Log.d(TAG, "Shutting down BluetoothPacketBroadcaster actor")

// Close the actor gracefully
broadcasterActor.close()
broadcasterChannel.close()

// Cancel the broadcaster scope
broadcasterScope.cancel()

Log.d(TAG, "BluetoothPacketBroadcaster shutdown complete")
}
}
}
Loading