- 
                Notifications
    You must be signed in to change notification settings 
- Fork 89
          Refactor Subscriber
          #984
        
          New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: series/2.x
Are you sure you want to change the base?
  
    Refactor Subscriber
  
  #984
              Conversation
The main goal is to reduce the overhead when we have a lot of subscriptions. - Avoid locking for `subscribe` and `unsubscribe`. We should be able to subscribe to or unsubscribe from other channels/patterns independent of concurrent subscriptions to other channels/patterns without being blocked. We represent the subscription lifecycle as multiple states. We can move between those state without locking, but still keep operations on the same channel/key wait on state changes by using a `Deferred` as a signal. We don't go back to a single `Ref` with a `Map`, but switch to using a `MapRef` backed by a `ConcurrentHashMap` allowing us to reduce contention if we have a lot of concurrent state changes. - Avoid using multiple `RedisPubSubListener`s. The listeners are only called one by one. Since we have a map with all subscriptions, we can look up the subscription and publish to its topic in one listener.
Scala 3 is still unhappy with value classes and generic methods
| Sorry for delaying the request for so long. Was busy, then got sick. I hope to review this this week. | 
| sub: RedisChannel[String] => IO[Unit], | ||
| unsub: RedisChannel[String] => IO[Unit] | ||
| ): IO[Subscriber.SubscriptionMap[IO, RedisChannel[String], String]] = { | ||
| // import effect.Log.Stdout._ | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove?
| subscription <- map.subscribe(channel1).compile.toList.start | ||
| _ <- waitOnFiber | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of sleep which makes for long tests and race conditions in CI, how about cats effect's testcontrol? Otherwise, if we want to keep the exact runtime:
| subscription <- map.subscribe(channel1).compile.toList.start | |
| _ <- waitOnFiber | |
| waitForSubscribe <- Deferred[IO, Unit] | |
| subscription <- map.subscribe(channel1).compile.toList.flatTap(_ => waitForSubscribe.complete(())).start | |
| _ <- waitForSubscribe.get() | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used the sleep initially because that is what RedisPubSubSpec uses, but unlike those tests we aren't actually communicating with redis here.
The complicating factor is that subscribe is doing quite a lot: it is subscribing, but also returning the messages. In these tests we want to be subscribed (both to redis and to the Topic) before we use any of the other operations where we assume there is a subscription.
I am working on a change to the internal subscribe signature to make it easier to test (separating the subscription from the message consumption effect), but I still need to clean up these tests a bit more before I'll push that work. Thanks for this comment, I should have given this some more thought before.
On a side note, your Deferred suggestion unfortunately doesn't work. The subscribe method never terminates on it is own. We need something to call unsubscribe or end the stream early.
| // Lettuce calls the listeners one by one (for every subscribe, | ||
| // unsubscribe, message, ...), so using multiple listeners when we can | ||
| // find the right subscription easily, is inefficient. | ||
| dispatcher <- Dispatcher.sequential[F] // we have no parallelism in the listener | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not familiar with the pubsub code, but wonder if this code would be simpler if for each pubSub.subscribe(myChannel), we add a listener to Lettuce what creates a sequential  dispatcher and puts messages on a queue which an FS2 stream consumes. This would delegate all the state handling to Lettuce which in turn uses Netty.
Looking at existing code, I think it's somewhat what I'd expect, where a listener and subscriber is created for each subscribe call. All the listener does is publish to a fs2.Topic (which I'm not familiar with, but guessing it's some queue). Also seems like it could use sequential and not parallel as all it does it add to Topic.
So if I understand correctly a single dispatcher could work, but not sure it's necessarily better than multiple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The existing code uses a listener for every subscription indeed. Lettuce calls every listener for every message though. So the more subscriptions we have, the more listeners will get called where all but one will ignore the message. That feels very inefficient. Since we have a map with all subscriptions, we can pass the message to the Topic from the right subscription directly with a single listener and thus avoid calling N listeners for every message (when we have N distinct subscriptions).
This would delegate all the state handling to Lettuce which in turn uses Netty.
Not sure what you mean here. Lettuce only has a list of listeners, it doesn't know what those listeners are interested in (message, subscribed, unsubscribed, ...).
- https://github.com/redis/lettuce/blob/main/src/main/java/io/lettuce/core/pubsub/RedisPubSubListener.java
- https://github.com/redis/lettuce/blob/8321f5d5b4877cde5a3274bcbbcddcb81a1c4589/src/main/java/io/lettuce/core/pubsub/PubSubEndpoint.java#L251-L286
So if I understand correctly a single dispatcher could work, but not sure it's necessarily better than multiple.
In the existing code a single sequential dispatcher would have worked too I think. Since the listeners get called one by one and we use unsafeRunSync, there is no way for us to ever have multiple dispatcher calls at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see, Lettuce does a linear scan, whereas we can do constant time lookup to route message to respective topic. This makes sense from an efficiency perspective, which isn't to say that the Lettuce design is bad.
My initial gut reaction would be to allow supporting both, such that end users could set up multiple listeners if they want multiple consumers for same key, but the default Subscriber goes for efficiency. Will refine these thoughts while reviewing.
Thanks for clarifying, will give PR another look.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is already support for multiple subscribers for the same key. That is why the code is using a Topic, which uses a Channel for every subscription.
| new RedisPubSubAdapter[K, V] { | ||
| override def message(ch: K, msg: V): Unit = | ||
| try | ||
| dispatcher.unsafeRunSync(state.channelSubs.onMessage(RedisChannel(ch), msg)) | 
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mention this above, but since we funnel all messages through a single sequential dispatcher, which is backed by a single queue for the IO effects, this could create "head-of-line" blocking. There could exist a topic with a single subscriber blocking, and that would lead to block subscribers on all other topics. And this is b/c onMessage just routes to a topic, then publishes to it.
I think solutions would be to add queues or do routing at the Java layer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this could create "head-of-line" blocking
We already have head of line blocking, lettuce uses a single connection to redis and calls listeners one by one. A topic with a single blocking subscriber currently also already blocks all subscriptions. This PR doesn't solve that issue no. The main change in this PR is to avoid blocking while subscribing and unsubscribing, the rest of the behaviour should have stayed the same.
I don't think it is an issue that we can solve, I think the best we can do is to document it clearly that blocked subscribers can block all redis subscriptions and that clients should handle this on their end.
| 
 I agree with point (2) and I would expect better performance with a single listener that dispatches to single dispatcher. However, we need to be careful of subscribers of one topic blocking subscribers of other topics. Regarding (1), yeah it's significantly more complicated, which makes it difficult to reason about correctness. Curious how back the blocking could be with AtomicCell and if there is a cat's Read-Write Lock we could use. I like the use of MapRef, but how come we the  | 
| 
 If we don't lock while subscribing and unsubscribing, we need to track that we are subscribing or unsubscribing, so that concurrent calls to subscribe/unsubscribe to they same key can wait until the in progress subscribe/unsubscribe finishes (to use the fresh subscription, retry, ...). This does make it more complicated indeed, that is unfortunately the consequence of removing the single shared lock for all subscriptions. We could wait untl typelevel/cats-effect#4424 lands to make a  
 Yes that is correct, but this should be faster than how we currently call all listerers one by one for all messages. | 
| Sorry for the very long delay, I didn't find time to come back to this. I pushed the changes I mentioned to handle a subscription lifecycle more explicitly (using  It would be nice to expose a  | 
The main goal is to reduce the overhead we add when we have a lot of subscriptions.
subscribeandunsubscribe. We should be able to subscribe to or unsubscribe from channels/patterns independent of concurrent subscriptions to other channels/patterns without being blocked.In this PR we represent the subscription lifecycle as multiple states. We can move between those state without locking, but still keep operations on the same channel/pattern wait on state changes by using a
Deferredas a signal.We don't go back to a single
Refwith aMap, but switch to using aMapRefbacked by aConcurrentHashMapallowing us to reduce contention if we have a lot of concurrent state changes.RedisPubSubListeners.The listeners are only called one by one. Since we have a map with all subscriptions, we can look up the subscription and publish to its topic in one listener.
This is a more ambitious version of what I started in #972.
Unfortunately this made the diff a lot larger than I wanted it to be.
unsubscribefailing currently leaves us in a state where we can't retry, since we keep aRedis4CatsSubscriptionwith a single subscription, without actual subscribers to the topic).Subscriberimplementation into theSubscribercompanion object, which I think makes things easier (but that is probably subjective).The
Redis4CatsSubscriptionclass is now theActivepart ofSubscriptionState.I tried to keep the behavior as closely to the existing implementation as I could. There are a few places that could use some additional work, but I didn't want to make any additional changes to implementation.
unsubscribeinstead of waiting on the last subscription stream finalizer (since no new messages will be processed anyway).We should document that one single subscriber not keeping up with its channel will block not only all subscriptions for the same channel/pattern but for all channels/patterns, since we can't publish to the topic.
We could potentially publish to multiple channels/patterns in parallel (but we should benchmark first)