Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

More logging around rebalancing when rebalanceSafeCommits is true #1360

Merged
merged 16 commits into from
Nov 10, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,8 @@ object ConsumerSpec extends ZIOSpecDefaultSlf4j with KafkaRandom {
clientId = clientId,
groupId = Some(groupId),
`max.poll.records` = 1,
rebalanceSafeCommits = rebalanceSafeCommits
rebalanceSafeCommits = rebalanceSafeCommits,
maxRebalanceDuration = 60.seconds
)
consumer <- Consumer.make(settings)
} yield consumer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,34 +125,70 @@ private[consumer] final class Runloop private (
ZIO.attempt(consumer.commitAsync(java.util.Collections.emptyMap(), null)).orDie
}

def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): Task[Boolean] =
sealed trait EndOffsetCommitStatus
case object EndOffsetNotCommitted extends EndOffsetCommitStatus
case object EndOffsetCommitPending extends EndOffsetCommitStatus
case object EndOffsetCommitted extends EndOffsetCommitStatus

final case class StreamCompletionStatus(
tp: TopicPartition,
isDone: Boolean,
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
lastPulledOffset: Option[Long],
endOffsetCommitStatus: EndOffsetCommitStatus
) {
override def toString: String =
s"${tp}: isDone=${isDone}, lastPulledOffset=${lastPulledOffset.getOrElse("none")}, endOffsetCommitStatus: ${endOffsetCommitStatus}"
}

def getStreamCompletionStatuses(newCommits: Chunk[Commit]): ZIO[Any, Nothing, Chunk[StreamCompletionStatus]] =
for {
committedOffsets <- committedOffsetsRef.get
allPendingCommitOffsets = (previousPendingCommits ++ commitsOfEndingStreams(newCommits)).flatMap(_.offsets)
streamResults <-
ZIO.foreach(streamsToEnd) { stream =>
for {
isDone <- stream.completedPromise.isDone
lastPulledOffset <- stream.lastPulledOffset
endOffset <- if (isDone) stream.completedPromise.await else ZIO.none
} yield (isDone || lastPulledOffset.isEmpty, endOffset)
}
committedOffsets <- committedOffsetsRef.get
} yield {
val allStreamsCompleted = streamResults.forall(_._1)
allStreamsCompleted && {
val endOffsets: Chunk[Offset] = streamResults.flatMap(_._2)
val allPendingCommits = previousPendingCommits ++ commitsOfEndingStreams(newCommits)
endOffsets.forall { endOffset =>
val tp = endOffset.topicPartition
val offset = endOffset.offset
def endOffsetWasCommitted = committedOffsets.contains(tp, offset)
def endOffsetCommitIsPending = allPendingCommits.exists { pendingCommit =>
pendingCommit.offsets.get(tp).exists { pendingOffset =>
pendingOffset.offset() >= offset
}
}
endOffsetWasCommitted || endOffsetCommitIsPending

endOffsetCommitStatus = endOffset match {
case Some(endOffset)
if committedOffsets.contains(stream.tp, endOffset.offset) =>
EndOffsetCommitted
case Some(endOffset) if allPendingCommitOffsets.exists { case (tp, offset) =>
tp == stream.tp && offset.offset() >= endOffset.offset
erikvanoosten marked this conversation as resolved.
Show resolved Hide resolved
} =>
EndOffsetCommitPending
case _ => EndOffsetNotCommitted
}
} yield StreamCompletionStatus(stream.tp, isDone, lastPulledOffset.map(_.offset), endOffsetCommitStatus)
}
}
} yield streamResults

def logInitialStreamCompletionStatuses: ZIO[Any, Nothing, Unit] =
getStreamCompletionStatuses(newCommits = Chunk.empty).flatMap { completionStatuses =>
val statusStrings = completionStatuses.map(_.toString)
ZIO.logInfo(s"Waiting for ${streamsToEnd.size} streams to end: ${statusStrings.mkString("; ")}")
}

def logFinalStreamCompletionStatuses(completed: Boolean, newCommits: Chunk[Commit]): ZIO[Any, Nothing, Unit] =
getStreamCompletionStatuses(newCommits).flatMap { completionStatuses =>
ZIO
.logWarning(
s"Exceeded deadline waiting for streams to commit the offsets of the records they consumed; the rebalance will continue. This might cause another consumer to process some records again. ${completionStatuses.map(_.toString).mkString("; ")}"
)
}
.unless(completed)
.unit

def endingStreamsCompletedAndCommitsExist(newCommits: Chunk[Commit]): UIO[Boolean] =
for {
completionStatuses <- getStreamCompletionStatuses(newCommits)
statusStrings = completionStatuses.map(_.toString)
_ <- ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end: ${statusStrings.mkString("; ")}")
} yield completionStatuses.forall { status =>
// A stream is complete when it never got any records, or when it committed the offset of the last consumed record
status.lastPulledOffset.isEmpty || (status.isDone && status.endOffsetCommitStatus != EndOffsetNotCommitted)
}

def commitSync: Task[Unit] =
Expand All @@ -174,15 +210,18 @@ private[consumer] final class Runloop private (
//
// Note, we cannot use ZStream.fromQueue because that will emit nothing when the queue is empty.
// Instead, we poll the queue in a loop.
ZIO.logDebug(s"Waiting for ${streamsToEnd.size} streams to end") *>
logInitialStreamCompletionStatuses *>
ZStream
.fromZIO(blockingSleep(commitQueuePollInterval) *> commitQueue.takeAll)
.tap(commitAsync)
.forever
.takeWhile(_ => java.lang.System.nanoTime() <= deadline)
.scan(Chunk.empty[Runloop.Commit])(_ ++ _)
.takeUntilZIO(endingStreamsCompletedAndCommitsExist)
.runDrain *>
.mapZIO(commits => endingStreamsCompletedAndCommitsExist(commits).map((_, commits)))
.takeUntil { case (completed, _) => completed }
.runLast
.map(_.getOrElse((false, Chunk.empty)))
.flatMap { case (completed, commits) => logFinalStreamCompletionStatuses(completed, commits) } *>
commitSync *>
ZIO.logDebug(s"Done waiting for ${streamsToEnd.size} streams to end")
}
Expand Down
Loading