diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt index df631b1ce0c..e6a4401419c 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.DotNet.verified.txt @@ -2271,6 +2271,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.StreamRefSettings Create(Akka.Configuration.Config config) { } public Akka.Streams.Dsl.StreamRefSettings WithBufferCapacity(int value) { } public Akka.Streams.Dsl.StreamRefSettings WithDemandRedeliveryInterval(System.TimeSpan value) { } + public Akka.Streams.Dsl.StreamRefSettings WithFinalTerminationSignalDeadline(System.TimeSpan value) { } public Akka.Streams.Dsl.StreamRefSettings WithSubscriptionTimeout(System.TimeSpan value) { } } [Akka.Annotations.ApiMayChangeAttribute()] diff --git a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt index 32083672072..03acf361312 100644 --- a/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt +++ b/src/core/Akka.API.Tests/verify/CoreAPISpec.ApproveStreams.Net.verified.txt @@ -2269,6 +2269,7 @@ namespace Akka.Streams.Dsl public static Akka.Streams.Dsl.StreamRefSettings Create(Akka.Configuration.Config config) { } public Akka.Streams.Dsl.StreamRefSettings WithBufferCapacity(int value) { } public Akka.Streams.Dsl.StreamRefSettings WithDemandRedeliveryInterval(System.TimeSpan value) { } + public Akka.Streams.Dsl.StreamRefSettings WithFinalTerminationSignalDeadline(System.TimeSpan value) { } public Akka.Streams.Dsl.StreamRefSettings WithSubscriptionTimeout(System.TimeSpan value) { } } [Akka.Annotations.ApiMayChangeAttribute()] diff --git a/src/core/Akka.Streams.Tests/ActorMaterializerSettingsSpec.cs b/src/core/Akka.Streams.Tests/ActorMaterializerSettingsSpec.cs new file mode 100644 index 00000000000..cadca0a9fed --- /dev/null +++ b/src/core/Akka.Streams.Tests/ActorMaterializerSettingsSpec.cs @@ -0,0 +1,74 @@ +// ----------------------------------------------------------------------- +// +// Copyright (C) 2009-2022 Lightbend Inc. +// Copyright (C) 2013-2025 .NET Foundation +// +// ----------------------------------------------------------------------- + +using System; +using System.Linq; +using Akka.Configuration; +using Akka.Streams.Dsl; +using Xunit; +using Xunit.Abstractions; + +namespace Akka.Streams.Tests; + +public class ActorMaterializerSettingsSpec: Akka.TestKit.Xunit2.TestKit +{ + private readonly ActorMaterializer _materializer; + public ActorMaterializerSettingsSpec(ITestOutputHelper output) : base(Config.Empty, nameof(ActorMaterializerSettingsSpec), output) + { + _materializer = ActorMaterializer.Create(Sys); + } + + [Fact] + public void ActorMaterializerSettings_Should_contain_default_values() + { + var settings = _materializer.Settings; + Assert.Equal(4, settings.InitialInputBufferSize); + Assert.Equal(16, settings.MaxInputBufferSize); + Assert.Equal(string.Empty, settings.Dispatcher); + Assert.False(settings.IsDebugLogging); + Assert.Equal(1000, settings.OutputBurstLimit); + Assert.True(settings.IsAutoFusing); + Assert.Equal(1000000000, settings.MaxFixedBufferSize); + Assert.Equal(1000, settings.SyncProcessingLimit); + Assert.False(settings.IsFuzzingMode); + + var subscriptionTimeoutSettings = settings.SubscriptionTimeoutSettings; + Assert.Equal(StreamSubscriptionTimeoutTerminationMode.CancelTermination, subscriptionTimeoutSettings.Mode); + Assert.Equal(TimeSpan.FromSeconds(5), subscriptionTimeoutSettings.Timeout); + + var streamRefSettings = settings.StreamRefSettings; + Assert.Equal(32, streamRefSettings.BufferCapacity); + Assert.Equal(TimeSpan.FromSeconds(1), streamRefSettings.DemandRedeliveryInterval); + Assert.Equal(TimeSpan.FromSeconds(30), streamRefSettings.SubscriptionTimeout); + Assert.Equal(TimeSpan.FromSeconds(2), streamRefSettings.FinalTerminationSignalDeadline); + } + + [Fact] + public void ActorMaterializer_serialization_binding_should_be_correct() + { + var config = Sys.Settings.Config.GetConfig("akka.actor"); + + // Serializer should be registered + var serializers = config.GetConfig("serializers").AsEnumerable().ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString()); + Assert.Contains(serializers.Keys, s => s is "akka-stream-ref"); + Assert.Equal("\"Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams\"", serializers["akka-stream-ref"]); + + // Serializer should have proper type binding + var binding = config.GetConfig("serialization-bindings").AsEnumerable().ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString()); + Assert.Contains(binding.Keys, s => s is "Akka.Streams.Implementation.StreamRef.SinkRefImpl, Akka.Streams"); + Assert.Equal("akka-stream-ref", binding["Akka.Streams.Implementation.StreamRef.SinkRefImpl, Akka.Streams"]); + + Assert.Contains(binding.Keys, s => s is "Akka.Streams.Implementation.StreamRef.SourceRefImpl, Akka.Streams"); + Assert.Equal("akka-stream-ref", binding["Akka.Streams.Implementation.StreamRef.SourceRefImpl, Akka.Streams"]); + + Assert.Contains(binding.Keys, s => s is "Akka.Streams.Implementation.StreamRef.IStreamRefsProtocol, Akka.Streams"); + Assert.Equal("akka-stream-ref", binding["Akka.Streams.Implementation.StreamRef.IStreamRefsProtocol, Akka.Streams"]); + + // Serializer should have correct id + Assert.Equal(30, config.GetInt("serialization-identifiers.\"Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams\"")); + } +} \ No newline at end of file diff --git a/src/core/Akka.Streams/Dsl/StreamRefs.cs b/src/core/Akka.Streams/Dsl/StreamRefs.cs index 96256b5233b..584fd8731e7 100644 --- a/src/core/Akka.Streams/Dsl/StreamRefs.cs +++ b/src/core/Akka.Streams/Dsl/StreamRefs.cs @@ -78,6 +78,7 @@ public StreamRefSettings(int bufferCapacity, TimeSpan demandRedeliveryInterval, BufferCapacity = bufferCapacity; DemandRedeliveryInterval = demandRedeliveryInterval; SubscriptionTimeout = subscriptionTimeout; + FinalTerminationSignalDeadline = finalTerminationSignalDeadline; } public string ProductPrefix => nameof(StreamRefSettings); @@ -85,6 +86,7 @@ public StreamRefSettings(int bufferCapacity, TimeSpan demandRedeliveryInterval, public StreamRefSettings WithBufferCapacity(int value) => Copy(bufferCapacity: value); public StreamRefSettings WithDemandRedeliveryInterval(TimeSpan value) => Copy(demandRedeliveryInterval: value); public StreamRefSettings WithSubscriptionTimeout(TimeSpan value) => Copy(subscriptionTimeout: value); + public StreamRefSettings WithFinalTerminationSignalDeadline(TimeSpan value) => Copy(finalTerminationSignalDeadline: value); public StreamRefSettings Copy(int? bufferCapacity = null, TimeSpan? demandRedeliveryInterval = null, diff --git a/src/core/Akka.Streams/reference.conf b/src/core/Akka.Streams/reference.conf index 7c69c485a3d..0a2b280de2c 100644 --- a/src/core/Akka.Streams/reference.conf +++ b/src/core/Akka.Streams/reference.conf @@ -9,17 +9,17 @@ akka { materializer { # Initial size of buffers used in stream elements - # Note: If you change this value also change the fallback value in ActorMaterializerSettings + # Note: If you change this value also change the fallback value in ActorMaterializerSettings initial-input-buffer-size = 4 # Maximum size of buffers used in stream elements - # Note: If you change this value also change the fallback value in ActorMaterializerSettings + # Note: If you change this value also change the fallback value in ActorMaterializerSettings max-input-buffer-size = 16 # Fully qualified config path which holds the dispatcher configuration # to be used by FlowMaterialiser when creating Actors. # When this value is left empty, the default-dispatcher will be used. - # Note: If you change this value also change the fallback value in ActorMaterializerSettings + # Note: If you change this value also change the fallback value in ActorMaterializerSettings dispatcher = "" blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher" @@ -33,31 +33,31 @@ akka { # `cancel()`ing the subscription right away # warn - log a warning statement about the stale element (then drop the # reference to it) - # noop - do nothing (not recommended) - # Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings + # noop - do nothing (not recommended) + # Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings mode = cancel # time after which a subscriber / publisher is considered stale and eligible - # for cancelation (see `akka.stream.subscription-timeout.mode`) - # Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings + # for cancelation (see `akka.stream.subscription-timeout.mode`) + # Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings timeout = 5s } - + # Enable additional troubleshooting logging at DEBUG log level - # Note: If you change this value also change the fallback value in ActorMaterializerSettings + # Note: If you change this value also change the fallback value in ActorMaterializerSettings debug-logging = off # Maximum number of elements emitted in batch if downstream signals large demand - # Note: If you change this value also change the fallback value in ActorMaterializerSettings + # Note: If you change this value also change the fallback value in ActorMaterializerSettings output-burst-limit = 1000 - + # Enable automatic fusing of all graphs that are run. For short-lived streams # this may cause an initial runtime overhead, but most of the time fusing is # desirable since it reduces the number of Actors that are created. - # Note: If you change this value also change the fallback value in ActorMaterializerSettings + # Note: If you change this value also change the fallback value in ActorMaterializerSettings auto-fusing = on - # Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered, + # Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered, # buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed # buffer upon stream materialization if the requested buffer size is less than this # configuration parameter. The default is very high because failing early is better @@ -65,14 +65,14 @@ akka { # # Buffers sized larger than this will dynamically grow/shrink and consume more memory # per element than the fixed size buffers. - # Note: If you change this value also change the fallback value in ActorMaterializerSettings + # Note: If you change this value also change the fallback value in ActorMaterializerSettings max-fixed-buffer-size = 1000000000 - # Maximum number of sync messages that actor can process for stream to substream communication. - # Parameter allows to interrupt synchronous processing to get upsteam/downstream messages. - # Allows to accelerate message processing that happening withing same actor but keep system responsive. - # Note: If you change this value also change the fallback value in ActorMaterializerSettings - sync-processing-limit = 1000 + # Maximum number of sync messages that actor can process for stream to substream communication. + # Parameter allows to interrupt synchronous processing to get upsteam/downstream messages. + # Allows to accelerate message processing that happening withing same actor but keep system responsive. + # Note: If you change this value also change the fallback value in ActorMaterializerSettings + sync-processing-limit = 1000 debug { # Enables the fuzzing mode which increases the chance of race conditions @@ -82,17 +82,17 @@ akka { # environment! # To get the best results, try combining this setting with a throughput # of 1 on the corresponding dispatchers. - # Note: If you change this value also change the fallback value in ActorMaterializerSettings + # Note: If you change this value also change the fallback value in ActorMaterializerSettings fuzzing-mode = off } - + stream-ref { # Buffer of a SinkRef that is used to batch Request elements from the other side of the stream ref # # The buffer will be attempted to be filled eagerly even while the local stage did not request elements, # because the delay of requesting over network boundaries is much higher. buffer-capacity = 32 - + # Demand is signalled by sending a cumulative demand message ("requesting messages until the n-th sequence number) # Using a cumulative demand model allows us to re-deliver the demand message in case of message loss (which should # be very rare in any case, yet possible -- mostly under connection break-down and re-establishment). @@ -102,13 +102,13 @@ akka { # In normal operation, demand is signalled in response to arriving elements, however if no new elements arrive # within `demand-redelivery-interval` a re-delivery of the demand will be triggered, assuming that it may have gotten lost. demand-redelivery-interval = 1 second - + # Subscription timeout, during which the "remote side" MUST subscribe (materialize) the handed out stream ref. # This timeout does not have to be very low in normal situations, since the remote side may also need to # prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking # in-active streams which are never subscribed to. subscription-timeout = 30 seconds - + # In order to guard the receiving end of a stream ref from never terminating (since awaiting a Completion or Failed # message) after / before a Terminated is seen, a special timeout is applied once Terminated is received by it. # This allows us to terminate stream refs that have been targeted to other nodes which are Downed, and as such the @@ -133,17 +133,17 @@ akka { protocol = "TLSv1" } actor { - + serializers { akka-stream-ref = "Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams" } - + serialization-bindings { "Akka.Streams.Implementation.StreamRef.SinkRefImpl, Akka.Streams" = akka-stream-ref "Akka.Streams.Implementation.StreamRef.SourceRefImpl, Akka.Streams" = akka-stream-ref "Akka.Streams.Implementation.StreamRef.IStreamRefsProtocol, Akka.Streams" = akka-stream-ref } - + serialization-identifiers { "Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams" = 30 }