Skip to content

Commit

Permalink
[Do not release] Dest S3V2: Extra Logs for Hanging Syncs
Browse files Browse the repository at this point in the history
  • Loading branch information
johnny-schmidt committed Feb 6, 2025
1 parent c288e12 commit 5f7b92f
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import io.micronaut.context.annotation.Factory
import io.micronaut.context.annotation.Value
import jakarta.inject.Named
import jakarta.inject.Singleton
import java.util.concurrent.atomic.AtomicLong
import kotlin.math.min
import kotlinx.coroutines.channels.Channel

Expand Down Expand Up @@ -95,4 +96,10 @@ class SyncBeanFactory {
@Singleton
@Named("openStreamQueue")
class OpenStreamQueue : ChannelMessageQueue<DestinationStream>()

@Singleton
@Named("stateSizeCounter")
fun stateSizeCounter(): AtomicLong {
return AtomicLong(0L)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import io.airbyte.cdk.load.util.use
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.github.oshai.kotlinlogging.KotlinLogging
import io.micronaut.context.annotation.Secondary
import jakarta.inject.Named
import jakarta.inject.Singleton
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue
Expand Down Expand Up @@ -58,6 +59,7 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
abstract val syncManager: SyncManager
abstract val outputConsumer: suspend (T) -> Unit
abstract val timeProvider: TimeProvider
abstract val stateSizeCounter: AtomicLong

data class GlobalCheckpoint<T>(
val streamIndexes: List<Pair<DestinationStream.Descriptor, Long>>,
Expand All @@ -77,6 +79,7 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
index: Long,
checkpointMessage: T
) {
addSize(checkpointMessage)
flushLock.withLock {
if (checkpointsAreGlobal.updateAndGet { it == true } != false) {
throw IllegalStateException(
Expand Down Expand Up @@ -106,6 +109,7 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
keyIndexes: List<Pair<DestinationStream.Descriptor, Long>>,
checkpointMessage: T
) {
addSize(checkpointMessage)
flushLock.withLock {
if (checkpointsAreGlobal.updateAndGet { it != false } != true) {
throw IllegalStateException(
Expand Down Expand Up @@ -260,6 +264,8 @@ abstract class StreamsCheckpointManager<T> : CheckpointManager<DestinationStream
flushReadyCheckpointMessages()
}
}

abstract fun addSize(value: T)
}

@Singleton
Expand All @@ -268,11 +274,20 @@ class DefaultCheckpointManager(
override val catalog: DestinationCatalog,
override val syncManager: SyncManager,
override val outputConsumer: suspend (Reserved<CheckpointMessage>) -> Unit,
override val timeProvider: TimeProvider
override val timeProvider: TimeProvider,
@Named("stateSizeCounter") override val stateSizeCounter: AtomicLong
) : StreamsCheckpointManager<Reserved<CheckpointMessage>>() {
private val log = KotlinLogging.logger {}

init {
lastFlushTimeMs.set(timeProvider.currentTimeMillis())
}

override fun addSize(value: Reserved<CheckpointMessage>) {
log.info {
"Adding state of size ${value.bytesReserved} => ${stateSizeCounter.addAndGet(value.bytesReserved)}"
}
}
}

@SuppressFBWarnings(
Expand All @@ -281,9 +296,15 @@ class DefaultCheckpointManager(
)
@Singleton
@Secondary
class FreeingCheckpointConsumer(private val consumer: Consumer<AirbyteMessage>) :
class FreeingCheckpointConsumer(
private val consumer: Consumer<AirbyteMessage>,
@Named("stateSizeCounter") private val stateSizeCounter: AtomicLong
) :
suspend (Reserved<CheckpointMessage>) -> Unit {
override suspend fun invoke(message: Reserved<CheckpointMessage>) {
private val log = KotlinLogging.logger {}

override suspend fun invoke(message: Reserved<CheckpointMessage>) {
log.info { "Freeing state: new size: ${stateSizeCounter.addAndGet(-message.bytesReserved)}" }
message.use {
val outMessage = it.value.asProtocolMessage()
consumer.accept(outMessage)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import kotlinx.coroutines.sync.withLock

/** Releasable reservation of memory. */
class Reserved<T>(
private val parentManager: ReservationManager? = null,
val parentManager: ReservationManager? = null,
val bytesReserved: Long = 0,
val value: T,
) : CloseableCoroutine {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ class DefaultStreamManager(
log.info {
"Added ${batch.batch.state}->${batch.ranges} (groupId=${batch.batch.groupId}) to ${stream.descriptor.namespace}.${stream.descriptor.name}=>${rangesState[batch.batch.state]}"
}
log.debug {
log.info {
val groupLineMaybe =
if (fromCache != null) {
"\n From group cache: ${fromCache.state}->${fromCache.ranges}"
Expand Down Expand Up @@ -260,11 +260,13 @@ class DefaultStreamManager(
override fun isBatchProcessingComplete(): Boolean {
/* If the stream hasn't been fully read, it can't be done. */
if (!markedEndOfStream.get()) {
log.info { "Not complete by not marked end of stream" }
return false
}

/* A closed empty stream is always complete. */
if (recordCount.get() == 0L) {
log.info { "Complete because closed and no records"}
return true
}

Expand Down

0 comments on commit 5f7b92f

Please sign in to comment.