Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MultiProducerSingleConsumerChannel #305

Open
wants to merge 15 commits into
base: main
Choose a base branch
from

Conversation

FranzBusch
Copy link
Member

Motivation

The pitch to add external backpressure support to the standard libraries AsyncStream got returned for revision since there are larger open questions around AsyncSequence. However, having external backpressure in a source asynchronous sequence is becoming more and more important.

Modification

This PR adds a modified proposal and implementation that brings the Swift Evolution proposal over to Swift Async Algorithms.

@FranzBusch FranzBusch force-pushed the fb-async-backpressured-stream branch from ddd7523 to 65e8957 Compare December 19, 2023 11:49
@FranzBusch
Copy link
Member Author

@swift-ci please test

@FranzBusch FranzBusch requested a review from phausler December 20, 2023 09:57
@FranzBusch FranzBusch force-pushed the fb-async-backpressured-stream branch from 4904039 to 662693e Compare June 14, 2024 08:15
@FranzBusch FranzBusch changed the title Add AsyncBackpressuredStream proposal and implementation Add MultiProducerSingleConsumerChannel Jun 14, 2024

### Upstream producer termination

The producer will get notified about termination through the `onTerminate`
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can you add calls to onTerminate to the snippets below?

})
}
} catch {
// `send(contentsOf:)` throws if the asynchronous stream already terminated
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this be clearer as another case on the sendResult?

/// - Parameters:
/// - low: When the number of buffered elements drops below the low watermark, producers will be resumed.
/// - high: When the number of buffered elements rises above the high watermark, producers will be suspended.
/// - waterLevelForElement: A closure used to compute the contribution of each buffered element to the current water level.
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, neat. This works nicely for collections.

//
//===----------------------------------------------------------------------===//

#if compiler(>=6.0)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please can we consider offering a pre-Swift 6 variant of this type. There are several places we'd like to adopt this that need to also support Swift 5.9+ for the foreseeable.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm not sure yet. Especially since it is part of the public API in the Source. Making the Source copyable makes the type behave way differently.

@ser-0xff
Copy link

Hi!
Thank you for the PR!
We are looking for the functionality you did in MultiProducerSingleConsumerChannel and tried to play with it.
Unfortunately we were not able to compile it because we need it in the framework which is compiled with evolution mode.
I did some changes to make it compile, but not sure these changes are good enough to be a part of public framework.
Do you have any plans to support evolution mode for the AsyncAlgorithms?
We prepared a small reproducing project for you for a case if you will have some time to look at that.
Just

git clone https://github.com/ordo-one/external-reproducers.git
cd external-reproducers/swift/async-algorithms-evolution
xcodebuild archive -quiet -scheme async-algorithms-evolution -configuration Debug -destination 'platform=macOS,arch=arm64' BUILD_LIBRARY_FOR_DISTRIBUTION=YES

@FranzBusch
Copy link
Member Author

@ser-0xff Thanks for trying it out. We don't plan on support library evolution mode in any of our packages. What we normally advise is to not compile packages as frameworks but rather add this library as an internal import only dependency inside the frameworks that need it.

@ser-0xff
Copy link

Thank you for the reply...
Unfortunately internal import does not work if we want to use package type (like MultiProducerSingleConsumerChannel) in the framework public API...

@hassila
Copy link

hassila commented Oct 24, 2024

Thank you for the reply... Unfortunately internal import does not work if we want to use package type (like MultiProducerSingleConsumerChannel) in the framework public API...

@ser-0xff I guess we need to wrap up the API surface we want to support then and just do internal import as suggested? But let's take that offline from this PR - thanks @FranzBusch for quick reply.

@ser-0xff
Copy link

ser-0xff commented Oct 24, 2024

Hi, @FranzBusch
We run into another issue when we played with the MultiProducerSingleConsumerChannel.
Our projects are already on Swift6 which has a stricter non-copyable types checks. The MultiProducerSingleConsumerChannel<>.Source type is non-copyable so we can't share it across many tasks, as a result we can use it only in single producer mode.
How could we solve that issue and produce data within multiple tasks?

Here is a minimized test showing the usage pattern we want if you will find some time to look at.

@FranzBusch
Copy link
Member Author

We run into another issue when we played with the MultiProducerSingleConsumerChannel.
Our projects are already on Swift6 which has a stricter non-copyable types checks. The MultiProducerSingleConsumerChannel<>.Source type is non-copyable so we can't share it across many tasks, as a result we can use it only in single producer mode.
How could we solve that issue and produce data within multiple tasks?

The source has a copy method that allows you to get a second source. Now the problem is that closures cannot be marked as ~Copyable or rathe @calledOnce. To solution to this is to box the source into a class right now and move it into a task. This either requires a lock or an @unchecked Sendable annotation right now.

@ser-0xff
Copy link

ser-0xff commented Oct 28, 2024

Missed copy() method in source code.
Will look at that, thank you for the reply.

@ser-0xff
Copy link

ser-0xff commented Nov 5, 2024

HI, Franz!
We tried using MultiProducersSingleConsumerChannel instead of AsyncStream and encountered behavior unexpected for us. We expected the channel to remain functional as long as at least one associated source was active. However, in the current implementation, it appears that the channel transitions to a finished state as soon as any single source instance is deinitialized.
I created a PR with minimized test into your feature branch for a case if you could find some time to look at that and may be later will include that test to the test suite for MultiProducersSingleConsumerChannel.

@FranzBusch
Copy link
Member Author

HI, Franz! We tried using MultiProducersSingleConsumerChannel instead of AsyncStream and encountered behavior unexpected for us. We expected the channel to remain functional as long as at least one associated source was active. However, in the current implementation, it appears that the channel transitions to a finished state as soon as any single source instance is deinitialized. I created a PR with minimized test into your feature branch for a case if you could find some time to look at that and may be later will include that test to the test suite for MultiProducersSingleConsumerChannel.

You are right. I haven't gotten around to implement the correct handling of multiple sources yet.

Copy link
Member

@phausler phausler left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think with some minor edits the pitch is good to go; id like to at the bare minimum see some of that testing reenabled to ensure it isn't leaking.

/// It supports arbitrary many elements but if only up to one ``Element`` is stored it does **not** allocate separate storage on the heap
/// and instead stores the ``Element`` inline.
@usableFromInline
struct _TinyArray<Element> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this could perhaps be subsumed by InlineArray instead since the original intent was to have a faster storage (this is not a blocking concept)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

InlineArray isn't capable of fully replacing the need here. The idea behind TinyArray is to have a fast path for a single element that doesn't allocate and one that allocates if there are more. Since we can't tell at compile time how many producers we have we need this runtime dynamism.

@@ -17,24 +17,35 @@ import Glibc
import WinSDK
#endif

internal struct Lock {
@usableFromInline
internal class Lock {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

umm why is this a non-final class? If we are moving to a class then we should just use Mutex from Synchronization

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed all of the changes here since the latest code now uses Mutex.

final class MultiProducerSingleConsumerChannelTests: XCTestCase {
// MARK: - sequenceDeinitialized

// Following tests are disabled since the channel is not getting deinited due to a known bug
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This might be a requirement to landing; id rather not introduce a type with a known leak

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added significantly more tests now and also re-enabled those.


```swift
// Writing new values and providing a callback when to produce more
try source.send(contentsOf: sequence, onProduceMore: { result in
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this be written without ambiguity as a trailing closure?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes in a synchronous context both send methods can be called without ambiguity, e.g. this compiles:

try source.send(1)
source.send(2) { result in
    print(result)
}

The only thing that needs some special handling is if you want to call a sync send from an async context since the async methods are preferred; however, users can workaround this by creating an inline closure, e.g.:

try { try source.send(1) }()

@FranzBusch FranzBusch force-pushed the fb-async-backpressured-stream branch from 662693e to 944b727 Compare March 25, 2025 14:00
# Motivation

The pitch to add external backpressure support to the standard libraries `AsyncStream` got returned for revision since there are larger open questions around `AsyncSequence`. However, having external backpressure in a source asynchronous sequence is becoming more and more important.

# Modification

This PR adds a modified proposal and implementation that brings the Swift Evolution proposal over to Swift Async Algorithms.
@FranzBusch FranzBusch force-pushed the fb-async-backpressured-stream branch 2 times, most recently from f086eda to 216e38f Compare March 29, 2025 12:15
@FranzBusch FranzBusch force-pushed the fb-async-backpressured-stream branch from 216e38f to ae3f677 Compare March 29, 2025 13:26
@FranzBusch FranzBusch force-pushed the fb-async-backpressured-stream branch from ae3f677 to b30ec17 Compare March 29, 2025 13:30
@FranzBusch FranzBusch force-pushed the fb-async-backpressured-stream branch from 6b24e93 to 8d2f4d6 Compare March 29, 2025 14:07
/// The channel will only automatically be finished if all existing sources have been deinited.
///
/// - Returns: A new source for sending elements to the channel.
public mutating func copy() -> sending Self {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here (and a in a few other places) we have sending Self where Self conforms to Sendable. in such cases is the sending doing anything meaningful, since the type can already be freely passed across isolations? i think adding a diagnostic to flag such cases as redundant has also been proposed/drafted. i suppose if the Sendable conformance were to be dropped in the future, maybe such a change would be 'less breaking' if things are spelled this way?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for calling this out. I think the right thing is to actually remove the Sendable conformance on Source since it is in fact not Sendable.

/// - Parameters:
/// - low: When the number of buffered elements drops below the low watermark, producers will be resumed.
/// - high: When the number of buffered elements rises above the high watermark, producers will be suspended.
public static func watermark(low: Int, high: Int) -> BackpressureStrategy {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apologies if i missed this having been discussed somewhere in the prior history of this work, but did you consider using a ClosedRange of an appropriate unsigned integer type to model the watermark state? naively it seems that might obviate some of the invariant checks.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's interesting but I personally prefer having to Int's here since the watermark strategy is not really about the range but a low and high limit where the production is started and stopped respectively.

mutating func didSend(elements: Deque<Element>.SubSequence) -> Bool {
if let waterLevelForElement = self._waterLevelForElement {
for element in elements {
self._currentWatermark += waterLevelForElement(element)
Copy link
Contributor

@jamieQ jamieQ Mar 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looked to me like this logic runs at various points while the state machine lock is held – any concerns about invoking client code while holding a lock?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I updated the public docs to call this out. I think the reality is that we need to call this code while being under a lock since the state management otherwise would become almost impossible with potentially any code interleaving.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants