From 292c4197c8bbfc3b74fd4feae4df96803a17c6ae Mon Sep 17 00:00:00 2001 From: crioux-stripe <115596126+crioux-stripe@users.noreply.github.com> Date: Fri, 10 Jan 2025 12:12:04 -0800 Subject: [PATCH] Add a rebatch call to RemoteObservable connections between stages. (#744) * Add a rebatch call to RemoteObservable connections between stages. * Rename property to workerClient.buffer.size --- .../remote/observable/RemoteObservable.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java b/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java index 307ece777..61cd15566 100644 --- a/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java +++ b/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java @@ -45,6 +45,7 @@ import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import mantis.io.reactivex.netty.RxNetty; import mantis.io.reactivex.netty.channel.ObservableConnection; @@ -72,6 +73,10 @@ public class RemoteObservable { private static boolean enableNettyLogging = false; private static boolean enableCompression = true; private static int maxFrameLength = 5242880; // 5 MB max frame + private static int bufferSize = 0; + private static final String DEFAULT_BUFFER_SIZE_STR = "0"; + + // NJ static { @@ -106,6 +111,9 @@ private static void loadFastProperties() { if (maxFrameLengthStr != null && maxFrameLengthStr.length() > 0) { maxFrameLength = Integer.parseInt(maxFrameLengthStr); } + + String bufferSizeStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("workerClient.buffer.size", DEFAULT_BUFFER_SIZE_STR); + bufferSize = Integer.parseInt(Optional.ofNullable(bufferSizeStr).orElse(DEFAULT_BUFFER_SIZE_STR)); } private static Func1, ? extends Observable> retryLogic(final @@ -249,7 +257,8 @@ public Observable call(final ObservableConnection("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")); + .lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")) + .rebatchRequests(bufferSize <= 0 ? 1 : bufferSize); } }) .doOnCompleted(new Action0() { @@ -394,7 +403,8 @@ public Observable call(final ObservableConnection("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")); + .lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")) + .rebatchRequests(bufferSize <= 0 ? 1 : bufferSize); } }) .doOnCompleted(new Action0() { @@ -518,7 +528,8 @@ public Observable call(final ObservableConnection("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer")); + .lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer")) + .rebatchRequests(bufferSize <= 0 ? 1 : bufferSize); } }) .doOnCompleted(new Action0() {