diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt index 7ced4f3d7493b..25007479f0716 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/config/SyncBeanFactory.kt @@ -18,6 +18,7 @@ import io.micronaut.context.annotation.Factory import io.micronaut.context.annotation.Value import jakarta.inject.Named import jakarta.inject.Singleton +import java.util.concurrent.atomic.AtomicLong import kotlin.math.min import kotlinx.coroutines.channels.Channel @@ -95,4 +96,10 @@ class SyncBeanFactory { @Singleton @Named("openStreamQueue") class OpenStreamQueue : ChannelMessageQueue() + + @Singleton + @Named("stateSizeCounter") + fun stateSizeCounter(): AtomicLong { + return AtomicLong(0L) + } } diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt index 6085392bd5062..00b394f2de484 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/CheckpointManager.kt @@ -13,6 +13,7 @@ import io.airbyte.cdk.load.util.use import io.airbyte.protocol.models.v0.AirbyteMessage import io.github.oshai.kotlinlogging.KotlinLogging import io.micronaut.context.annotation.Secondary +import jakarta.inject.Named import jakarta.inject.Singleton import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentLinkedQueue @@ -58,6 +59,7 @@ abstract class StreamsCheckpointManager : CheckpointManager Unit abstract val timeProvider: TimeProvider + abstract val stateSizeCounter: AtomicLong data class GlobalCheckpoint( val streamIndexes: List>, @@ -77,6 +79,7 @@ abstract class StreamsCheckpointManager : CheckpointManager : CheckpointManager>, checkpointMessage: T ) { + addSize(checkpointMessage) flushLock.withLock { if (checkpointsAreGlobal.updateAndGet { it != false } != true) { throw IllegalStateException( @@ -260,6 +264,8 @@ abstract class StreamsCheckpointManager : CheckpointManager) -> Unit, - override val timeProvider: TimeProvider + override val timeProvider: TimeProvider, + @Named("stateSizeCounter") override val stateSizeCounter: AtomicLong ) : StreamsCheckpointManager>() { + private val log = KotlinLogging.logger {} + init { lastFlushTimeMs.set(timeProvider.currentTimeMillis()) } + + override fun addSize(value: Reserved) { + log.info { + "Adding state of size ${value.bytesReserved} => ${stateSizeCounter.addAndGet(value.bytesReserved)}" + } + } } @SuppressFBWarnings( @@ -281,9 +296,15 @@ class DefaultCheckpointManager( ) @Singleton @Secondary -class FreeingCheckpointConsumer(private val consumer: Consumer) : +class FreeingCheckpointConsumer( + private val consumer: Consumer, + @Named("stateSizeCounter") private val stateSizeCounter: AtomicLong +) : suspend (Reserved) -> Unit { - override suspend fun invoke(message: Reserved) { + private val log = KotlinLogging.logger {} + + override suspend fun invoke(message: Reserved) { + log.info { "Freeing state: new size: ${stateSizeCounter.addAndGet(-message.bytesReserved)}" } message.use { val outMessage = it.value.asProtocolMessage() consumer.accept(outMessage) diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/ReservationManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/ReservationManager.kt index 354dfadd123dd..b10f21f83a60a 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/ReservationManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/ReservationManager.kt @@ -14,7 +14,7 @@ import kotlinx.coroutines.sync.withLock /** Releasable reservation of memory. */ class Reserved( - private val parentManager: ReservationManager? = null, + val parentManager: ReservationManager? = null, val bytesReserved: Long = 0, val value: T, ) : CloseableCoroutine { diff --git a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt index b8a71842a6764..e6560fbc0c39f 100644 --- a/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt +++ b/airbyte-cdk/bulk/core/load/src/main/kotlin/io/airbyte/cdk/load/state/StreamManager.kt @@ -188,7 +188,7 @@ class DefaultStreamManager( log.info { "Added ${batch.batch.state}->${batch.ranges} (groupId=${batch.batch.groupId}) to ${stream.descriptor.namespace}.${stream.descriptor.name}=>${rangesState[batch.batch.state]}" } - log.debug { + log.info { val groupLineMaybe = if (fromCache != null) { "\n From group cache: ${fromCache.state}->${fromCache.ranges}" @@ -260,11 +260,13 @@ class DefaultStreamManager( override fun isBatchProcessingComplete(): Boolean { /* If the stream hasn't been fully read, it can't be done. */ if (!markedEndOfStream.get()) { + log.info { "Not complete by not marked end of stream" } return false } /* A closed empty stream is always complete. */ if (recordCount.get() == 0L) { + log.info { "Complete because closed and no records"} return true }