diff --git a/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java b/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java index 307ece777..61cd15566 100644 --- a/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java +++ b/mantis-remote-observable/src/main/java/io/reactivex/mantis/remote/observable/RemoteObservable.java @@ -45,6 +45,7 @@ import java.lang.reflect.InvocationTargetException; import java.nio.ByteBuffer; import java.util.List; +import java.util.Optional; import java.util.concurrent.TimeUnit; import mantis.io.reactivex.netty.RxNetty; import mantis.io.reactivex.netty.channel.ObservableConnection; @@ -72,6 +73,10 @@ public class RemoteObservable { private static boolean enableNettyLogging = false; private static boolean enableCompression = true; private static int maxFrameLength = 5242880; // 5 MB max frame + private static int bufferSize = 0; + private static final String DEFAULT_BUFFER_SIZE_STR = "0"; + + // NJ static { @@ -106,6 +111,9 @@ private static void loadFastProperties() { if (maxFrameLengthStr != null && maxFrameLengthStr.length() > 0) { maxFrameLength = Integer.parseInt(maxFrameLengthStr); } + + String bufferSizeStr = ServiceRegistry.INSTANCE.getPropertiesService().getStringValue("workerClient.buffer.size", DEFAULT_BUFFER_SIZE_STR); + bufferSize = Integer.parseInt(Optional.ofNullable(bufferSizeStr).orElse(DEFAULT_BUFFER_SIZE_STR)); } private static Func1, ? extends Observable> retryLogic(final @@ -249,7 +257,8 @@ public Observable call(final ObservableConnection("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")); + .lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")) + .rebatchRequests(bufferSize <= 0 ? 1 : bufferSize); } }) .doOnCompleted(new Action0() { @@ -394,7 +403,8 @@ public Observable call(final ObservableConnection("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")); + .lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServerGroups")) + .rebatchRequests(bufferSize <= 0 ? 1 : bufferSize); } }) .doOnCompleted(new Action0() { @@ -518,7 +528,8 @@ public Observable call(final ObservableConnection("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer")); + .lift(new DropOperator("incoming_" + RemoteObservable.class.getCanonicalName() + "_createTcpConnectionToServer")) + .rebatchRequests(bufferSize <= 0 ? 1 : bufferSize); } }) .doOnCompleted(new Action0() {