Skip to content

Commit

Permalink
fix: Circuit breaker with id for journal and snapshot store (akka#32555)
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren authored Oct 8, 2024
1 parent dc38598 commit 542aa74
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import akka.actor.{
}
import akka.annotation.InternalApi
import akka.pattern.internal.CircuitBreakerTelemetryProvider
import com.typesafe.config.Config

import scala.jdk.CollectionConverters._

/**
Expand Down Expand Up @@ -63,14 +65,26 @@ final class CircuitBreakersRegistry(system: ExtendedActorSystem) extends Extensi
if (config.hasPath(id)) config.getConfig(id).withFallback(defaultBreakerConfig)
else defaultBreakerConfig

val maxFailures = breakerConfig.getInt("max-failures")
val callTimeout = breakerConfig.getDuration("call-timeout", MILLISECONDS).millis
val resetTimeout = breakerConfig.getDuration("reset-timeout", MILLISECONDS).millis
val maxResetTimeout = breakerConfig.getDuration("max-reset-timeout", MILLISECONDS).millis
val exponentialBackoffFactor = breakerConfig.getDouble("exponential-backoff")
val randomFactor = breakerConfig.getDouble("random-factor")
breakerFromConfig(id, breakerConfig)
}

/** INTERNAL API */
@InternalApi
private[akka] def getOrCreate(id: String, config: Config): CircuitBreaker =
breakers.computeIfAbsent(id, _ => breakerFromConfig(id, config))

private[akka] def get(id: String): CircuitBreaker =
breakers.computeIfAbsent(id, createCircuitBreaker)

val allowExceptions: Set[String] = breakerConfig.getStringList("exception-allowlist").asScala.toSet
private def breakerFromConfig(id: String, config: Config): CircuitBreaker = {
val maxFailures = config.getInt("max-failures")
val callTimeout = config.getDuration("call-timeout", MILLISECONDS).millis
val resetTimeout = config.getDuration("reset-timeout", MILLISECONDS).millis
val maxResetTimeout = config.getDuration("max-reset-timeout", MILLISECONDS).millis
val exponentialBackoffFactor = config.getDouble("exponential-backoff")
val randomFactor = config.getDouble("random-factor")

val allowExceptions: Set[String] = config.getStringList("exception-allowlist").asScala.toSet

val telemetry = CircuitBreakerTelemetryProvider.start(id, system)
new CircuitBreaker(
Expand All @@ -84,12 +98,4 @@ final class CircuitBreakersRegistry(system: ExtendedActorSystem) extends Extensi
allowExceptions,
telemetry)(system.dispatcher)
}

/** INTERNAL API */
@InternalApi
private[akka] def getOrCreate(id: String)(customCreate: () => CircuitBreaker): CircuitBreaker =
breakers.computeIfAbsent(id, _ => customCreate())

private[akka] def get(id: String): CircuitBreaker =
breakers.computeIfAbsent(id, createCircuitBreaker)
}
8 changes: 8 additions & 0 deletions akka-persistence/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ akka.persistence {
max-failures = 10
call-timeout = 10s
reset-timeout = 30s
max-reset-timeout = ${akka.circuit-breaker.default.max-reset-timeout}
exponential-backoff = ${akka.circuit-breaker.default.exponential-backoff}
random-factor = ${akka.circuit-breaker.default.random-factor}
exception-allowlist = []
}

# The replay filter can detect a corrupt event stream by inspecting
Expand Down Expand Up @@ -166,6 +170,10 @@ akka.persistence {
max-failures = 5
call-timeout = 20s
reset-timeout = 60s
max-reset-timeout = ${akka.circuit-breaker.default.max-reset-timeout}
exponential-backoff = ${akka.circuit-breaker.default.exponential-backoff}
random-factor = ${akka.circuit-breaker.default.random-factor}
exception-allowlist = []
}

# Set this to true if successful loading of snapshot is not necessary.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,9 @@ package akka.persistence.journal
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal

import akka.actor._
import akka.pattern.CircuitBreaker
import akka.pattern.CircuitBreakersRegistry
import akka.pattern.pipe
import akka.persistence._
Expand All @@ -30,12 +27,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery {
private val config = extension.configFor(self)

private val breaker = {
val maxFailures = config.getInt("circuit-breaker.max-failures")
val callTimeout = config.getDuration("circuit-breaker.call-timeout", MILLISECONDS).millis
val resetTimeout = config.getDuration("circuit-breaker.reset-timeout", MILLISECONDS).millis
val id = extension.extensionIdFor(self)
CircuitBreakersRegistry(context.system).getOrCreate(id)(() =>
CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout))
CircuitBreakersRegistry(context.system).getOrCreate(id, config.getConfig("circuit-breaker"))
}

private val replayFilterMode: ReplayFilter.Mode =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@ package akka.persistence.snapshot

import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor._
import akka.pattern.CircuitBreaker
import akka.pattern.CircuitBreakersRegistry
import akka.pattern.pipe
import akka.persistence._
Expand All @@ -23,13 +21,9 @@ trait SnapshotStore extends Actor with ActorLogging {
private val publish = extension.settings.internal.publishPluginCommands

private val breaker = {
val cfg = extension.configFor(self)
val maxFailures = cfg.getInt("circuit-breaker.max-failures")
val callTimeout = cfg.getDuration("circuit-breaker.call-timeout", MILLISECONDS).millis
val resetTimeout = cfg.getDuration("circuit-breaker.reset-timeout", MILLISECONDS).millis
val config = extension.configFor(self)
val id = extension.extensionIdFor(self)
CircuitBreakersRegistry(context.system).getOrCreate(id)(() =>
CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout))
CircuitBreakersRegistry(context.system).getOrCreate(id, config.getConfig("circuit-breaker"))
}

final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal)
Expand Down

0 comments on commit 542aa74

Please sign in to comment.