Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.Job
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.channels.actor
import java.util.PriorityQueue
import java.util.concurrent.atomic.AtomicLong

/**
* Handles packet broadcasting to connected devices using actor pattern for serialization
Expand Down Expand Up @@ -99,25 +100,103 @@ class BluetoothPacketBroadcaster(
val gattServer: BluetoothGattServer?,
val characteristic: BluetoothGattCharacteristic?
)

// Internal queued item with priority metadata for fair ordering
private data class QueuedBroadcast(
val request: BroadcastRequest,
val primaryPriority: Int,
val secondaryPriority: Int,
val sequence: Long
)

// Actor scope for the broadcaster
private val broadcasterScope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val transferJobs = ConcurrentHashMap<String, Job>()

// SERIALIZATION: Actor to serialize all broadcast operations
@OptIn(kotlinx.coroutines.ObsoleteCoroutinesApi::class)
private val broadcasterActor = broadcasterScope.actor<BroadcastRequest>(
capacity = Channel.UNLIMITED
) {
Log.d(TAG, "🎭 Created packet broadcaster actor")
try {
for (request in channel) {
broadcastSinglePacketInternal(request.routed, request.gattServer, request.characteristic)
// Replace simple actor FIFO with a priority-aware channel + processor
private val broadcasterChannel = Channel<BroadcastRequest>(capacity = Channel.UNLIMITED)
private val sequenceCounter = AtomicLong(0)

init {
startPriorityProcessor()
}

private fun startPriorityProcessor() {
broadcasterScope.launch {
Log.d(TAG, "🎭 Priority packet broadcaster processor started")

// Min-heap by (primaryPriority, secondaryPriority, sequence)
val priorityQueue = PriorityQueue<QueuedBroadcast>(11) { a, b ->
when {
a.primaryPriority != b.primaryPriority -> a.primaryPriority - b.primaryPriority
a.secondaryPriority != b.secondaryPriority -> a.secondaryPriority - b.secondaryPriority
else -> (a.sequence - b.sequence).coerceIn(Int.MIN_VALUE.toLong(), Int.MAX_VALUE.toLong()).toInt()
}
}

var processorException: Exception? = null
try {
while (isActive) {
// If priority queue is empty, suspend to receive at least one item
if (priorityQueue.isEmpty()) {
val first = broadcasterChannel.receiveCatching().getOrNull() ?: break
priorityQueue.offer(computeQueuedBroadcast(first))
}

// Drain any immediately available items without suspending
while (true) {
val received = broadcasterChannel.tryReceive().getOrNull() ?: break
priorityQueue.offer(computeQueuedBroadcast(received))
}

// Process one highest-priority item
val next = priorityQueue.poll() ?: continue
try {
broadcastSinglePacketInternal(next.request.routed, next.request.gattServer, next.request.characteristic)
} catch (e: Exception) {
// BLE error during broadcast - log and save for channel closure
Log.e(TAG, "❌ Broadcast failed: ${e.message}", e)
processorException = e
throw e // Re-throw to exit loop and close channel
}
}
} catch (e: Exception) {
Log.e(TAG, "❌ Priority processor loop terminated: ${e.message}", e)
processorException = e
} finally {
// CRITICAL: Close channel so producers fail fast instead of accumulating packets
// This triggers the fallback path in broadcastSinglePacket()
broadcasterChannel.close(processorException)
Log.d(TAG, "🎭 Priority packet broadcaster processor terminated, channel closed")
}
} finally {
Log.d(TAG, "🎭 Packet broadcaster actor terminated")
}
}

private fun computeQueuedBroadcast(req: BroadcastRequest): QueuedBroadcast {
val pkt = req.routed.packet
val type = MessageType.fromValue(pkt.type)

// Primary priority: 0 = normal, 1 = fragments, 2 = file transfer (non-fragment)
val primary = when (type) {
MessageType.FRAGMENT -> 1
MessageType.FILE_TRANSFER -> 2
else -> 0
}

// Secondary priority: for fragments, use total fragment count (smaller = higher priority)
val secondary = if (type == MessageType.FRAGMENT) {
try {
val payload = com.bitchat.android.model.FragmentPayload.decode(pkt.payload)
// If decode fails, deprioritize heavily
payload?.total ?: Int.MAX_VALUE
} catch (_: Exception) {
Int.MAX_VALUE
}
} else 0

val seq = sequenceCounter.getAndIncrement()
return QueuedBroadcast(req, primary, secondary, seq)
}

fun broadcastPacket(
routed: RoutedPacket,
Expand Down Expand Up @@ -267,7 +346,7 @@ class BluetoothPacketBroadcaster(
// Submit broadcast request to actor for serialized processing
broadcasterScope.launch {
try {
broadcasterActor.send(BroadcastRequest(routed, gattServer, characteristic))
broadcasterChannel.send(BroadcastRequest(routed, gattServer, characteristic))
} catch (e: Exception) {
Log.w(TAG, "Failed to send broadcast request to actor: ${e.message}")
// Fallback to direct processing if actor fails
Expand Down Expand Up @@ -427,7 +506,7 @@ class BluetoothPacketBroadcaster(
return buildString {
appendLine("=== Packet Broadcaster Debug Info ===")
appendLine("Broadcaster Scope Active: ${broadcasterScope.isActive}")
appendLine("Actor Channel Closed: ${broadcasterActor.isClosedForSend}")
appendLine("Actor Channel Closed: ${broadcasterChannel.isClosedForSend}")
appendLine("Connection Scope Active: ${connectionScope.isActive}")
}
}
Expand All @@ -439,11 +518,11 @@ class BluetoothPacketBroadcaster(
Log.d(TAG, "Shutting down BluetoothPacketBroadcaster actor")

// Close the actor gracefully
broadcasterActor.close()
broadcasterChannel.close()

// Cancel the broadcaster scope
broadcasterScope.cancel()

Log.d(TAG, "BluetoothPacketBroadcaster shutdown complete")
}
}
}
Loading