From 542aa745730238e1afe549536c43359c326ae371 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Tue, 8 Oct 2024 19:18:03 +0200 Subject: [PATCH] fix: Circuit breaker with id for journal and snapshot store (#32555) --- .../pattern/CircuitBreakersRegistry.scala | 36 +++++++++++-------- .../src/main/resources/reference.conf | 8 +++++ .../journal/AsyncWriteJournal.scala | 9 +---- .../persistence/snapshot/SnapshotStore.scala | 10 ++---- 4 files changed, 32 insertions(+), 31 deletions(-) diff --git a/akka-actor/src/main/scala/akka/pattern/CircuitBreakersRegistry.scala b/akka-actor/src/main/scala/akka/pattern/CircuitBreakersRegistry.scala index a29e16885a5..20017047a28 100644 --- a/akka-actor/src/main/scala/akka/pattern/CircuitBreakersRegistry.scala +++ b/akka-actor/src/main/scala/akka/pattern/CircuitBreakersRegistry.scala @@ -16,6 +16,8 @@ import akka.actor.{ } import akka.annotation.InternalApi import akka.pattern.internal.CircuitBreakerTelemetryProvider +import com.typesafe.config.Config + import scala.jdk.CollectionConverters._ /** @@ -63,14 +65,26 @@ final class CircuitBreakersRegistry(system: ExtendedActorSystem) extends Extensi if (config.hasPath(id)) config.getConfig(id).withFallback(defaultBreakerConfig) else defaultBreakerConfig - val maxFailures = breakerConfig.getInt("max-failures") - val callTimeout = breakerConfig.getDuration("call-timeout", MILLISECONDS).millis - val resetTimeout = breakerConfig.getDuration("reset-timeout", MILLISECONDS).millis - val maxResetTimeout = breakerConfig.getDuration("max-reset-timeout", MILLISECONDS).millis - val exponentialBackoffFactor = breakerConfig.getDouble("exponential-backoff") - val randomFactor = breakerConfig.getDouble("random-factor") + breakerFromConfig(id, breakerConfig) + } + + /** INTERNAL API */ + @InternalApi + private[akka] def getOrCreate(id: String, config: Config): CircuitBreaker = + breakers.computeIfAbsent(id, _ => breakerFromConfig(id, config)) + + private[akka] def get(id: String): CircuitBreaker = + breakers.computeIfAbsent(id, createCircuitBreaker) - val allowExceptions: Set[String] = breakerConfig.getStringList("exception-allowlist").asScala.toSet + private def breakerFromConfig(id: String, config: Config): CircuitBreaker = { + val maxFailures = config.getInt("max-failures") + val callTimeout = config.getDuration("call-timeout", MILLISECONDS).millis + val resetTimeout = config.getDuration("reset-timeout", MILLISECONDS).millis + val maxResetTimeout = config.getDuration("max-reset-timeout", MILLISECONDS).millis + val exponentialBackoffFactor = config.getDouble("exponential-backoff") + val randomFactor = config.getDouble("random-factor") + + val allowExceptions: Set[String] = config.getStringList("exception-allowlist").asScala.toSet val telemetry = CircuitBreakerTelemetryProvider.start(id, system) new CircuitBreaker( @@ -84,12 +98,4 @@ final class CircuitBreakersRegistry(system: ExtendedActorSystem) extends Extensi allowExceptions, telemetry)(system.dispatcher) } - - /** INTERNAL API */ - @InternalApi - private[akka] def getOrCreate(id: String)(customCreate: () => CircuitBreaker): CircuitBreaker = - breakers.computeIfAbsent(id, _ => customCreate()) - - private[akka] def get(id: String): CircuitBreaker = - breakers.computeIfAbsent(id, createCircuitBreaker) } diff --git a/akka-persistence/src/main/resources/reference.conf b/akka-persistence/src/main/resources/reference.conf index 59923b782fa..57ecf3ef90a 100644 --- a/akka-persistence/src/main/resources/reference.conf +++ b/akka-persistence/src/main/resources/reference.conf @@ -121,6 +121,10 @@ akka.persistence { max-failures = 10 call-timeout = 10s reset-timeout = 30s + max-reset-timeout = ${akka.circuit-breaker.default.max-reset-timeout} + exponential-backoff = ${akka.circuit-breaker.default.exponential-backoff} + random-factor = ${akka.circuit-breaker.default.random-factor} + exception-allowlist = [] } # The replay filter can detect a corrupt event stream by inspecting @@ -166,6 +170,10 @@ akka.persistence { max-failures = 5 call-timeout = 20s reset-timeout = 60s + max-reset-timeout = ${akka.circuit-breaker.default.max-reset-timeout} + exponential-backoff = ${akka.circuit-breaker.default.exponential-backoff} + random-factor = ${akka.circuit-breaker.default.random-factor} + exception-allowlist = [] } # Set this to true if successful loading of snapshot is not necessary. diff --git a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala index 878104fbfb6..fbe977bcee1 100644 --- a/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala +++ b/akka-persistence/src/main/scala/akka/persistence/journal/AsyncWriteJournal.scala @@ -7,12 +7,9 @@ package akka.persistence.journal import scala.collection.immutable import scala.concurrent.ExecutionContext import scala.concurrent.Future -import scala.concurrent.duration._ import scala.util.{ Failure, Success, Try } import scala.util.control.NonFatal - import akka.actor._ -import akka.pattern.CircuitBreaker import akka.pattern.CircuitBreakersRegistry import akka.pattern.pipe import akka.persistence._ @@ -30,12 +27,8 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase with AsyncRecovery { private val config = extension.configFor(self) private val breaker = { - val maxFailures = config.getInt("circuit-breaker.max-failures") - val callTimeout = config.getDuration("circuit-breaker.call-timeout", MILLISECONDS).millis - val resetTimeout = config.getDuration("circuit-breaker.reset-timeout", MILLISECONDS).millis val id = extension.extensionIdFor(self) - CircuitBreakersRegistry(context.system).getOrCreate(id)(() => - CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)) + CircuitBreakersRegistry(context.system).getOrCreate(id, config.getConfig("circuit-breaker")) } private val replayFilterMode: ReplayFilter.Mode = diff --git a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala index e4e6d17cd4f..47e14a58822 100644 --- a/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala +++ b/akka-persistence/src/main/scala/akka/persistence/snapshot/SnapshotStore.scala @@ -6,9 +6,7 @@ package akka.persistence.snapshot import scala.concurrent.ExecutionContext import scala.concurrent.Future -import scala.concurrent.duration._ import akka.actor._ -import akka.pattern.CircuitBreaker import akka.pattern.CircuitBreakersRegistry import akka.pattern.pipe import akka.persistence._ @@ -23,13 +21,9 @@ trait SnapshotStore extends Actor with ActorLogging { private val publish = extension.settings.internal.publishPluginCommands private val breaker = { - val cfg = extension.configFor(self) - val maxFailures = cfg.getInt("circuit-breaker.max-failures") - val callTimeout = cfg.getDuration("circuit-breaker.call-timeout", MILLISECONDS).millis - val resetTimeout = cfg.getDuration("circuit-breaker.reset-timeout", MILLISECONDS).millis + val config = extension.configFor(self) val id = extension.extensionIdFor(self) - CircuitBreakersRegistry(context.system).getOrCreate(id)(() => - CircuitBreaker(context.system.scheduler, maxFailures, callTimeout, resetTimeout)) + CircuitBreakersRegistry(context.system).getOrCreate(id, config.getConfig("circuit-breaker")) } final def receive = receiveSnapshotStore.orElse[Any, Unit](receivePluginInternal)