diff --git a/.github/workflows/android.yml b/.github/workflows/android.yml index 020b7a18..e0333265 100644 --- a/.github/workflows/android.yml +++ b/.github/workflows/android.yml @@ -60,9 +60,9 @@ jobs: steps: - uses: actions/checkout@v2 - - uses: actions/setup-node@v3 + - uses: actions/setup-node@v4 with: - node-version: '16.x' + node-version: '18.x' - name: Check Build Website run: | cd docs diff --git a/.github/workflows/publish-docs.yml b/.github/workflows/publish-docs.yml index eece8e0f..6ed97560 100644 --- a/.github/workflows/publish-docs.yml +++ b/.github/workflows/publish-docs.yml @@ -10,9 +10,9 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v2 - - uses: actions/setup-node@v3 + - uses: actions/setup-node@v4 with: - node-version: '16.x' + node-version: '18.x' - name: Build website run: | diff --git a/CONTRIBUTION.md b/CONTRIBUTING.md similarity index 97% rename from CONTRIBUTION.md rename to CONTRIBUTING.md index f194ba54..d4ef1b5b 100644 --- a/CONTRIBUTION.md +++ b/CONTRIBUTING.md @@ -61,4 +61,4 @@ GO-JEK Tech [6]: http://tbaggery.com/2008/04/19/a-note-about-git-commit-messages.html [7]: https://help.github.com/articles/using-pull-requests [8]: https://github.com/diffplug/spotless -[9]: https://github.com/Kotlin/binary-compatibility-validator \ No newline at end of file +[9]: https://github.com/Kotlin/binary-compatibility-validator diff --git a/app/build.gradle b/app/build.gradle index e0dd5ef0..44734271 100644 --- a/app/build.gradle +++ b/app/build.gradle @@ -3,13 +3,14 @@ apply plugin: 'kotlin-android' apply plugin: 'kotlin-android-extensions' android { - compileSdkVersion 31 - buildToolsVersion "30.0.3" + compileSdk 34 + + namespace = "com.gojek.courier.app" defaultConfig { applicationId "com.gojek.courier.app" minSdkVersion 21 - targetSdkVersion 31 + targetSdkVersion 34 versionCode 1 versionName "1.0" multiDexEnabled true @@ -62,6 +63,7 @@ dependencies { implementation project(':courier-stream-adapter-rxjava2') implementation project(':courier-message-adapter-gson') + implementation project(':courier-message-adapter-text') implementation project(':courier-message-adapter-moshi') implementation project(':courier-message-adapter-protobuf') implementation project(':adaptive-keep-alive') diff --git a/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt b/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt index ed741451..40b06546 100644 --- a/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt +++ b/app/src/main/java/com/gojek/courier/app/data/network/CourierService.kt @@ -1,21 +1,28 @@ package com.gojek.courier.app.data.network import com.gojek.courier.QoS +import com.gojek.courier.annotation.Callback import com.gojek.courier.annotation.Data import com.gojek.courier.annotation.Path import com.gojek.courier.annotation.Send import com.gojek.courier.annotation.Subscribe +import com.gojek.courier.annotation.SubscribeMultiple +import com.gojek.courier.annotation.TopicMap import com.gojek.courier.annotation.Unsubscribe import com.gojek.courier.app.data.network.model.Message +import com.gojek.courier.callback.SendMessageCallback import io.reactivex.Observable interface CourierService { @Send(topic = "{topic}", qos = QoS.ONE) - fun publish(@Path("topic") topic: String, @Data message: Message) + fun publish(@Path("topic") topic: String, @Data message: Message, @Callback callback: SendMessageCallback) @Subscribe(topic = "{topic}") - fun subscribe(@Path("topic") topic: String): Observable + fun subscribe(@Path("topic") topic: String): Observable @Unsubscribe(topics = ["{topic}"]) fun unsubscribe(@Path("topic") topic: String) + + @SubscribeMultiple + fun subscribeAll(@TopicMap topicMap: Map): Observable } \ No newline at end of file diff --git a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt index 2244ce47..09ee7a9b 100644 --- a/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt +++ b/app/src/main/java/com/gojek/courier/app/ui/MainActivity.kt @@ -1,16 +1,21 @@ package com.gojek.courier.app.ui import android.os.Bundle +import android.util.Log import androidx.appcompat.app.AppCompatActivity import com.gojek.chuckmqtt.external.MqttChuckConfig import com.gojek.chuckmqtt.external.MqttChuckInterceptor import com.gojek.chuckmqtt.external.Period import com.gojek.courier.Courier +import com.gojek.courier.QoS +import com.gojek.courier.QoS.ZERO import com.gojek.courier.app.R import com.gojek.courier.app.data.network.CourierService import com.gojek.courier.app.data.network.model.Message +import com.gojek.courier.callback.SendMessageCallback import com.gojek.courier.logging.ILogger import com.gojek.courier.messageadapter.gson.GsonMessageAdapterFactory +import com.gojek.courier.messageadapter.text.TextMessageAdapterFactory import com.gojek.courier.streamadapter.rxjava2.RxJava2StreamAdapterFactory import com.gojek.mqtt.auth.Authenticator import com.gojek.mqtt.client.MqttClient @@ -24,6 +29,7 @@ import com.gojek.mqtt.model.AdaptiveKeepAliveConfig import com.gojek.mqtt.model.KeepAlive import com.gojek.mqtt.model.MqttConnectOptions import com.gojek.mqtt.model.ServerUri +import com.gojek.mqtt.model.Will import com.gojek.workmanager.pingsender.WorkManagerPingSenderConfig import com.gojek.workmanager.pingsender.WorkPingSenderFactory import kotlinx.android.synthetic.main.activity_main.brokerIP @@ -78,12 +84,37 @@ class MainActivity : AppCompatActivity() { send.setOnClickListener { courierService.publish( topic = topic.text.toString(), - message = Message(123, message.text.toString()) + message = Message(123, message.text.toString()), + callback = object : SendMessageCallback { + override fun onMessageSendTrigger() { + Log.d("Courier", "onMessageSendTrigger") + } + + override fun onMessageWrittenOnSocket() { + Log.d("Courier", "onMessageWrittenOnSocket") + } + + override fun onMessageSendSuccess() { + Log.d("Courier", "onMessageSendSuccess") + } + + override fun onMessageSendFailure(error: Throwable) { + Log.d("Courier", "onMessageSendFailure") + } + } ) } subscribe.setOnClickListener { - courierService.subscribe(topic = topic.text.toString()) + val topics = topic.text.toString().split(",") + val stream = if (topics.size == 1) { + courierService.subscribe(topic = topics[0]) + } else { + val topicMap = mutableMapOf() + for (topic in topics) { topicMap[topic] = ZERO } + courierService.subscribeAll(topicMap = topicMap) + } + stream.subscribe { Log.d("Courier", "Message received: $it") } } unsubscribe.setOnClickListener { @@ -92,12 +123,21 @@ class MainActivity : AppCompatActivity() { } private fun connectMqtt(clientId: String, username: String, password: String, ip: String, port: Int) { + + val will = Will( + topic = "last/will/topic", + message = "Client disconnected unexpectedly", + qos = QoS.ZERO, + retained = false + ) + val connectOptions = MqttConnectOptions.Builder() .serverUris(listOf(ServerUri(ip, port, if (port == 443) "ssl" else "tcp"))) .clientId(clientId) .userName(username) .password(password) .cleanSession(false) + .will(will) .keepAlive(KeepAlive(timeSeconds = 30)) .build() @@ -129,11 +169,12 @@ class MainActivity : AppCompatActivity() { ), inactivityTimeoutSeconds = 45, activityCheckIntervalSeconds = 30, + connectPacketTimeoutSeconds = 5, incomingMessagesTTLSecs = 60, incomingMessagesCleanupIntervalSecs = 10, maxInflightMessagesLimit = 1000, ), - pingSender = WorkPingSenderFactory.createMqttPingSender(applicationContext, WorkManagerPingSenderConfig()) + pingSender = WorkPingSenderFactory.createMqttPingSender(applicationContext, WorkManagerPingSenderConfig(sendForcePing = true)) ) mqttClient = MqttClientFactory.create(this, mqttConfig) mqttClient.addEventHandler(eventHandler) @@ -141,7 +182,7 @@ class MainActivity : AppCompatActivity() { val configuration = Courier.Configuration( client = mqttClient, streamAdapterFactories = listOf(RxJava2StreamAdapterFactory()), - messageAdapterFactories = listOf(GsonMessageAdapterFactory()), + messageAdapterFactories = listOf(TextMessageAdapterFactory(), GsonMessageAdapterFactory()), logger = getLogger() ) val courier = Courier(configuration) diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index 14b0f7c3..dedf0152 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -34,8 +34,8 @@ plugins { } dependencies { - implementation("com.android.tools.build:gradle:7.0.3") - implementation("org.jetbrains.kotlin:kotlin-gradle-plugin:1.5.31") + implementation("com.android.tools.build:gradle:7.4.2") + implementation("org.jetbrains.kotlin:kotlin-gradle-plugin:1.6.21") implementation("io.gitlab.arturbosch.detekt:detekt-gradle-plugin:1.18.0") implementation("com.diffplug.spotless:spotless-plugin-gradle:6.5.0") } diff --git a/buildSrc/src/main/kotlin/deps.kt b/buildSrc/src/main/kotlin/deps.kt index fa26f2b9..08bde1e2 100644 --- a/buildSrc/src/main/kotlin/deps.kt +++ b/buildSrc/src/main/kotlin/deps.kt @@ -4,10 +4,10 @@ object versions { const val jacoco = "0.8.6" const val detekt = "1.18.0" - const val kotlin = "1.4.30" - const val agp = "7.0.3" + const val kotlin = "1.6.21" + const val agp = "7.4.2" const val jetifierProcessor = "1.0.0-beta10" - const val jfrogBuildInfoExtractor = "4.11.0" + const val jfrogBuildInfoExtractor = "4.23.4" const val navigation = "2.1.0-rc01" const val coroutines = "1.3.2" const val broadcast = "1.0.0" @@ -30,7 +30,7 @@ object versions { const val materialVersion = "1.3.0" const val annotationVersion = "1.2.0" const val coreKtxVersion = "1.3.0" - const val apiValidator = "0.6.0" + const val apiValidator = "0.14.0" const val workManager = "2.7.0" } @@ -45,11 +45,11 @@ object deps { const val protobuf = "com.google.protobuf:protobuf-lite:3.0.0" object build { - const val buildToolsVersion = "31.0.0" - const val compileSdkVersion = 31 + const val buildToolsVersion = "33.0.1" + const val compileSdkVersion = 34 const val minSdkVersion = 21 const val sampleMinSdkVersion = 21 - const val targetSdkVersion = 31 + const val targetSdkVersion = 34 } object test { @@ -134,4 +134,4 @@ object deps { const val runtime = "androidx.work:work-runtime:${versions.workManager}" const val runtime_2_6_0 = "androidx.work:work-runtime:2.6.0" } -} \ No newline at end of file +} diff --git a/courier-core-android/api/courier-core-android.api b/courier-core-android/api/courier-core-android.api index 540edd8d..e69de29b 100644 --- a/courier-core-android/api/courier-core-android.api +++ b/courier-core-android/api/courier-core-android.api @@ -1,3 +0,0 @@ -public final class com/gojek/courier/utils/extensions/PendingIntentExtensionsKt { -} - diff --git a/courier-core/api/courier-core.api b/courier-core/api/courier-core.api index edccd6e2..3b438791 100644 --- a/courier-core/api/courier-core.api +++ b/courier-core/api/courier-core.api @@ -52,14 +52,26 @@ public abstract interface class com/gojek/courier/StreamAdapter$Factory { public abstract fun create (Ljava/lang/reflect/Type;)Lcom/gojek/courier/StreamAdapter; } +public final class com/gojek/courier/callback/NoOpSendMessageCallback : com/gojek/courier/callback/SendMessageCallback { + public static final field INSTANCE Lcom/gojek/courier/callback/NoOpSendMessageCallback; + public fun onMessageSendFailure (Ljava/lang/Throwable;)V + public fun onMessageSendSuccess ()V + public fun onMessageSendTrigger ()V + public fun onMessageWrittenOnSocket ()V +} + +public abstract interface class com/gojek/courier/callback/SendMessageCallback { + public abstract fun onMessageSendFailure (Ljava/lang/Throwable;)V + public abstract fun onMessageSendSuccess ()V + public abstract fun onMessageSendTrigger ()V + public abstract fun onMessageWrittenOnSocket ()V +} + public final class com/gojek/courier/extensions/CollectionExtensionsKt { public static final fun toImmutableMap (Ljava/util/Map;)Ljava/util/Map; public static final fun toImmutableSet (Ljava/util/Set;)Ljava/util/Set; } -public final class com/gojek/courier/extensions/TimeUnitExtensionsKt { -} - public abstract interface class com/gojek/courier/logging/ILogger { public abstract fun d (Ljava/lang/String;Ljava/lang/String;)V public abstract fun d (Ljava/lang/String;Ljava/lang/String;Ljava/lang/Throwable;)V @@ -74,6 +86,3 @@ public abstract interface class com/gojek/courier/logging/ILogger { public abstract fun w (Ljava/lang/String;Ljava/lang/Throwable;)V } -public final class com/gojek/courier/utils/TypeUtils { -} - diff --git a/courier-core/src/main/java/com/gojek/courier/callback/NoOpSendMessageCallback.kt b/courier-core/src/main/java/com/gojek/courier/callback/NoOpSendMessageCallback.kt new file mode 100644 index 00000000..9ff86a4b --- /dev/null +++ b/courier-core/src/main/java/com/gojek/courier/callback/NoOpSendMessageCallback.kt @@ -0,0 +1,19 @@ +package com.gojek.courier.callback + +object NoOpSendMessageCallback : SendMessageCallback { + override fun onMessageSendTrigger() { + // no-op + } + + override fun onMessageWrittenOnSocket() { + // no-op + } + + override fun onMessageSendSuccess() { + // no-op + } + + override fun onMessageSendFailure(error: Throwable) { + // no-op + } +} diff --git a/courier-core/src/main/java/com/gojek/courier/callback/SendMessageCallback.kt b/courier-core/src/main/java/com/gojek/courier/callback/SendMessageCallback.kt new file mode 100644 index 00000000..dee5a54e --- /dev/null +++ b/courier-core/src/main/java/com/gojek/courier/callback/SendMessageCallback.kt @@ -0,0 +1,8 @@ +package com.gojek.courier.callback + +interface SendMessageCallback { + fun onMessageSendTrigger() + fun onMessageWrittenOnSocket() + fun onMessageSendSuccess() + fun onMessageSendFailure(error: Throwable) +} diff --git a/courier-core/src/main/java/com/gojek/courier/extensions/StringExtensions.kt b/courier-core/src/main/java/com/gojek/courier/extensions/StringExtensions.kt new file mode 100644 index 00000000..64d366d9 --- /dev/null +++ b/courier-core/src/main/java/com/gojek/courier/extensions/StringExtensions.kt @@ -0,0 +1,9 @@ +package com.gojek.courier.extensions + +import androidx.annotation.RestrictTo + +@RestrictTo(RestrictTo.Scope.LIBRARY) +fun String.isWildCardTopic(): Boolean { + return startsWith("+/") || contains("/+/") || endsWith("/+") || equals("+") || + endsWith("/#") || equals("#") +} diff --git a/courier/api/courier.api b/courier/api/courier.api index 825a2dca..67bfedde 100644 --- a/courier/api/courier.api +++ b/courier/api/courier.api @@ -31,6 +31,9 @@ public final class com/gojek/courier/Courier$Configuration { public fun toString ()Ljava/lang/String; } +public abstract interface annotation class com/gojek/courier/annotation/Callback : java/lang/annotation/Annotation { +} + public abstract interface annotation class com/gojek/courier/annotation/Data : java/lang/annotation/Annotation { } diff --git a/courier/src/main/java/com/gojek/courier/annotation/Callback.kt b/courier/src/main/java/com/gojek/courier/annotation/Callback.kt new file mode 100644 index 00000000..0ea543f7 --- /dev/null +++ b/courier/src/main/java/com/gojek/courier/annotation/Callback.kt @@ -0,0 +1,6 @@ +package com.gojek.courier.annotation + +@MustBeDocumented +@Target(AnnotationTarget.VALUE_PARAMETER) +@kotlin.annotation.Retention(AnnotationRetention.RUNTIME) +annotation class Callback diff --git a/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt b/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt index 9f5b3032..42b5afa1 100644 --- a/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt +++ b/courier/src/main/java/com/gojek/courier/annotation/parser/MethodAnnotationsParser.kt @@ -1,6 +1,7 @@ package com.gojek.courier.annotation.parser import com.gojek.courier.QoS +import com.gojek.courier.annotation.Callback import com.gojek.courier.annotation.Data import com.gojek.courier.annotation.Path import com.gojek.courier.annotation.Receive @@ -13,6 +14,7 @@ import com.gojek.courier.argument.processor.ReceiveArgumentProcessor import com.gojek.courier.argument.processor.SendArgumentProcessor import com.gojek.courier.argument.processor.SubscriptionArgumentProcessor import com.gojek.courier.argument.processor.UnsubscriptionArgumentProcessor +import com.gojek.courier.callback.SendMessageCallback import com.gojek.courier.stub.StubMethod import com.gojek.courier.utils.MessageAdapterResolver import com.gojek.courier.utils.StreamAdapterResolver @@ -87,7 +89,8 @@ internal class MethodAnnotationsParser( val messageType = method.getDataParameterType(dataParameterIndex) val annotations = method.getDataParameterAnnotations(dataParameterIndex) val adapter = messageAdapterResolver.resolve(messageType, annotations) - val argumentProcessor = SendArgumentProcessor(pathMap, topic, dataParameterIndex) + val callbackIndex = method.getCallbackParameterIndex() + val argumentProcessor = SendArgumentProcessor(pathMap, topic, dataParameterIndex, callbackIndex) stubMethod = StubMethod.Send(adapter, qos, argumentProcessor) } @@ -134,28 +137,40 @@ internal class MethodAnnotationsParser( } private fun parseSubscribeAllMethodAnnotations(method: Method) { - method.requireReturnTypeIsOneOf(Void.TYPE) { - "SubscribeAll method must return Void: $method" - } - method.requireParameterTypes(ParameterizedType::class.java) { - "Only one argument with parameterized type is allowed" + method.requireReturnTypeIsOneOf(Void.TYPE, ParameterizedType::class.java) { + "Subscribe method must return Void or ParameterizedType: $method" } val annotations = method.parameterAnnotations[0].filter { it.isParameterAnnotation() } require(annotations.size == 1) { "A parameter must have one and only one parameter annotation" } require(method.parameterTypes[0] == Map::class.java) { - "Parameter should be of Map type $method" + "Parameter should be of Map type $method" } val actualTypeArguments = (method.genericParameterTypes[0] as ParameterizedType).actualTypeArguments require(actualTypeArguments[0] == String::class.java) { - "Parameter should be of Map type $method" + "Parameter should be of Map type $method" } require(actualTypeArguments[1].getRawType() == QoS::class.java) { - "Parameter should be of Map type $method" + "Parameter should be of Map type $method" + } + + if (method.genericReturnType == Void.TYPE) { + stubMethod = StubMethod.SubscribeAll + } else { + method.requireReturnTypeIsResolvable { + "Method return type must not include a type variable or wildcard: ${method.genericReturnType}" + } + + val streamType = method.genericReturnType as ParameterizedType + val messageType = streamType.getFirstTypeArgument() + + val streamAdapter = streamAdapterResolver.resolve(streamType) + val messageAdapter = messageAdapterResolver.resolve(messageType, method.annotations) + + stubMethod = StubMethod.SubscribeAllWithStream(messageAdapter, streamAdapter) } - stubMethod = StubMethod.SubscribeAll } private fun parseUnsubscribeMethodAnnotations(method: Method) { @@ -226,7 +241,7 @@ internal class MethodAnnotationsParser( private val PARAM_NAME_REGEX = Pattern.compile(PARAM) private fun Annotation.isParameterAnnotation(): Boolean { - return this is Path || this is Data || this is TopicMap + return this is Path || this is Data || this is TopicMap || this is Callback } private fun Annotation.isStubMethodAnnotation(): Boolean { @@ -268,6 +283,29 @@ internal class MethodAnnotationsParser( return index } + private fun Method.getCallbackParameterIndex(): Int { + var index = -1 + for (parameterIndex in parameterAnnotations.indices) { + val parameterAnnotations = parameterAnnotations[parameterIndex] + val annotations = parameterAnnotations.filter { it.isParameterAnnotation() } + require(annotations.size == 1) { + "A parameter must have one and only one parameter annotation: $parameterIndex" + } + if (annotations.first() is Callback) { + if (index == -1) { + index = parameterIndex + break + } else { + throw IllegalArgumentException("Multiple parameters found with @Callback annotation") + } + } + } + if (index != -1 && parameterTypes[index] != SendMessageCallback::class.java) { + throw IllegalArgumentException("Parameter annotated with @Callback should be of type SendMessageCallback: ${parameterTypes[index]}") + } + return index + } + private fun Method.getDataParameterType(index: Int): Type { return parameterTypes[index] } diff --git a/courier/src/main/java/com/gojek/courier/argument/processor/SendArgumentProcessor.kt b/courier/src/main/java/com/gojek/courier/argument/processor/SendArgumentProcessor.kt index f7bc0b3a..b5a8a43f 100644 --- a/courier/src/main/java/com/gojek/courier/argument/processor/SendArgumentProcessor.kt +++ b/courier/src/main/java/com/gojek/courier/argument/processor/SendArgumentProcessor.kt @@ -1,9 +1,13 @@ package com.gojek.courier.argument.processor +import com.gojek.courier.callback.NoOpSendMessageCallback +import com.gojek.courier.callback.SendMessageCallback + internal class SendArgumentProcessor( private val pathMap: Map, private val topic: String, - private val dataParameterIndex: Int + private val dataParameterIndex: Int, + private val callbackIndex: Int ) : ArgumentProcessor() { private var parsedTopic = topic @@ -21,4 +25,12 @@ internal class SendArgumentProcessor( fun getDataArgument(args: Array): Any { return args[dataParameterIndex] } + + fun getCallbackArgument(args: Array): SendMessageCallback { + return if (callbackIndex == -1) { + NoOpSendMessageCallback + } else { + args[callbackIndex] as SendMessageCallback + } + } } diff --git a/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt b/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt index c221db2a..ba3a15c8 100644 --- a/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt +++ b/courier/src/main/java/com/gojek/courier/coordinator/Coordinator.kt @@ -16,10 +16,14 @@ import com.gojek.mqtt.client.model.ConnectionState import com.gojek.mqtt.client.model.MqttMessage import com.gojek.mqtt.event.EventHandler import com.gojek.mqtt.event.MqttEvent +import com.gojek.mqtt.event.MqttEvent.MqttSubscribeFailureEvent import io.reactivex.BackpressureStrategy import io.reactivex.Flowable import io.reactivex.FlowableOnSubscribe +import io.reactivex.disposables.CompositeDisposable import io.reactivex.schedulers.Schedulers +import io.reactivex.subjects.PublishSubject +import org.eclipse.paho.client.mqttv3.MqttException import org.reactivestreams.Subscriber import org.reactivestreams.Subscription @@ -28,15 +32,28 @@ internal class Coordinator( private val logger: ILogger ) : StubInterface.Callback { + private val eventSubject = PublishSubject.create { emitter -> + val eventHandler = object : EventHandler { + override fun onEvent(mqttEvent: MqttEvent) { + if (emitter.isDisposed.not()) { + emitter.onNext(mqttEvent) + } + } + } + client.addEventHandler(eventHandler) + emitter.setCancellable { client.removeEventHandler(eventHandler) } + } + @Synchronized override fun send(stubMethod: StubMethod.Send, args: Array): Any { logger.d("Coordinator", "Send method invoked") val data = stubMethod.argumentProcessor.getDataArgument(args) stubMethod.argumentProcessor.inject(args) val topic = stubMethod.argumentProcessor.getTopic() + val callback = stubMethod.argumentProcessor.getCallbackArgument(args) val message = stubMethod.messageAdapter.toMessage(topic, data) val qos = stubMethod.qos - val sent = client.send(message, topic, qos) + val sent = client.send(message, topic, qos, callback) logger.d("Coordinator", "Sending message on topic: $topic, qos: $qos, message: $data") return sent } @@ -105,7 +122,15 @@ internal class Coordinator( } } client.addMessageListener(topic, listener) - emitter.setCancellable { client.removeMessageListener(topic, listener) } + val eventDisposable = eventSubject.filter { event -> + isInvalidSubscriptionFailureEvent(event, topic) + }.subscribe { + client.removeMessageListener(topic, listener) + } + emitter.setCancellable { + client.removeMessageListener(topic, listener) + eventDisposable.dispose() + } }, BackpressureStrategy.BUFFER ) @@ -147,6 +172,57 @@ internal class Coordinator( return status } + override fun subscribeAllWithStream(stubMethod: StubMethod.SubscribeAllWithStream, args: Array): Any { + logger.d("Coordinator", "Subscribe method invoked for multiple topics") + val topicList = (args[0] as Map).toList() + if (topicList.size == 1) { + client.subscribe(topicList[0]) + } else { + client.subscribe(topicList[0], *topicList.toTypedArray().sliceArray(IntRange(1, topicList.size - 1))) + } + logger.d("Coordinator", "Subscribed topics: $topicList") + val flowable = Flowable.create( + FlowableOnSubscribe { emitter -> + val listener = object : MessageListener { + override fun onMessageReceived(mqttMessage: MqttMessage) { + if (emitter.isCancelled.not()) { + emitter.onNext(mqttMessage) + } + } + } + val eventDisposable = CompositeDisposable() + for (topic in topicList) { + client.addMessageListener(topic.first, listener) + eventDisposable.add( + eventSubject.filter { event -> + isInvalidSubscriptionFailureEvent(event, topic.first) + }.subscribe { + client.removeMessageListener(topic.first, listener) + } + ) + } + emitter.setCancellable { + for (topic in topicList) { + client.removeMessageListener(topic.first, listener) + eventDisposable.dispose() + } + } + }, + BackpressureStrategy.BUFFER + ) + + val stream = flowable + .observeOn(Schedulers.computation()) + .flatMap { mqttMessage -> + mqttMessage.message.adapt( + mqttMessage.topic, + stubMethod.messageAdapter + )?.let { Flowable.just(it) } ?: Flowable.empty() + } + .toStream() + return stubMethod.streamAdapter.adapt(stream) + } + override fun getEventStream(): Stream { return object : Stream { override fun start(observer: Observer): Disposable { @@ -210,4 +286,10 @@ internal class Coordinator( null } } + + private fun isInvalidSubscriptionFailureEvent(event: MqttEvent, topic: String): Boolean { + return event is MqttSubscribeFailureEvent && + event.topics.containsKey(topic) && + event.exception.reasonCode == MqttException.REASON_CODE_INVALID_SUBSCRIPTION.toInt() + } } diff --git a/courier/src/main/java/com/gojek/courier/stub/StubInterface.kt b/courier/src/main/java/com/gojek/courier/stub/StubInterface.kt index 35918cb6..9d14201c 100644 --- a/courier/src/main/java/com/gojek/courier/stub/StubInterface.kt +++ b/courier/src/main/java/com/gojek/courier/stub/StubInterface.kt @@ -28,6 +28,9 @@ internal class StubInterface( is StubMethod.SubscribeAll -> { callback.subscribeAll(stubMethod, args) } + is StubMethod.SubscribeAllWithStream -> { + callback.subscribeAllWithStream(stubMethod, args) + } is StubMethod.Unsubscribe -> { callback.unsubscribe(stubMethod, args) } @@ -41,6 +44,7 @@ internal class StubInterface( fun subscribeWithStream(stubMethod: StubMethod.SubscribeWithStream, args: Array): Any fun unsubscribe(stubMethod: StubMethod.Unsubscribe, args: Array): Any fun subscribeAll(stubMethod: StubMethod.SubscribeAll, args: Array): Any + fun subscribeAllWithStream(stubMethod: StubMethod.SubscribeAllWithStream, args: Array): Any fun getEventStream(): Stream fun getConnectionState(): ConnectionState } diff --git a/courier/src/main/java/com/gojek/courier/stub/StubMethod.kt b/courier/src/main/java/com/gojek/courier/stub/StubMethod.kt index 783d0d99..0d44215f 100644 --- a/courier/src/main/java/com/gojek/courier/stub/StubMethod.kt +++ b/courier/src/main/java/com/gojek/courier/stub/StubMethod.kt @@ -40,6 +40,11 @@ internal sealed class StubMethod { object SubscribeAll : StubMethod() + class SubscribeAllWithStream( + val messageAdapter: MessageAdapter, + val streamAdapter: StreamAdapter + ) : StubMethod() + class Unsubscribe( val argumentProcessor: UnsubscriptionArgumentProcessor ) : StubMethod() diff --git a/courier/src/main/java/com/gojek/courier/utils/RuntimePlatform.kt b/courier/src/main/java/com/gojek/courier/utils/RuntimePlatform.kt index f6b5e019..e9a59c34 100644 --- a/courier/src/main/java/com/gojek/courier/utils/RuntimePlatform.kt +++ b/courier/src/main/java/com/gojek/courier/utils/RuntimePlatform.kt @@ -1,7 +1,10 @@ package com.gojek.courier.utils +import android.os.Build.VERSION +import android.os.Build.VERSION_CODES import java.lang.invoke.MethodHandles.Lookup import java.lang.reflect.Method +import java.util.Optional internal sealed class RuntimePlatform { @@ -43,8 +46,12 @@ internal sealed class RuntimePlatform { fun get(): RuntimePlatform = PLATFORM private fun findPlatform(): RuntimePlatform = try { - Class.forName("java.util.Optional") - Java8() + if (VERSION.SDK_INT >= VERSION_CODES.N) { + Class.forName(Optional::class.java.name) + Java8() + } else { + Default() + } } catch (ignored: ClassNotFoundException) { Default() } diff --git a/docs/docs/CourierService.md b/docs/docs/CourierService.md index 2885eca7..72e5677b 100644 --- a/docs/docs/CourierService.md +++ b/docs/docs/CourierService.md @@ -4,7 +4,7 @@ Courier provides the functionalities like Send, Receive, Subscribe, Unsubscribe ### Usage -Declare a service interface for various actions like Send, Receive, Subscribe, Unsubscribe. +Declare a service interface for various actions like Send, Receive, Subscribe, SubscribeMultiple, Unsubscribe. ~~~ kotlin interface MessageService { @@ -16,12 +16,17 @@ interface MessageService { @Subscribe(topic = "topic/{id}/receive", qos = QoS.ONE) fun subscribe(@Path("id") identifier: String): Observable + + @SubscribeMultiple + fun subscribe(@TopicMap topicMap: Map): Observable @Unsubscribe(topics = ["topic/{id}/receive"]) fun unsubscribe(@Path("id") identifier: String) } ~~~ + + Use Courier to create an implementation of service interface. ~~~ kotlin @@ -44,12 +49,14 @@ Following annotations are supported for service interface. - **@Subscribe** : A method annotation used for subscribing a single topic over the MQTT connection. -- **@SubscribeMultiple** : A method annotation used for subscribing multiple topic over the MQTT connection. +- **@SubscribeMultiple** : A method annotation used for subscribing multiple topics over the MQTT connection. -- **@Unsubscribe** : A method annotation used for unsubscribing a single topic over the MQTT connection. +- **@Unsubscribe** : A method annotation used for unsubscribing topics over the MQTT connection. - **@Path** : A parameter annotation used for specifying a path variable in an MQTT topic. - **@Data** : A parameter annotation used for specifying the message object while sending a message over the MQTT connection. -- **@TopicMap** : A parameter annotation used for specifying a topic map. It is always used while subscribing multiple topics. \ No newline at end of file +- **@TopicMap** : A parameter annotation used for specifying a topic map. It is always used while subscribing multiple topics. + +**Note** : While subscribing topics using `@SubscribeMultiple` along with a stream, make sure that messages received on all topics follow same format or a message adapter is added for handling different format. \ No newline at end of file diff --git a/docs/docs/Installation.md b/docs/docs/Installation.md index 447acb62..951a8426 100644 --- a/docs/docs/Installation.md +++ b/docs/docs/Installation.md @@ -3,8 +3,8 @@ ## Supported SDK versions - minSdkVersion: 21 -- targetSdkVersion: 31 -- compileSdkVersion: 31 +- targetSdkVersion: 34 +- compileSdkVersion: 34 ## Download [![Maven Central](https://img.shields.io/maven-central/v/com.gojek.courier/courier.svg?label=Maven%20Central)](https://search.maven.org/search?q=g:%22com.gojek.courier%22%20AND%20a:%courier%22) diff --git a/docs/docs/SubscribeUnsubscribe.md b/docs/docs/SubscribeUnsubscribe.md index f20ab07b..1b2649ee 100644 --- a/docs/docs/SubscribeUnsubscribe.md +++ b/docs/docs/SubscribeUnsubscribe.md @@ -10,7 +10,7 @@ interface MessageService { fun subscribe(@Path("id") identifier: String): Observable @SubscribeMultiple - fun subscribeMultiple(@TopicMap topics: Map) + fun subscribeMultiple(@TopicMap topics: Map): Observable @Unsubscribe(topics = ["topic/{id}/receive"]) fun unsubscribe(@Path("id") identifier: String) @@ -22,7 +22,9 @@ messageService.subscribe("user-id").subscribe { message -> print(message) } -messageService.subscribeMultiple(mapOf("topic1" to QoS.ONE, "topic2" to QoS.TWO)) +messageService.subscribeMultiple(mapOf("topic1" to QoS.ONE, "topic2" to QoS.TWO)).subscribe { message -> + print(message) +} messageService.unsubscribe("user-id") ~~~ @@ -35,6 +37,6 @@ mqttClient.subscribe("topic1" to QoS.ZERO, "topic2" to QoS.ONE) mqttClient.unsubscribe("topic1", "topic2") ~~~ - +**Note** : While subscribing topics using `@SubscribeMultiple` along with a stream, make sure that messages received on all topics follow same format or a message adapter is added for handling different format. diff --git a/gradle/ci-configs/gradle-wrapper.properties b/gradle/ci-configs/gradle-wrapper.properties index ab716dda..53d86592 100644 --- a/gradle/ci-configs/gradle-wrapper.properties +++ b/gradle/ci-configs/gradle-wrapper.properties @@ -2,4 +2,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.0.2-all.zip \ No newline at end of file +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.4-bin.zip \ No newline at end of file diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index db449532..49ac158f 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.0.2-all.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.4-bin.zip diff --git a/mqtt-client/api/mqtt-client.api b/mqtt-client/api/mqtt-client.api index a90dd941..a6a89361 100644 --- a/mqtt-client/api/mqtt-client.api +++ b/mqtt-client/api/mqtt-client.api @@ -12,13 +12,14 @@ public abstract interface class com/gojek/mqtt/client/MqttClient { public abstract fun reconnect ()V public abstract fun removeEventHandler (Lcom/gojek/mqtt/event/EventHandler;)V public abstract fun removeMessageListener (Ljava/lang/String;Lcom/gojek/mqtt/client/listener/MessageListener;)V - public abstract fun send (Lcom/gojek/courier/Message;Ljava/lang/String;Lcom/gojek/courier/QoS;)Z + public abstract fun send (Lcom/gojek/courier/Message;Ljava/lang/String;Lcom/gojek/courier/QoS;Lcom/gojek/courier/callback/SendMessageCallback;)Z public abstract fun subscribe (Lkotlin/Pair;[Lkotlin/Pair;)V public abstract fun unsubscribe (Ljava/lang/String;[Ljava/lang/String;)V } public final class com/gojek/mqtt/client/MqttClient$DefaultImpls { public static synthetic fun disconnect$default (Lcom/gojek/mqtt/client/MqttClient;ZILjava/lang/Object;)V + public static synthetic fun send$default (Lcom/gojek/mqtt/client/MqttClient;Lcom/gojek/courier/Message;Ljava/lang/String;Lcom/gojek/courier/QoS;Lcom/gojek/courier/callback/SendMessageCallback;ILjava/lang/Object;)Z } public abstract interface class com/gojek/mqtt/client/MqttInterceptor { @@ -28,22 +29,24 @@ public abstract interface class com/gojek/mqtt/client/MqttInterceptor { public final class com/gojek/mqtt/client/config/ExperimentConfigs { public fun ()V - public fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZI)V - public synthetic fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)V + public synthetic fun (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Lcom/gojek/mqtt/client/config/SubscriptionStore; + public final fun component10 ()I public final fun component2 ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; public final fun component3 ()I public final fun component4 ()I public final fun component5 ()I - public final fun component6 ()J + public final fun component6 ()I public final fun component7 ()J - public final fun component8 ()Z - public final fun component9 ()I - public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs; - public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public final fun component8 ()J + public final fun component9 ()Z + public final fun copy (Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZI)Lcom/gojek/mqtt/client/config/ExperimentConfigs; + public static synthetic fun copy$default (Lcom/gojek/mqtt/client/config/ExperimentConfigs;Lcom/gojek/mqtt/client/config/SubscriptionStore;Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig;IIIIJJZIILjava/lang/Object;)Lcom/gojek/mqtt/client/config/ExperimentConfigs; public fun equals (Ljava/lang/Object;)Z public final fun getActivityCheckIntervalSeconds ()I public final fun getAdaptiveKeepAliveConfig ()Lcom/gojek/mqtt/model/AdaptiveKeepAliveConfig; + public final fun getConnectPacketTimeoutSeconds ()I public final fun getInactivityTimeoutSeconds ()I public final fun getIncomingMessagesCleanupIntervalSecs ()J public final fun getIncomingMessagesTTLSecs ()J @@ -132,7 +135,7 @@ public final class com/gojek/mqtt/client/config/v3/MqttV3Configuration : com/goj } public final class com/gojek/mqtt/client/connectioninfo/ConnectionInfo { - public fun (Ljava/lang/String;Ljava/lang/String;IILjava/lang/String;ILjava/lang/String;)V + public fun (Ljava/lang/String;Ljava/lang/String;IILjava/lang/String;ILjava/lang/String;Z)V public final fun component1 ()Ljava/lang/String; public final fun component2 ()Ljava/lang/String; public final fun component3 ()I @@ -140,9 +143,11 @@ public final class com/gojek/mqtt/client/connectioninfo/ConnectionInfo { public final fun component5 ()Ljava/lang/String; public final fun component6 ()I public final fun component7 ()Ljava/lang/String; - public final fun copy (Ljava/lang/String;Ljava/lang/String;IILjava/lang/String;ILjava/lang/String;)Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; - public static synthetic fun copy$default (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;Ljava/lang/String;Ljava/lang/String;IILjava/lang/String;ILjava/lang/String;ILjava/lang/Object;)Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun component8 ()Z + public final fun copy (Ljava/lang/String;Ljava/lang/String;IILjava/lang/String;ILjava/lang/String;Z)Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public static synthetic fun copy$default (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;Ljava/lang/String;Ljava/lang/String;IILjava/lang/String;ILjava/lang/String;ZILjava/lang/Object;)Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; public fun equals (Ljava/lang/Object;)Z + public final fun getCleanSession ()Z public final fun getClientId ()Ljava/lang/String; public final fun getConnectTimeout ()I public final fun getHost ()Ljava/lang/String; @@ -378,16 +383,18 @@ public final class com/gojek/mqtt/event/MqttEvent$MqttConnectFailureEvent : com/ } public final class com/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent : com/gojek/mqtt/event/MqttEvent { - public fun (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V - public synthetic fun (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public synthetic fun (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Lcom/gojek/mqtt/network/ActiveNetInfo; public final fun component2 ()Lcom/gojek/mqtt/model/ServerUri; public final fun component3 ()J - public final fun component4 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; - public final fun copy (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent; - public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent; + public final fun component4 ()J + public final fun component5 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun copy (Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent; + public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent;Lcom/gojek/mqtt/network/ActiveNetInfo;Lcom/gojek/mqtt/model/ServerUri;JJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttConnectSuccessEvent; public fun equals (Ljava/lang/Object;)Z public final fun getActiveNetInfo ()Lcom/gojek/mqtt/network/ActiveNetInfo; + public final fun getConnectPacketRTTime ()J public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; public final fun getServerUri ()Lcom/gojek/mqtt/model/ServerUri; public final fun getTimeTakenMillis ()J @@ -520,20 +527,22 @@ public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendEvent : com/goj } public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent : com/gojek/mqtt/event/MqttEvent { - public fun (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V - public synthetic fun (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public synthetic fun (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Ljava/lang/String; public final fun component2 ()I public final fun component3 ()I public final fun component4 ()Lcom/gojek/mqtt/exception/CourierException; - public final fun component5 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; - public final fun copy (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent; - public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent; + public final fun component5 ()J + public final fun component6 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun copy (Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent; + public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent;Ljava/lang/String;IILcom/gojek/mqtt/exception/CourierException;JLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent; public fun equals (Ljava/lang/Object;)Z public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; public final fun getException ()Lcom/gojek/mqtt/exception/CourierException; public final fun getQos ()I public final fun getSizeBytes ()I + public final fun getTimeTakenMillis ()J public final fun getTopic ()Ljava/lang/String; public fun hashCode ()I public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V @@ -541,18 +550,20 @@ public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendFailureEvent : } public final class com/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent : com/gojek/mqtt/event/MqttEvent { - public fun (Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V - public synthetic fun (Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V + public synthetic fun (Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Ljava/lang/String; public final fun component2 ()I public final fun component3 ()I - public final fun component4 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; - public final fun copy (Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent; - public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;Ljava/lang/String;IILcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent; + public final fun component4 ()J + public final fun component5 ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; + public final fun copy (Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent; + public static synthetic fun copy$default (Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent;Ljava/lang/String;IIJLcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;ILjava/lang/Object;)Lcom/gojek/mqtt/event/MqttEvent$MqttMessageSendSuccessEvent; public fun equals (Ljava/lang/Object;)Z public fun getConnectionInfo ()Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo; public final fun getQos ()I public final fun getSizeBytes ()I + public final fun getTimeTakenMillis ()J public final fun getTopic ()Ljava/lang/String; public fun hashCode ()I public fun setConnectionInfo (Lcom/gojek/mqtt/client/connectioninfo/ConnectionInfo;)V @@ -1059,6 +1070,7 @@ public final class com/gojek/mqtt/model/MqttConnectOptions { public final fun getUserPropertiesMap ()Ljava/util/Map; public final fun getUsername ()Ljava/lang/String; public final fun getVersion ()Lcom/gojek/mqtt/model/MqttVersion; + public final fun getWill ()Lcom/gojek/mqtt/model/Will; public final fun getX509TrustManager ()Ljavax/net/ssl/X509TrustManager; public final fun isCleanSession ()Z public final fun newBuilder ()Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; @@ -1069,6 +1081,7 @@ public final class com/gojek/mqtt/model/MqttConnectOptions$Builder { public final fun alpnProtocols (Ljava/util/List;)Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; public final fun build ()Lcom/gojek/mqtt/model/MqttConnectOptions; public final fun cleanSession (Z)Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; + public final fun clearWill ()Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; public final fun clientId (Ljava/lang/String;)Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; public final fun connectionSpec (Lin/mohalla/paho/client/mqttv3/ConnectionSpec;)Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; public final fun keepAlive (Lcom/gojek/mqtt/model/KeepAlive;)Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; @@ -1080,6 +1093,7 @@ public final class com/gojek/mqtt/model/MqttConnectOptions$Builder { public final fun sslSocketFactory (Ljavax/net/ssl/SSLSocketFactory;Ljavax/net/ssl/X509TrustManager;)Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; public final fun userName (Ljava/lang/String;)Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; public final fun userProperties (Ljava/util/Map;)Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; + public final fun will (Lcom/gojek/mqtt/model/Will;)Lcom/gojek/mqtt/model/MqttConnectOptions$Builder; } public final class com/gojek/mqtt/model/MqttConnectOptions$Companion { @@ -1108,6 +1122,23 @@ public final class com/gojek/mqtt/model/ServerUri { public fun toString ()Ljava/lang/String; } +public final class com/gojek/mqtt/model/Will { + public fun (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/courier/QoS;Z)V + public final fun component1 ()Ljava/lang/String; + public final fun component2 ()Ljava/lang/String; + public final fun component3 ()Lcom/gojek/courier/QoS; + public final fun component4 ()Z + public final fun copy (Ljava/lang/String;Ljava/lang/String;Lcom/gojek/courier/QoS;Z)Lcom/gojek/mqtt/model/Will; + public static synthetic fun copy$default (Lcom/gojek/mqtt/model/Will;Ljava/lang/String;Ljava/lang/String;Lcom/gojek/courier/QoS;ZILjava/lang/Object;)Lcom/gojek/mqtt/model/Will; + public fun equals (Ljava/lang/Object;)Z + public final fun getMessage ()Ljava/lang/String; + public final fun getQos ()Lcom/gojek/courier/QoS; + public final fun getRetained ()Z + public final fun getTopic ()Ljava/lang/String; + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + public final class com/gojek/mqtt/network/ActiveNetInfo { public fun (ZZS)V public final fun component1 ()Z @@ -1156,18 +1187,16 @@ public final class com/gojek/mqtt/policies/connectretrytime/ConnectRetryTimeConf public final class com/gojek/mqtt/policies/connectretrytime/ConnectRetryTimePolicy : com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy { public fun (Lcom/gojek/mqtt/policies/connectretrytime/ConnectRetryTimeConfig;)V - public fun getConnRetryTimeSecs ()I public fun getConnRetryTimeSecs (Z)I - public fun getCurrentRetryTime ()I - public fun getRetryCount ()I public fun resetParams ()V } public abstract interface class com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy : com/gojek/mqtt/policies/IFallbackPolicy { - public abstract fun getConnRetryTimeSecs ()I public abstract fun getConnRetryTimeSecs (Z)I - public abstract fun getCurrentRetryTime ()I - public abstract fun getRetryCount ()I +} + +public final class com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy$DefaultImpls { + public static synthetic fun getConnRetryTimeSecs$default (Lcom/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy;ZILjava/lang/Object;)I } public final class com/gojek/mqtt/policies/connecttimeout/ConnectTimeoutConfig { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/IncomingMsgControllerImpl.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/IncomingMsgControllerImpl.kt index d36a1688..f8782c99 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/IncomingMsgControllerImpl.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/IncomingMsgControllerImpl.kt @@ -1,6 +1,7 @@ package com.gojek.mqtt.client import com.gojek.courier.extensions.fromSecondsToNanos +import com.gojek.courier.extensions.isWildCardTopic import com.gojek.courier.logging.ILogger import com.gojek.courier.utils.Clock import com.gojek.mqtt.client.listener.MessageListener @@ -47,6 +48,8 @@ internal class IncomingMsgControllerImpl( private val listenerMap = ConcurrentHashMap>() + private val wildcardTopicListenerMap = ConcurrentHashMap>() + private var cleanupFuture: ScheduledFuture<*>? = null override fun triggerHandleMessage() { @@ -64,40 +67,68 @@ internal class IncomingMsgControllerImpl( @Synchronized override fun registerListener(topic: String, listener: MessageListener) { - listenerMap[topic] = (listenerMap[topic] ?: emptyList()) + listener + if (topic.isWildCardTopic()) { + wildcardTopicListenerMap[topic] = (wildcardTopicListenerMap[topic] ?: emptyList()) + listener + } else { + listenerMap[topic] = (listenerMap[topic] ?: emptyList()) + listener + } triggerHandleMessage() } @Synchronized override fun unregisterListener(topic: String, listener: MessageListener) { - listenerMap[topic] = (listenerMap[topic] ?: emptyList()) - listener - if (listenerMap[topic]!!.isEmpty()) { - listenerMap.remove(topic) + if (topic.isWildCardTopic()) { + wildcardTopicListenerMap[topic] = (wildcardTopicListenerMap[topic] ?: emptyList()) - listener + if (wildcardTopicListenerMap[topic]!!.isEmpty()) { + wildcardTopicListenerMap.remove(topic) + } + } else { + listenerMap[topic] = (listenerMap[topic] ?: emptyList()) - listener + if (listenerMap[topic]!!.isEmpty()) { + listenerMap.remove(topic) + } } } private inner class HandleMessage : Runnable { override fun run() { try { - if (listenerMap.keys.isEmpty()) { + if (listenerMap.keys.isEmpty() && wildcardTopicListenerMap.isEmpty()) { logger.d(TAG, "No listeners registered") return } val messages: List = mqttReceivePersistence.getAllIncomingMessagesWithTopicFilter(listenerMap.keys) - if (mqttUtils.isEmpty(messages)) { - logger.d(TAG, "No Messages in Table") - return - } val deletedMsgIds = mutableListOf() for (message in messages) { logger.d(TAG, "Going to process ${message.messageId}") - val listenersNotified = notifyListeners(message) + val listenersNotified = notifyListeners(message, listenerMap[message.topic]!!) if (listenersNotified) { deletedMsgIds.add(message.messageId) } logger.d(TAG, "Successfully Processed Message ${message.messageId}") } + // processing messages for wildcard topic subscription + for (wildCardTopic in wildcardTopicListenerMap.keys()) { + val topicForDBQuery = parseWildCardTopicForDBQuery(wildCardTopic) + val wildcardMessages: List = + mqttReceivePersistence.getAllIncomingMessagesForWildCardTopic(topicForDBQuery) + for (message in wildcardMessages) { + logger.d(TAG, "Going to process ${message.messageId}") + val wildCardTopicRegex = parseWildCardTopicForRegex(wildCardTopic) + if (wildCardTopicRegex.matches(message.topic)) { + logger.d(TAG, "Wildcard topic: $wildCardTopic matches ${message.topic}") + val listenersNotified = + notifyListeners(message, wildcardTopicListenerMap[wildCardTopic]!!) + if (listenersNotified) { + deletedMsgIds.add(message.messageId) + } + } else { + logger.d(TAG, "Wildcard topic: $wildCardTopic does not match ${message.topic}") + } + logger.d(TAG, "Successfully Processed Message ${message.messageId}") + } + } if (deletedMsgIds.isNotEmpty()) { val deletedMessagesCount = deleteMessages(deletedMsgIds) logger.d(TAG, "Deleted $deletedMessagesCount messages") @@ -112,6 +143,18 @@ internal class IncomingMsgControllerImpl( } } + private fun parseWildCardTopicForDBQuery(topic: String): String { + var updatedTopic: String = topic.replace("+", "%") + updatedTopic = updatedTopic.replace("#", "%") + return updatedTopic + } + + private fun parseWildCardTopicForRegex(topic: String): Regex { + var updatedTopic: String = topic.replace("+", "[^\\/]+") + updatedTopic = updatedTopic.replace("#", "([^\\/]+(\\/?[^\\/])*)+") + return Regex(updatedTopic) + } + private inner class CleanupExpiredMessages : Runnable { override fun run() { logger.d(TAG, "Deleting expired messages") @@ -123,10 +166,10 @@ internal class IncomingMsgControllerImpl( } } - private fun notifyListeners(message: MqttReceivePacket): Boolean { + private fun notifyListeners(message: MqttReceivePacket, listeners: List): Boolean { var notified = false try { - listenerMap[message.topic]!!.forEach { + listeners.forEach { notified = true it.onMessageReceived(message.toMqttMessage()) } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttClient.kt index 2aadb27f..f8138c1c 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttClient.kt @@ -2,6 +2,8 @@ package com.gojek.mqtt.client import com.gojek.courier.Message import com.gojek.courier.QoS +import com.gojek.courier.callback.NoOpSendMessageCallback +import com.gojek.courier.callback.SendMessageCallback import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState import com.gojek.mqtt.event.EventHandler @@ -14,7 +16,12 @@ interface MqttClient { fun reconnect() fun subscribe(topic: Pair, vararg topics: Pair) fun unsubscribe(topic: String, vararg topics: String) - fun send(message: Message, topic: String, qos: QoS): Boolean + fun send( + message: Message, + topic: String, + qos: QoS, + sendMessageCallback: SendMessageCallback = NoOpSendMessageCallback + ): Boolean fun addMessageListener(topic: String, listener: MessageListener) fun removeMessageListener(topic: String, listener: MessageListener) fun addGlobalMessageListener(listener: MessageListener) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt index a0643119..d8d87c3f 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/MqttCourierClient.kt @@ -2,6 +2,7 @@ package com.gojek.mqtt.client import com.gojek.courier.Message import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.mqtt.client.internal.MqttClientInternal import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState @@ -36,8 +37,8 @@ internal class MqttCourierClient( mqttClient.unsubscribe(topic, *topics) } - override fun send(message: Message, topic: String, qos: QoS): Boolean { - return mqttClient.send(MqttPacket((message as Message.Bytes).value, topic, qos)) + override fun send(message: Message, topic: String, qos: QoS, sendMessageCallback: SendMessageCallback): Boolean { + return mqttClient.send(MqttPacket((message as Message.Bytes).value, topic, qos), sendMessageCallback) } override fun addMessageListener(topic: String, listener: MessageListener) { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt index d0421ca0..61299a8a 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/config/ExperimentConfigs.kt @@ -12,6 +12,7 @@ data class ExperimentConfigs( val adaptiveKeepAliveConfig: AdaptiveKeepAliveConfig? = null, val activityCheckIntervalSeconds: Int = DEFAULT_ACTIVITY_CHECK_INTERVAL_SECS, val inactivityTimeoutSeconds: Int = DEFAULT_INACTIVITY_TIMEOUT_SECS, + val connectPacketTimeoutSeconds: Int = DEFAULT_INACTIVITY_TIMEOUT_SECS, val policyResetTimeSeconds: Int = DEFAULT_POLICY_RESET_TIME_SECS, val incomingMessagesTTLSecs: Long = 360, val incomingMessagesCleanupIntervalSecs: Long = 60, diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/connectioninfo/ConnectionInfo.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/connectioninfo/ConnectionInfo.kt index 2a5ba884..80c209cb 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/connectioninfo/ConnectionInfo.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/connectioninfo/ConnectionInfo.kt @@ -7,5 +7,6 @@ data class ConnectionInfo( val connectTimeout: Int, val host: String, val port: Int, - val scheme: String + val scheme: String, + val cleanSession: Boolean ) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/event/adapter/MqttClientEventAdapter.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/event/adapter/MqttClientEventAdapter.kt index 8e29bfda..e4ca35e0 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/event/adapter/MqttClientEventAdapter.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/event/adapter/MqttClientEventAdapter.kt @@ -1,6 +1,7 @@ package com.gojek.mqtt.client.event.adapter import com.gojek.courier.QoS +import com.gojek.courier.extensions.fromNanosToMillis import com.gojek.mqtt.connection.event.ConnectionEventHandler import com.gojek.mqtt.event.EventHandler import com.gojek.mqtt.event.MqttEvent.ConnectPacketSendEvent @@ -34,6 +35,9 @@ internal class MqttClientEventAdapter( private val eventHandler: EventHandler, private val networkHandler: NetworkHandler ) { + + private var connectPacketSendTime = 0L + fun adapt(): ConnectionEventHandler { return object : ConnectionEventHandler { override fun onMqttConnectAttempt( @@ -57,7 +61,8 @@ internal class MqttClientEventAdapter( MqttConnectSuccessEvent( activeNetInfo = networkHandler.getActiveNetworkInfo(), serverUri = serverUri, - timeTakenMillis = timeTakenMillis + timeTakenMillis = timeTakenMillis, + connectPacketRTTime = (System.nanoTime() - connectPacketSendTime).fromNanosToMillis() ) ) } @@ -213,6 +218,7 @@ internal class MqttClientEventAdapter( } override fun onConnectPacketSend() { + connectPacketSendTime = System.nanoTime() eventHandler.onEvent(ConnectPacketSendEvent()) } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/event/interceptor/MqttEventHandler.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/event/interceptor/MqttEventHandler.kt index 028c5cd0..f67e7f5f 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/event/interceptor/MqttEventHandler.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/event/interceptor/MqttEventHandler.kt @@ -2,10 +2,24 @@ package com.gojek.mqtt.client.event.interceptor import com.gojek.mqtt.event.EventHandler import com.gojek.mqtt.event.MqttEvent +import com.gojek.mqtt.utils.MqttUtils import java.util.concurrent.CopyOnWriteArrayList - -internal class MqttEventHandler : EventHandler { - +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit.SECONDS + +internal class MqttEventHandler( + mqttUtils: MqttUtils +) : EventHandler { + + private val eventScheduler = ThreadPoolExecutor( + /* corePoolSize = */ 1, + /* maximumPoolSize = */ 1, + /* keepAliveTime = */ 300, + /* unit = */ SECONDS, + /* workQueue = */ LinkedBlockingQueue(), + /* threadFactory = */ mqttUtils.threadFactory("mqtt-event-handler", false) + ).apply { allowCoreThreadTimeOut(true) } private val interceptorList = CopyOnWriteArrayList() private val eventHandlers = CopyOnWriteArrayList() @@ -18,7 +32,11 @@ internal class MqttEventHandler : EventHandler { interceptorList.forEach { event = it.intercept(event) } - eventHandlers.forEach { it.onEvent(mqttEvent) } + if (eventHandlers.isNotEmpty()) { + eventScheduler.submit { + eventHandlers.forEach { it.onEvent(mqttEvent) } + } + } } fun addEventHandler(handler: EventHandler) { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt index ea2259b2..cbd51033 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/internal/MqttClientInternal.kt @@ -2,6 +2,7 @@ package com.gojek.mqtt.client.internal import android.content.Context import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.keepalive.KeepAliveFailureHandler import com.gojek.keepalive.NoOpKeepAliveFailureHandler import com.gojek.keepalive.OptimalKeepAliveFailureHandler @@ -21,6 +22,7 @@ import com.gojek.mqtt.event.PingEventHandler import com.gojek.mqtt.model.AdaptiveKeepAliveConfig import com.gojek.mqtt.model.MqttConnectOptions import com.gojek.mqtt.model.MqttPacket +import com.gojek.mqtt.utils.MqttUtils import com.gojek.networktracker.NetworkStateTrackerFactory internal class MqttClientInternal( @@ -41,7 +43,7 @@ internal class MqttClientInternal( private var keepAliveProvider: KeepAliveProvider = NonAdaptiveKeepAliveProvider() private var keepAliveFailureHandler: KeepAliveFailureHandler = NoOpKeepAliveFailureHandler() - private val eventHandler = MqttEventHandler() + private val eventHandler = MqttEventHandler(MqttUtils()) private val optimalKeepAliveObserver = object : OptimalKeepAliveObserver { override fun onOptimalKeepAliveFound( @@ -97,8 +99,8 @@ internal class MqttClientInternal( androidMqttClient.unsubscribe(listOf(*topics)) } - fun send(mqttPacket: MqttPacket): Boolean { - return androidMqttClient.send(mqttPacket) + fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean { + return androidMqttClient.send(mqttPacket, sendMessageCallback) } fun addMessageListener(topic: String, listener: MessageListener) { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt index 50c96220..e89c20b6 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/model/MqttSendPacket.kt @@ -2,6 +2,8 @@ package com.gojek.mqtt.client.model import android.os.Parcel import android.os.Parcelable +import com.gojek.courier.callback.NoOpSendMessageCallback +import com.gojek.courier.callback.SendMessageCallback internal data class MqttSendPacket( var message: ByteArray, @@ -9,7 +11,9 @@ internal data class MqttSendPacket( var timestamp: Long, var qos: Int, var topic: String, - var type: Int + var type: Int, + var triggerTime: Long, + var sendMessageCallback: SendMessageCallback ) : Parcelable { constructor(parcel: Parcel) : this( parcel.createByteArray()!!, @@ -17,7 +21,9 @@ internal data class MqttSendPacket( parcel.readLong(), parcel.readInt(), parcel.readString()!!, - parcel.readInt() + parcel.readInt(), + parcel.readLong(), + NoOpSendMessageCallback ) override fun writeToParcel(parcel: Parcel, flags: Int) { @@ -27,6 +33,7 @@ internal data class MqttSendPacket( parcel.writeInt(qos) parcel.writeString(topic) parcel.writeInt(type) + parcel.writeLong(triggerTime) } override fun describeContents(): Int { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt index 54443e40..feb6b164 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/IAndroidMqttClient.kt @@ -1,6 +1,7 @@ package com.gojek.mqtt.client.v3 import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState import com.gojek.mqtt.constants.MQTT_WAIT_BEFORE_RECONNECT_TIME_MS @@ -12,7 +13,7 @@ internal interface IAndroidMqttClient { fun connect(timeMillis: Long = MQTT_WAIT_BEFORE_RECONNECT_TIME_MS) fun reconnect() fun disconnect(clearState: Boolean = false) - fun send(mqttPacket: MqttPacket): Boolean + fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean fun addMessageListener(topic: String, listener: MessageListener) fun removeMessageListener(topic: String, listener: MessageListener) fun addGlobalMessageListener(listener: MessageListener) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt index 0725cfcb..ffe4eef7 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/client/v3/impl/AndroidMqttClient.kt @@ -10,6 +10,7 @@ import android.os.Messenger import android.os.RemoteException import androidx.annotation.RequiresApi import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.courier.exception.AuthApiException import com.gojek.courier.extensions.fromNanosToMillis import com.gojek.courier.logging.ILogger @@ -178,7 +179,8 @@ internal class AndroidMqttClient( persistenceOptions = mqttConfiguration.persistenceOptions, inactivityTimeoutSeconds = experimentConfigs.inactivityTimeoutSeconds, policyResetTimeSeconds = experimentConfigs.policyResetTimeSeconds, - shouldUseNewSSLFlow = experimentConfigs.shouldUseNewSSLFlow + shouldUseNewSSLFlow = experimentConfigs.shouldUseNewSSLFlow, + connectPacketTimeoutSeconds = experimentConfigs.connectPacketTimeoutSeconds ) mqttConnection = MqttConnection( @@ -238,7 +240,6 @@ internal class AndroidMqttClient( if (!isConnected()) { connectMqtt() } - try { logger.d( TAG, @@ -250,31 +251,37 @@ internal class AndroidMqttClient( MqttMessageSendEvent(topic, qos, message.size) ) } + mqttPacket.sendMessageCallback.onMessageSendTrigger() mqttConnection.publish(mqttPacket) } catch (e: MqttPersistenceException) { + mqttPacket.sendMessageCallback.onMessageSendFailure(e) with(mqttPacket) { eventHandler.onEvent( MqttMessageSendFailureEvent( topic = topic, qos = qos, sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(), exception = e.toCourierException() ) ) } } catch (e: MqttException) { + mqttPacket.sendMessageCallback.onMessageSendFailure(e) with(mqttPacket) { eventHandler.onEvent( MqttMessageSendFailureEvent( topic = topic, qos = qos, sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(), exception = e.toCourierException() ) ) } runnableScheduler.scheduleMqttHandleExceptionRunnable(e, true) } catch (e: java.lang.Exception) { + mqttPacket.sendMessageCallback.onMessageSendFailure(e) // this might happen if mqtt object becomes null while disconnect, so just ignore with(mqttPacket) { eventHandler.onEvent( @@ -282,6 +289,7 @@ internal class AndroidMqttClient( topic = topic, qos = qos, sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(), exception = e.toCourierException() ) ) @@ -290,14 +298,16 @@ internal class AndroidMqttClient( } // This can be invoked on any thread - override fun send(mqttPacket: MqttPacket): Boolean { + override fun send(mqttPacket: MqttPacket, sendMessageCallback: SendMessageCallback): Boolean { val mqttSendPacket = MqttSendPacket( - mqttPacket.message, - 0, - System.currentTimeMillis(), - mqttPacket.qos.value, - mqttPacket.topic, - mqttPacket.qos.type + message = mqttPacket.message, + messageId = 0, + timestamp = System.currentTimeMillis(), + qos = mqttPacket.qos.value, + topic = mqttPacket.topic, + type = mqttPacket.qos.type, + triggerTime = System.nanoTime(), + sendMessageCallback = sendMessageCallback ) val msg = Message.obtain() @@ -512,6 +522,7 @@ internal class AndroidMqttClient( .keepAlive(keepAliveProvider.getKeepAlive(connectOptions)) .clientId(connectOptions.clientId + ":adaptive") .cleanSession(true) + .clearWill() .build() } else { connectOptions.newBuilder() @@ -528,7 +539,8 @@ internal class AndroidMqttClient( connectTimeout = mqttConfiguration.connectTimeoutPolicy.getConnectTimeOut(), host = hostFallbackPolicy!!.getServerUri().host, port = hostFallbackPolicy!!.getServerUri().port, - scheme = hostFallbackPolicy!!.getServerUri().scheme + scheme = hostFallbackPolicy!!.getServerUri().scheme, + cleanSession = mqttConnectOptions.isCleanSession ) ) } @@ -576,22 +588,38 @@ internal class AndroidMqttClient( inner class MqttMessageSendListener : IMessageSendListener { override fun onSuccess(packet: MqttSendPacket) { + packet.sendMessageCallback.onMessageSendSuccess() with(packet) { eventHandler.onEvent( MqttMessageSendSuccessEvent( - topic, - qos, - message.size + topic = topic, + qos = qos, + sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis() ) ) } } override fun onFailure(packet: MqttSendPacket, exception: Throwable) { + packet.sendMessageCallback.onMessageSendFailure(exception) + with(packet) { + eventHandler.onEvent( + MqttMessageSendFailureEvent( + topic = topic, + qos = qos, + sizeBytes = message.size, + timeTakenMillis = (System.nanoTime() - triggerTime).fromNanosToMillis(), + exception = exception.toCourierException() + ) + ) + } runnableScheduler.connectMqtt() } - override fun notifyWrittenOnSocket(packet: MqttSendPacket) {} + override fun notifyWrittenOnSocket(packet: MqttSendPacket) { + packet.sendMessageCallback.onMessageWrittenOnSocket() + } } companion object { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt b/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt index ab0e5490..361747ec 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/connection/MqttConnection.kt @@ -129,8 +129,9 @@ internal class MqttConnection( } val clientId: String = connectOptions.clientId + val username: String = connectOptions.username serverUri = getServerUri() - logger.d(TAG, "clientId : $clientId serverUri $serverUri") + logger.d(TAG, "clientId : $clientId, username: $username, serverUri $serverUri") if (mqtt == null) { mqtt = getMqttAsyncClient(clientId, serverUri.toString()) mqtt!!.setCallback(getMqttCallback(messageReceiveListener)) @@ -183,6 +184,16 @@ internal class MqttConnection( connectionSpec = mqttConnectOptions.connectionSpec alpnProtocolList = mqttConnectOptions.protocols } + + mqttConnectOptions.will?.apply { + options!!.setWill( + topic, + message.toByteArray(), + qos.value, + retained + ) + } + // Setting some connection options which we need to reset on every connect logger.d(TAG, "MQTT connecting on : " + mqtt!!.serverURI) @@ -192,7 +203,7 @@ internal class MqttConnection( connectOptions.keepAlive.isOptimal, serverUri ) - mqtt!!.connect(options, null, getConnectListener(subscriptionTopicMap)) + mqtt!!.connect(options, null, getConnectListener()) runnableScheduler.scheduleNextActivityCheck() } catch (e: MqttSecurityException) { logger.e(TAG, "mqtt security exception while connecting $e") @@ -407,7 +418,7 @@ internal class MqttConnection( return mqttAsyncClient } - private fun getConnectListener(subscriptionTopicMap: Map): IMqttActionListener? { + private fun getConnectListener(): IMqttActionListener { return object : IMqttActionListener { override fun onSuccess(iMqttToken: IMqttToken) { try { @@ -515,6 +526,7 @@ internal class MqttConnection( ), timeTakenMillis = (clock.nanoTime() - subscribeStartTime).fromNanosToMillis() ) + subscriptionStore.getListener().onInvalidTopicsSubscribeFailure(topicMap) } } } @@ -546,6 +558,7 @@ internal class MqttConnection( ), timeTakenMillis = (clock.nanoTime() - unsubscribeStartTime).fromNanosToMillis() ) + subscriptionStore.getListener().onInvalidTopicsUnsubscribeFailure(topics) } } } @@ -576,11 +589,12 @@ internal class MqttConnection( connectionConfig.connectionEventHandler.onMqttSubscribeFailure( topics = failTopicMap, timeTakenMillis = (clock.nanoTime() - context.startTime).fromNanosToMillis(), - throwable = MqttException(MqttException.REASON_CODE_INVALID_SUBSCRIPTION.toInt()) + throwable = MqttException(REASON_CODE_INVALID_SUBSCRIPTION.toInt()) ) } subscriptionStore.getListener().onTopicsSubscribed(successTopicMap) + subscriptionStore.getListener().onInvalidTopicsSubscribeFailure(failTopicMap) subscriptionPolicy.resetParams() } @@ -682,6 +696,10 @@ internal class MqttConnection( return connectionConfig.inactivityTimeoutSeconds } + override fun connectPacketTimeoutSecs(): Int { + return connectionConfig.connectPacketTimeoutSeconds + } + override fun useNewSSLFlow(): Boolean { return connectionConfig.shouldUseNewSSLFlow } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/connection/config/v3/ConnectionConfig.kt b/mqtt-client/src/main/java/com/gojek/mqtt/connection/config/v3/ConnectionConfig.kt index 82bd9eb5..387272ea 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/connection/config/v3/ConnectionConfig.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/connection/config/v3/ConnectionConfig.kt @@ -25,5 +25,6 @@ internal data class ConnectionConfig( val persistenceOptions: PersistenceOptions, val inactivityTimeoutSeconds: Int, val policyResetTimeSeconds: Int, - val shouldUseNewSSLFlow: Boolean + val shouldUseNewSSLFlow: Boolean, + val connectPacketTimeoutSeconds: Int ) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt index f954599e..20e71e9e 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/event/MqttEvent.kt @@ -25,6 +25,7 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) { val activeNetInfo: ActiveNetInfo, val serverUri: ServerUri?, val timeTakenMillis: Long, + val connectPacketRTTime: Long, override var connectionInfo: ConnectionInfo? = null ) : MqttEvent(connectionInfo) @@ -171,6 +172,7 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) { val topic: String, val qos: Int, val sizeBytes: Int, + val timeTakenMillis: Long, override var connectionInfo: ConnectionInfo? = null ) : MqttEvent(connectionInfo) @@ -179,6 +181,7 @@ sealed class MqttEvent(open var connectionInfo: ConnectionInfo?) { val qos: Int, val sizeBytes: Int, val exception: CourierException, + val timeTakenMillis: Long, override var connectionInfo: ConnectionInfo? = null ) : MqttEvent(connectionInfo) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/exception/handler/v3/impl/MqttExceptionHandlerImpl.kt b/mqtt-client/src/main/java/com/gojek/mqtt/exception/handler/v3/impl/MqttExceptionHandlerImpl.kt index 988f51bd..2bf77316 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/exception/handler/v3/impl/MqttExceptionHandlerImpl.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/exception/handler/v3/impl/MqttExceptionHandlerImpl.kt @@ -120,7 +120,19 @@ internal class MqttExceptionHandlerImpl( */ } MqttException.REASON_CODE_FAILED_AUTHENTICATION -> { - runnableScheduler.scheduleAuthFailureRunnable() + runnableScheduler.scheduleAuthFailureRunnable( + connectRetryTimePolicy.getConnRetryTimeSecs(true) * 1000L + ) + } + MqttException.REASON_CODE_NOT_AUTHORIZED -> { + runnableScheduler.scheduleAuthFailureRunnable( + connectRetryTimePolicy.getConnRetryTimeSecs(true) * 1000L + ) + } + MqttException.REASON_CODE_INVALID_CONNECT_OPTIONS -> { + runnableScheduler.scheduleAuthFailureRunnable( + connectRetryTimePolicy.getConnRetryTimeSecs(true) * 1000L + ) } MqttException.REASON_CODE_INVALID_CLIENT_ID -> { } @@ -130,9 +142,6 @@ internal class MqttExceptionHandlerImpl( } MqttException.REASON_CODE_NO_MESSAGE_IDS_AVAILABLE -> { } - MqttException.REASON_CODE_NOT_AUTHORIZED -> { - runnableScheduler.scheduleAuthFailureRunnable() - } MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH -> { } MqttException.REASON_CODE_SSL_CONFIG_ERROR -> { diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/model/MqttConnectOptions.kt b/mqtt-client/src/main/java/com/gojek/mqtt/model/MqttConnectOptions.kt index 2a453e24..907a92a4 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/model/MqttConnectOptions.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/model/MqttConnectOptions.kt @@ -41,6 +41,8 @@ class MqttConnectOptions private constructor( val protocols: List = builder.protocols + val will: Will? = builder.will + init { if (connectionSpec.isTls.not()) { this.sslSocketFactory = null @@ -77,6 +79,7 @@ class MqttConnectOptions private constructor( internal var x509TrustManagerOrNull: X509TrustManager? = null internal var connectionSpec: ConnectionSpec = DEFAULT_CONNECTION_SPECS internal var protocols: List = emptyList() + internal var will: Will? = null internal constructor(mqttConnectOptions: MqttConnectOptions) : this() { this.serverUris = mqttConnectOptions.serverUris @@ -93,6 +96,7 @@ class MqttConnectOptions private constructor( this.x509TrustManagerOrNull = mqttConnectOptions.x509TrustManager this.connectionSpec = mqttConnectOptions.connectionSpec this.protocols = mqttConnectOptions.protocols + this.will = mqttConnectOptions.will } fun serverUris(serverUris: List) = apply { @@ -204,6 +208,14 @@ class MqttConnectOptions private constructor( this.protocols = protocols } + fun will(will: Will) = apply { + this.will = will + } + + fun clearWill() = apply { + this.will = null + } + fun build(): MqttConnectOptions = MqttConnectOptions(this) } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/model/Will.kt b/mqtt-client/src/main/java/com/gojek/mqtt/model/Will.kt new file mode 100644 index 00000000..64941247 --- /dev/null +++ b/mqtt-client/src/main/java/com/gojek/mqtt/model/Will.kt @@ -0,0 +1,10 @@ +package com.gojek.mqtt.model + +import com.gojek.courier.QoS + +data class Will( + val topic: String, + val message: String, + val qos: QoS, + val retained: Boolean +) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/IMqttReceivePersistence.kt b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/IMqttReceivePersistence.kt index e7dcb280..c0a8a605 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/IMqttReceivePersistence.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/IMqttReceivePersistence.kt @@ -7,4 +7,5 @@ internal interface IMqttReceivePersistence { fun getAllIncomingMessagesWithTopicFilter(topics: Set): List fun removeReceivedMessages(messageIds: List): Int fun removeMessagesWithOlderTimestamp(timestampNanos: Long): Int + fun getAllIncomingMessagesForWildCardTopic(topic: String): List } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/dao/IncomingMessagesDao.kt b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/dao/IncomingMessagesDao.kt index 5aedd4cb..8f8783a9 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/dao/IncomingMessagesDao.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/dao/IncomingMessagesDao.kt @@ -13,6 +13,9 @@ internal interface IncomingMessagesDao { @Query("SELECT * from incoming_messages where topic in (:topics)") fun getAllMessagesWithTopicFilter(topics: Set): List + @Query("SELECT * from incoming_messages where topic LIKE :topic") + fun getAllIncomingMessagesForWildCardTopic(topic: String): List + @Query("DELETE from incoming_messages") fun clearAllMessages() diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/impl/PahoPersistence.kt b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/impl/PahoPersistence.kt index e4981649..ded9546b 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/persistence/impl/PahoPersistence.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/persistence/impl/PahoPersistence.kt @@ -86,6 +86,10 @@ internal class PahoPersistence(private val context: Context) : return incomingMessagesDao.getAllMessagesWithTopicFilter(topics) } + override fun getAllIncomingMessagesForWildCardTopic(topic: String): List { + return incomingMessagesDao.getAllIncomingMessagesForWildCardTopic(topic) + } + override fun removeReceivedMessages(messageIds: List): Int { return incomingMessagesDao.removeMessagesById(messageIds) } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/policies/connectretrytime/ConnectRetryTimePolicy.kt b/mqtt-client/src/main/java/com/gojek/mqtt/policies/connectretrytime/ConnectRetryTimePolicy.kt index f3edc3c2..d56252e2 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/policies/connectretrytime/ConnectRetryTimePolicy.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/policies/connectretrytime/ConnectRetryTimePolicy.kt @@ -9,14 +9,16 @@ class ConnectRetryTimePolicy( private val reconnectTime: AtomicInteger = AtomicInteger(0) private val retryCount: AtomicInteger = AtomicInteger(0) - // this function works on exponential retrying - override fun getConnRetryTimeSecs(forceExp: Boolean): Int { + override fun getConnRetryTimeSecs(isAuthFailure: Boolean): Int { + if (isAuthFailure) { + return 0 + } val maxRetryCount = connectRetryTimeConfig.maxRetryCount val reconnectTimeFixed = connectRetryTimeConfig.reconnectTimeFixed val reconnectTimeRandom = connectRetryTimeConfig.reconnectTimeRandom val maxReconnectTime = connectRetryTimeConfig.maxReconnectTime val random = Random() - if ((reconnectTime.get() == 0 || retryCount.get() < maxRetryCount) && !forceExp) { + if ((reconnectTime.get() == 0 || retryCount.get() < maxRetryCount)) { reconnectTime.set(reconnectTimeFixed + random.nextInt(reconnectTimeRandom) + 1) retryCount.getAndIncrement() } else { @@ -32,18 +34,6 @@ class ConnectRetryTimePolicy( return reconnectTime.get() } - override fun getRetryCount(): Int { - return retryCount.get() - } - - override fun getConnRetryTimeSecs(): Int { - return getConnRetryTimeSecs(false) - } - - override fun getCurrentRetryTime(): Int { - return reconnectTime.get() - } - private fun updateReconnectTimeExponentially(exponentialFactor: Int): Int { while (true) { val current = reconnectTime.get() diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy.kt b/mqtt-client/src/main/java/com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy.kt index 44669fa6..57ef3137 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/policies/connectretrytime/IConnectRetryTimePolicy.kt @@ -3,8 +3,5 @@ package com.gojek.mqtt.policies.connectretrytime import com.gojek.mqtt.policies.IFallbackPolicy interface IConnectRetryTimePolicy : IFallbackPolicy { - fun getConnRetryTimeSecs(): Int - fun getConnRetryTimeSecs(forceExp: Boolean): Int - fun getRetryCount(): Int - fun getCurrentRetryTime(): Int + fun getConnRetryTimeSecs(isAuthFailure: Boolean = false): Int } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt index c7e89cef..70b8b93c 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/IRunnableScheduler.kt @@ -23,5 +23,5 @@ internal interface IRunnableScheduler { fun scheduleResetParams(delayMillis: Long) - fun scheduleAuthFailureRunnable() + fun scheduleAuthFailureRunnable(delayMillis: Long) } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt index 5c7dfab1..f1be998d 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/scheduler/MqttRunnableScheduler.kt @@ -144,11 +144,11 @@ internal class MqttRunnableScheduler( } } - override fun scheduleAuthFailureRunnable() { + override fun scheduleAuthFailureRunnable(delayMillis: Long) { try { sendThreadEventIfNotAlive() mqttThreadHandler.removeCallbacks(authFailureRunnable) - mqttThreadHandler.post(authFailureRunnable) + mqttThreadHandler.postDelayed(authFailureRunnable, delayMillis) } catch (ex: Exception) { logger.e(TAG, "Exception while scheduleAuthFailureRunnable", ex) } diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/InMemorySubscriptionStore.kt b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/InMemorySubscriptionStore.kt index f948d381..3d88cf94 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/InMemorySubscriptionStore.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/InMemorySubscriptionStore.kt @@ -4,7 +4,13 @@ import com.gojek.courier.QoS internal class InMemorySubscriptionStore : SubscriptionStore { private var state = State(mapOf()) - private val listener = object : SubscriptionStoreListener {} + private val listener = object : SubscriptionStoreListener { + override fun onInvalidTopicsSubscribeFailure(topicMap: Map) { + state = state.copy( + subscriptionTopics = state.subscriptionTopics - topicMap.keys + ) + } + } private data class State(val subscriptionTopics: Map) diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStore.kt b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStore.kt index 6afaffe5..ebb011f9 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStore.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStore.kt @@ -15,6 +15,17 @@ internal class PersistableSubscriptionStore(context: Context) : SubscriptionStor override fun onTopicsUnsubscribed(topics: Set) { onTopicsUnsubscribedInternal(topics) } + + override fun onInvalidTopicsSubscribeFailure(topicMap: Map) { + state = state.copy( + subscriptionTopics = state.subscriptionTopics - topicMap.keys, + pendingUnsubscribeTopics = state.pendingUnsubscribeTopics + ) + } + + override fun onInvalidTopicsUnsubscribeFailure(topics: Set) { + onTopicsUnsubscribedInternal(topics) + } } private data class State( diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2.kt b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2.kt index f26368e5..23c40372 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/PersistableSubscriptionStoreV2.kt @@ -13,6 +13,17 @@ internal class PersistableSubscriptionStoreV2(context: Context) : SubscriptionSt override fun onTopicsUnsubscribed(topics: Set) { onTopicsUnsubscribedInternal(topics) } + + override fun onInvalidTopicsSubscribeFailure(topicMap: Map) { + state = state.copy( + subscriptionTopics = state.subscriptionTopics - topicMap.keys, + pendingUnsubscribeTopics = state.pendingUnsubscribeTopics + ) + } + + override fun onInvalidTopicsUnsubscribeFailure(topics: Set) { + onTopicsUnsubscribedInternal(topics) + } } private data class State( diff --git a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/SubscriptionStore.kt b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/SubscriptionStore.kt index e8da0fcb..9378dd5a 100644 --- a/mqtt-client/src/main/java/com/gojek/mqtt/subscription/SubscriptionStore.kt +++ b/mqtt-client/src/main/java/com/gojek/mqtt/subscription/SubscriptionStore.kt @@ -13,5 +13,7 @@ internal interface SubscriptionStore { internal interface SubscriptionStoreListener { fun onTopicsSubscribed(topicMap: Map) = Unit + fun onInvalidTopicsSubscribeFailure(topicMap: Map) = Unit fun onTopicsUnsubscribed(topics: Set) = Unit + fun onInvalidTopicsUnsubscribeFailure(topics: Set) = Unit } diff --git a/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt b/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt index c1726d22..ccbdef06 100644 --- a/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt +++ b/mqtt-client/src/test/java/com/gojek/mqtt/client/MqttCourierClientTest.kt @@ -2,12 +2,14 @@ package com.gojek.mqtt.client import com.gojek.courier.Message import com.gojek.courier.QoS +import com.gojek.courier.callback.SendMessageCallback import com.gojek.mqtt.client.internal.MqttClientInternal import com.gojek.mqtt.client.listener.MessageListener import com.gojek.mqtt.client.model.ConnectionState import com.gojek.mqtt.model.MqttConnectOptions import com.gojek.mqtt.model.MqttPacket import com.nhaarman.mockitokotlin2.argumentCaptor +import com.nhaarman.mockitokotlin2.eq import com.nhaarman.mockitokotlin2.mock import com.nhaarman.mockitokotlin2.verify import com.nhaarman.mockitokotlin2.whenever @@ -71,13 +73,14 @@ class MqttCourierClientTest { @Test fun `test send`() { val message = mock() + val callback = mock() val byteArray = ByteArray(10) whenever(message.value).thenReturn(byteArray) val topic = "test/topic" val qos = QoS.ZERO - mqttCourierClient.send(message, topic, qos) + mqttCourierClient.send(message, topic, qos, callback) val argumentCaptor = argumentCaptor() - verify(mqttClientInternal).send(argumentCaptor.capture()) + verify(mqttClientInternal).send(argumentCaptor.capture(), eq(callback)) assertEquals(argumentCaptor.lastValue.message, byteArray) assertEquals(argumentCaptor.lastValue.topic, topic) assertEquals(argumentCaptor.lastValue.qos, qos) diff --git a/mqtt-client/src/test/java/com/gojek/mqtt/exception/handler/v3/impl/MqttExceptionHandlerImplTest.kt b/mqtt-client/src/test/java/com/gojek/mqtt/exception/handler/v3/impl/MqttExceptionHandlerImplTest.kt index b6af64e7..4350300d 100644 --- a/mqtt-client/src/test/java/com/gojek/mqtt/exception/handler/v3/impl/MqttExceptionHandlerImplTest.kt +++ b/mqtt-client/src/test/java/com/gojek/mqtt/exception/handler/v3/impl/MqttExceptionHandlerImplTest.kt @@ -37,6 +37,31 @@ import java.net.UnknownHostException import java.nio.channels.UnresolvedAddressException import java.util.Random import javax.net.ssl.SSLHandshakeException +import org.eclipse.paho.client.mqttv3.MqttException +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_BROKER_UNAVAILABLE +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_ALREADY_DISCONNECTED +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_CLOSED +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_CONNECTED +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_DISCONNECTING +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_DISCONNECT_PROHIBITED +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_EXCEPTION +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_NOT_CONNECTED +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CLIENT_TIMEOUT +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CONNECTION_LOST +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_CONNECT_IN_PROGRESS +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_FAILED_AUTHENTICATION +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_INVALID_CLIENT_ID +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_INVALID_CONNECT_OPTIONS +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_INVALID_MESSAGE +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_INVALID_PROTOCOL_VERSION +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_MAX_INFLIGHT +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_NOT_AUTHORIZED +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_NO_MESSAGE_IDS_AVAILABLE +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_SERVER_CONNECT_ERROR +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_SOCKET_FACTORY_MISMATCH +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_SSL_CONFIG_ERROR +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_TOKEN_INUSE +import org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_UNEXPECTED_ERROR import org.junit.Test import org.junit.runner.RunWith import org.mockito.Mockito.verifyNoInteractions @@ -177,8 +202,9 @@ class MqttExceptionHandlerImplTest { val exception = MqttException( REASON_CODE_FAILED_AUTHENTICATION.toInt() ) + whenever(connectRetryTimePolicy.getConnRetryTimeSecs(true)).thenReturn(20) mqttExceptionHandlerImpl.handleException(exception, true) - verify(runnableScheduler).scheduleAuthFailureRunnable() + verify(runnableScheduler).scheduleAuthFailureRunnable(20000) } @Test @@ -186,8 +212,17 @@ class MqttExceptionHandlerImplTest { val exception = MqttException( REASON_CODE_NOT_AUTHORIZED.toInt() ) + whenever(connectRetryTimePolicy.getConnRetryTimeSecs(true)).thenReturn(20) mqttExceptionHandlerImpl.handleException(exception, true) - verify(runnableScheduler).scheduleAuthFailureRunnable() + verify(runnableScheduler).scheduleAuthFailureRunnable(20000) + } + + @Test + fun `test exception with reason code 32205`() { + val exception = MqttException(REASON_CODE_INVALID_CONNECT_OPTIONS.toInt()) + whenever(connectRetryTimePolicy.getConnRetryTimeSecs(true)).thenReturn(20) + mqttExceptionHandlerImpl.handleException(exception, true) + verify(runnableScheduler).scheduleAuthFailureRunnable(20000) } @Test diff --git a/paho/src/main/java/in/mohalla/paho/client/mqttv3/IExperimentsConfig.java b/paho/src/main/java/in/mohalla/paho/client/mqttv3/IExperimentsConfig.java index db55f4b4..46d2e282 100644 --- a/paho/src/main/java/in/mohalla/paho/client/mqttv3/IExperimentsConfig.java +++ b/paho/src/main/java/in/mohalla/paho/client/mqttv3/IExperimentsConfig.java @@ -3,5 +3,7 @@ public interface IExperimentsConfig { int inactivityTimeoutSecs(); + int connectPacketTimeoutSecs(); + Boolean useNewSSLFlow(); } diff --git a/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttAsyncClient.java b/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttAsyncClient.java index 003d98dc..75eeb297 100644 --- a/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttAsyncClient.java +++ b/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttAsyncClient.java @@ -872,7 +872,7 @@ public IMqttToken checkPing(Object userContext, IMqttActionListener callback) th // @TRACE 117=> logger.d(TAG, "checking for ping"); - token = comms.checkForActivity(); + token = comms.checkForActivity(false); // @TRACE 118=< return token; diff --git a/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttConnectOptions.java b/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttConnectOptions.java index a5a27a88..1f79232f 100644 --- a/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttConnectOptions.java +++ b/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttConnectOptions.java @@ -26,6 +26,8 @@ import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.X509TrustManager; +import static org.eclipse.paho.client.mqttv3.MqttException.REASON_CODE_INVALID_CONNECT_OPTIONS; + /** * Holds the set of options that control how the client connects to a server. */ @@ -157,11 +159,11 @@ public String getUserName() * @throws IllegalArgumentException * if the user name is blank or only contains whitespace characters. */ - public void setUserName(String userName) + public void setUserName(String userName) throws MqttException { if ((userName != null) && (userName.trim().equals(""))) { - throw new IllegalArgumentException("Username is empty"); + throw new MqttException(REASON_CODE_INVALID_CONNECT_OPTIONS, new IllegalArgumentException("Username is empty")); } this.userName = userName; } @@ -259,11 +261,11 @@ public int getKeepAliveIntervalServer() * @param keepAliveInterval * the interval, measured in seconds, must be >= 0. */ - public void setKeepAliveInterval(int keepAliveInterval) throws IllegalArgumentException + public void setKeepAliveInterval(int keepAliveInterval) throws MqttException { if (keepAliveInterval < 0) { - throw new IllegalArgumentException(); + throw new MqttException(REASON_CODE_INVALID_CONNECT_OPTIONS); } this.keepAliveInterval = keepAliveInterval; } @@ -271,11 +273,11 @@ public void setKeepAliveInterval(int keepAliveInterval) throws IllegalArgumentEx /* * This interval is used for sending to server in connect packet. */ - public void setKeepAliveIntervalServer(int keepAliveInterval) throws IllegalArgumentException + public void setKeepAliveIntervalServer(int keepAliveInterval) throws MqttException { if (keepAliveInterval < 0) { - throw new IllegalArgumentException(); + throw new MqttException(REASON_CODE_INVALID_CONNECT_OPTIONS); } this.keepAliveIntervalServer = keepAliveInterval; } @@ -352,11 +354,11 @@ public void setHandshakeTimeout(int handshakeTimeout) * @param readTimeout * the timeout value, measured in seconds. It must be >0; */ - public void setReadTimeout(int readTimeout) + public void setReadTimeout(int readTimeout) throws MqttException { if (readTimeout < 0) { - throw new IllegalArgumentException(); + throw new MqttException(REASON_CODE_INVALID_CONNECT_OPTIONS); } this.readTimeout = readTimeout; } diff --git a/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttException.java b/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttException.java index 4a200a71..48ceaad8 100644 --- a/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttException.java +++ b/paho/src/main/java/in/mohalla/paho/client/mqttv3/MqttException.java @@ -147,6 +147,11 @@ public class MqttException extends Exception */ public static final short REASON_CODE_INVALID_SUBSCRIPTION = 32204; + /** + * The Client has attempted to connect with invalid arguments. + */ + public static final short REASON_CODE_INVALID_CONNECT_OPTIONS = 32205; + private int reasonCode; private Throwable cause; diff --git a/paho/src/main/java/in/mohalla/paho/client/mqttv3/TimerPingSender.java b/paho/src/main/java/in/mohalla/paho/client/mqttv3/TimerPingSender.java index 0479197b..e37e04aa 100644 --- a/paho/src/main/java/in/mohalla/paho/client/mqttv3/TimerPingSender.java +++ b/paho/src/main/java/in/mohalla/paho/client/mqttv3/TimerPingSender.java @@ -86,7 +86,7 @@ public void run() { // @Trace 660=Check schedule at {0} logger.d(TAG, "in ping timer task run function"); - comms.checkForActivity(); + comms.checkForActivity(false); } } } diff --git a/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/ClientComms.java b/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/ClientComms.java index 179470ed..88326d8f 100644 --- a/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/ClientComms.java +++ b/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/ClientComms.java @@ -758,10 +758,10 @@ public void run() { /* * Check and send a ping if needed and check for ping timeout. Need to send a ping if nothing has been sent or received in the last keepalive interval. */ - public MqttToken checkForActivity() { + public MqttToken checkForActivity(Boolean forcePing) { MqttToken token = null; try { - token = clientState.checkForActivity(); + token = clientState.checkForActivity(forcePing); } catch (MqttException e) { handleRunException(e); } catch (Exception e) { diff --git a/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/ClientState.java b/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/ClientState.java index f5ea4f1c..d2c9bea4 100644 --- a/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/ClientState.java +++ b/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/ClientState.java @@ -45,7 +45,6 @@ import java.io.EOFException; import java.util.Enumeration; import java.util.Hashtable; -import java.util.Map; import java.util.Properties; import java.util.Vector; @@ -150,6 +149,8 @@ public class ClientState private final static String className = ClientState.class.getName(); private long inactivityTimeout = DEFAULT_INACTIVITY_TIMEOUT; + + private long connectPacketTimeout = DEFAULT_INACTIVITY_TIMEOUT; private final static long DEFAULT_INACTIVITY_TIMEOUT = 60 * 1000; private IPahoEvents pahoEvents; @@ -191,6 +192,7 @@ protected ClientState( if (experimentsConfig != null) { inactivityTimeout = experimentsConfig.inactivityTimeoutSecs() * 1000L; + connectPacketTimeout = experimentsConfig.connectPacketTimeoutSecs() * 1000L; } restoreState(); @@ -647,7 +649,7 @@ protected void undo(MqttPublish message) throws MqttPersistenceException * * @return token of ping command, null if no ping command has been sent. */ - public MqttToken checkForActivity() throws MqttException + public MqttToken checkForActivity(Boolean forcePing) throws MqttException { final String methodName = "checkForActivity"; @@ -670,7 +672,7 @@ public MqttToken checkForActivity() throws MqttException long lastActivity = lastInboundActivity; // Is a ping required? - if (time - lastActivity + keepAliveMargin >= this.keepAlive) + if (forcePing || (time - lastActivity + keepAliveMargin >= this.keepAlive)) { // @TRACE 620=ping needed. keepAlive={0} lastOutboundActivity={1} lastInboundActivity={2} @@ -749,7 +751,7 @@ public MqttToken sendForcePingRequest() throws MqttException // Wake sender thread since it may be in wait state (in ClientState.get()) notifyQueueLock(); } - else if ((time - lastPing >= keepAlive + delta) && (time - lastInboundActivity >= keepAlive + delta) && (time - lastOutboundActivity >= keepAlive + delta)) + else if ((time - lastPing >= keepAlive + delta) && (time - lastInboundActivity >= keepAlive + delta)) { // any of the conditions is true means the client is active // lastInboundActivity will be updated once receiving is done. @@ -775,8 +777,13 @@ public void checkActivity() throws MqttException { if (fastReconnectCheckStartTime > lastInboundActivity) { - if(System.currentTimeMillis() - fastReconnectCheckStartTime >= inactivityTimeout) - + long timeout; + if (clientComms.isConnecting()) { + timeout = connectPacketTimeout; + } else { + timeout = inactivityTimeout; + } + if(System.currentTimeMillis() - fastReconnectCheckStartTime >= timeout) { logger.logFastReconnectEvent(fastReconnectCheckStartTime, lastInboundActivity); logger.e(TAG, "not recieved ack for 1 min so disconnecting"); @@ -1345,6 +1352,10 @@ public void disconnected(MqttException reason) // Reset pingOutstanding to allow reconnects to assume no previous ping. pingOutstanding = Boolean.FALSE; } + fastReconnectCheckStartTime = 0; + lastInboundActivity = 0; + lastOutboundActivity = 0; + lastPing = 0; } catch (MqttException e) { diff --git a/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/CommsSender.java b/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/CommsSender.java index 0c3bdd4c..d562fb62 100644 --- a/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/CommsSender.java +++ b/paho/src/main/java/in/mohalla/paho/client/mqttv3/internal/CommsSender.java @@ -286,7 +286,9 @@ private void handleRunException(MqttWireMessage message, Exception ex) ) { clientState.releaseMessageId(message.getMessageId()); } - clientState.releaseMessageId(message.getType()); + if (message != null) { + clientState.releaseMessageId(message.getType()); + } running = false; clientComms.shutdownConnection(null, mex); diff --git a/pingsender/alarm-pingsender/api/alarm-pingsender.api b/pingsender/alarm-pingsender/api/alarm-pingsender.api index fed9f85d..492c3e1f 100644 --- a/pingsender/alarm-pingsender/api/alarm-pingsender.api +++ b/pingsender/alarm-pingsender/api/alarm-pingsender.api @@ -1,15 +1,17 @@ public final class com/gojek/alarm/pingsender/AlarmPingSenderConfig { public fun ()V - public fun (ZZIZ)V - public synthetic fun (ZZIZILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (ZZIZZ)V + public synthetic fun (ZZIZZILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()Z public final fun component2 ()Z public final fun component3 ()I public final fun component4 ()Z - public final fun copy (ZZIZ)Lcom/gojek/alarm/pingsender/AlarmPingSenderConfig; - public static synthetic fun copy$default (Lcom/gojek/alarm/pingsender/AlarmPingSenderConfig;ZZIZILjava/lang/Object;)Lcom/gojek/alarm/pingsender/AlarmPingSenderConfig; + public final fun component5 ()Z + public final fun copy (ZZIZZ)Lcom/gojek/alarm/pingsender/AlarmPingSenderConfig; + public static synthetic fun copy$default (Lcom/gojek/alarm/pingsender/AlarmPingSenderConfig;ZZIZZILjava/lang/Object;)Lcom/gojek/alarm/pingsender/AlarmPingSenderConfig; public fun equals (Ljava/lang/Object;)Z public final fun getPingWakeLockTimeout ()I + public final fun getSendForcePing ()Z public final fun getUseElapsedRealTimeAlarm ()Z public fun hashCode ()I public final fun isMqttAllowWhileIdle ()Z diff --git a/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AdaptiveAlarmPingSender.kt b/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AdaptiveAlarmPingSender.kt index 1c18a172..3f139b06 100644 --- a/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AdaptiveAlarmPingSender.kt +++ b/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AdaptiveAlarmPingSender.kt @@ -103,8 +103,8 @@ internal class AdaptiveAlarmPingSender( applicationContext.getSystemService(Service.ALARM_SERVICE) as AlarmManager // pending intent can be null if we get a security exception in onstart-->defensive check - if (pendingIntent != null) { - alarmManager.cancel(pendingIntent) + pendingIntent?.let { + alarmManager.cancel(it) } } catch (ex: Exception) { logger.d(TAG, "Unregister alarmreceiver to MqttService$ex") @@ -147,21 +147,23 @@ internal class AdaptiveAlarmPingSender( applicationContext.getSystemService(Service.ALARM_SERVICE) as AlarmManager val alarmType = getAlarmType() val isMqttAllowWhileIdle = pingSenderConfig.isMqttAllowWhileIdle - if (isMqttAllowWhileIdle && buildInfoProvider.isMarshmallowOrHigher) { - alarmManager.setExactAndAllowWhileIdle( - alarmType, - nextAlarmInMilliseconds, - pendingIntent - ) - } else if (buildInfoProvider.isKitkatOrHigher) { - alarmManager.setExact( - alarmType, - nextAlarmInMilliseconds, - pendingIntent - ) - } else { - alarmManager[alarmType, nextAlarmInMilliseconds] = - pendingIntent + pendingIntent?.let { pendingIntent -> + if (isMqttAllowWhileIdle && buildInfoProvider.isMarshmallowOrHigher) { + alarmManager.setExactAndAllowWhileIdle( + alarmType, + nextAlarmInMilliseconds, + pendingIntent + ) + } else if (buildInfoProvider.isKitkatOrHigher) { + alarmManager.setExact( + alarmType, + nextAlarmInMilliseconds, + pendingIntent + ) + } else { + alarmManager[alarmType, nextAlarmInMilliseconds] = + pendingIntent + } } } catch (ex: Exception) { logger.d( diff --git a/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AlarmPingSender.kt b/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AlarmPingSender.kt index 8a079d65..0e0052fc 100644 --- a/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AlarmPingSender.kt +++ b/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AlarmPingSender.kt @@ -92,8 +92,8 @@ internal class AlarmPingSender( applicationContext.getSystemService(Service.ALARM_SERVICE) as AlarmManager // pending intent can be null if we get a security exception in onstart-->defensive check - if (pendingIntent != null) { - alarmManager.cancel(pendingIntent) + pendingIntent?.let { + alarmManager.cancel(it) } } catch (ex: Exception) { logger.d(TAG, "Unregister alarmreceiver to MqttService$ex") @@ -133,23 +133,25 @@ internal class AlarmPingSender( applicationContext.getSystemService(Service.ALARM_SERVICE) as AlarmManager val alarmType = getAlarmType() val isMqttAllowWhileIdle = alarmPingSenderConfig.isMqttAllowWhileIdle - if (isMqttAllowWhileIdle && buildInfoProvider.isMarshmallowOrHigher) { - alarmManager.setExactAndAllowWhileIdle( - alarmType, - nextAlarmInMilliseconds, - pendingIntent - ) - } else if (buildInfoProvider.isKitkatOrHigher) { - alarmManager.setExact( - alarmType, - nextAlarmInMilliseconds, - pendingIntent - ) - } else { - alarmManager[alarmType, nextAlarmInMilliseconds] = - pendingIntent + pendingIntent?.let { pendingIntent -> + if (isMqttAllowWhileIdle && buildInfoProvider.isMarshmallowOrHigher) { + alarmManager.setExactAndAllowWhileIdle( + alarmType, + nextAlarmInMilliseconds, + pendingIntent + ) + } else if (buildInfoProvider.isKitkatOrHigher) { + alarmManager.setExact( + alarmType, + nextAlarmInMilliseconds, + pendingIntent + ) + } else { + alarmManager[alarmType, nextAlarmInMilliseconds] = + pendingIntent + } + pingSenderEvents.mqttPingScheduled(delayInMilliseconds.fromMillisToSeconds(), comms.keepAlive.fromMillisToSeconds()) } - pingSenderEvents.mqttPingScheduled(delayInMilliseconds.fromMillisToSeconds(), comms.keepAlive.fromMillisToSeconds()) } catch (ex: Exception) { logger.d( TAG, @@ -201,7 +203,7 @@ internal class AlarmPingSender( serverUri = comms.client.serverURI } pingSenderEvents.mqttPingInitiated(serverUri, comms.keepAlive.fromMillisToSeconds()) - val token: IMqttToken? = comms.checkForActivity() + val token = comms.checkForActivity(alarmPingSenderConfig.sendForcePing) // No ping has been sent. if (token == null) { diff --git a/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AlarmPingSenderConfig.kt b/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AlarmPingSenderConfig.kt index 0f033f9c..5d9f78ab 100644 --- a/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AlarmPingSenderConfig.kt +++ b/pingsender/alarm-pingsender/src/main/java/com/gojek/alarm/pingsender/AlarmPingSenderConfig.kt @@ -4,7 +4,8 @@ data class AlarmPingSenderConfig( val isMqttPingWakeUp: Boolean = true, val isMqttAllowWhileIdle: Boolean = true, val pingWakeLockTimeout: Int = DEFAULT_PING_WAKELOCK_TIMEOUT_IN_SECONDS, - val useElapsedRealTimeAlarm: Boolean = false + val useElapsedRealTimeAlarm: Boolean = false, + val sendForcePing: Boolean = false ) private const val DEFAULT_PING_WAKELOCK_TIMEOUT_IN_SECONDS = 0 // 0 seconds diff --git a/pingsender/mqtt-pingsender/api/mqtt-pingsender.api b/pingsender/mqtt-pingsender/api/mqtt-pingsender.api index ded64793..cfe90452 100644 --- a/pingsender/mqtt-pingsender/api/mqtt-pingsender.api +++ b/pingsender/mqtt-pingsender/api/mqtt-pingsender.api @@ -1,9 +1,6 @@ public abstract interface class com/gojek/mqtt/pingsender/AdaptiveMqttPingSender : com/gojek/mqtt/pingsender/MqttPingSender { } -public final class com/gojek/mqtt/pingsender/KeepAliveKt { -} - public abstract interface class com/gojek/mqtt/pingsender/MqttPingSender { public abstract fun init (Lin/mohalla/paho/client/mqttv3/internal/ClientComms;Lin/mohalla/paho/client/mqttv3/ILogger;)V public abstract fun schedule (J)V @@ -11,6 +8,3 @@ public abstract interface class com/gojek/mqtt/pingsender/MqttPingSender { public abstract fun stop ()V } -public final class com/gojek/mqtt/pingsender/MqttPingSenderKt { -} - diff --git a/pingsender/timer-pingsender/api/timer-pingsender.api b/pingsender/timer-pingsender/api/timer-pingsender.api index ba6eacd3..c80c7588 100644 --- a/pingsender/timer-pingsender/api/timer-pingsender.api +++ b/pingsender/timer-pingsender/api/timer-pingsender.api @@ -1,8 +1,21 @@ +public final class com/gojek/timer/pingsender/TimerPingSenderConfig { + public fun ()V + public fun (Z)V + public synthetic fun (ZILkotlin/jvm/internal/DefaultConstructorMarker;)V + public final fun component1 ()Z + public final fun copy (Z)Lcom/gojek/timer/pingsender/TimerPingSenderConfig; + public static synthetic fun copy$default (Lcom/gojek/timer/pingsender/TimerPingSenderConfig;ZILjava/lang/Object;)Lcom/gojek/timer/pingsender/TimerPingSenderConfig; + public fun equals (Ljava/lang/Object;)Z + public final fun getSendForcePing ()Z + public fun hashCode ()I + public fun toString ()Ljava/lang/String; +} + public final class com/gojek/timer/pingsender/TimerPingSenderFactory { public static final field Companion Lcom/gojek/timer/pingsender/TimerPingSenderFactory$Companion; } public final class com/gojek/timer/pingsender/TimerPingSenderFactory$Companion { - public final fun create ()Lcom/gojek/mqtt/pingsender/MqttPingSender; + public final fun create (Lcom/gojek/timer/pingsender/TimerPingSenderConfig;)Lcom/gojek/mqtt/pingsender/MqttPingSender; } diff --git a/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSender.kt b/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSender.kt index 04e14fa9..5e5a76ad 100644 --- a/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSender.kt +++ b/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSender.kt @@ -22,6 +22,7 @@ import java.util.TimerTask * @see MqttPingSender */ internal class TimerPingSender( + private val pingSenderConfig: TimerPingSenderConfig, private val clock: Clock = Clock(), private val timerFactory: TimerFactory = TimerFactory() ) : MqttPingSender { @@ -71,7 +72,7 @@ internal class TimerPingSender( val serverUri = comms.client?.serverURI ?: "" val keepAliveMillis = comms.keepAlive pingSenderEvents.mqttPingInitiated(comms.client.serverURI, keepAliveMillis.fromMillisToSeconds()) - val token = comms.checkForActivity() + val token = comms.checkForActivity(pingSenderConfig.sendForcePing) if (token == null) { logger.d(TAG, "Mqtt Ping Token null") pingSenderEvents.pingMqttTokenNull(serverUri, keepAliveMillis.fromMillisToSeconds()) diff --git a/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSenderConfig.kt b/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSenderConfig.kt new file mode 100644 index 00000000..4c3af5b8 --- /dev/null +++ b/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSenderConfig.kt @@ -0,0 +1,5 @@ +package com.gojek.timer.pingsender + +data class TimerPingSenderConfig( + val sendForcePing: Boolean = false +) diff --git a/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSenderFactory.kt b/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSenderFactory.kt index e30bcca2..4c8c8631 100644 --- a/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSenderFactory.kt +++ b/pingsender/timer-pingsender/src/main/java/com/gojek/timer/pingsender/TimerPingSenderFactory.kt @@ -4,8 +4,8 @@ import com.gojek.mqtt.pingsender.MqttPingSender class TimerPingSenderFactory private constructor() { companion object { - fun create(): MqttPingSender { - return TimerPingSender() + fun create(timerPingSenderConfig: TimerPingSenderConfig): MqttPingSender { + return TimerPingSender(timerPingSenderConfig) } } } diff --git a/pingsender/timer-pingsender/src/test/java/com/gojek/timer/pingsender/TimerPingSenderTest.kt b/pingsender/timer-pingsender/src/test/java/com/gojek/timer/pingsender/TimerPingSenderTest.kt index e297698b..303829ec 100644 --- a/pingsender/timer-pingsender/src/test/java/com/gojek/timer/pingsender/TimerPingSenderTest.kt +++ b/pingsender/timer-pingsender/src/test/java/com/gojek/timer/pingsender/TimerPingSenderTest.kt @@ -28,8 +28,9 @@ class TimerPingSenderTest { private val comms = mock() private val logger = mock() private val pingSenderEvents = mock() + private val pingSenderConfig = mock() - private val pingSender = TimerPingSender(clock, timerFactory) + private val pingSender = TimerPingSender(pingSenderConfig, clock, timerFactory) @Before fun setup() { @@ -72,10 +73,11 @@ class TimerPingSenderTest { val mqttClient = mock() val testUri = "test-uri" val keepaliveMillis = 30000L + whenever(pingSenderConfig.sendForcePing).thenReturn(false) whenever(comms.client).thenReturn(mqttClient) whenever(mqttClient.serverURI).thenReturn(testUri) whenever(comms.keepAlive).thenReturn(keepaliveMillis) - whenever(comms.checkForActivity()).thenReturn(null) + whenever(comms.checkForActivity(false)).thenReturn(null) pingSender.PingTask().run() @@ -91,10 +93,36 @@ class TimerPingSenderTest { val keepaliveMillis = 30000L val startTime = TimeUnit.MILLISECONDS.toNanos(100) val endTime = TimeUnit.MILLISECONDS.toNanos(110) + whenever(pingSenderConfig.sendForcePing).thenReturn(false) whenever(comms.client).thenReturn(mqttClient) whenever(mqttClient.serverURI).thenReturn(testUri) whenever(comms.keepAlive).thenReturn(keepaliveMillis) - whenever(comms.checkForActivity()).thenReturn(mqttToken) + whenever(comms.checkForActivity(false)).thenReturn(mqttToken) + whenever(clock.nanoTime()).thenReturn(startTime, endTime) + + pingSender.PingTask().run() + + verify(pingSenderEvents).mqttPingInitiated(testUri, keepaliveMillis / 1000) + + val argumentCaptor = argumentCaptor() + verify(mqttToken).actionCallback = argumentCaptor.capture() + argumentCaptor.lastValue.onSuccess(mqttToken) + verify(pingSenderEvents).pingEventSuccess(testUri, 10, keepaliveMillis / 1000) + } + + @Test + fun `test sendPing when ping can be sent successfully with sendForcePing=true`() { + val mqttClient = mock() + val mqttToken = mock() + val testUri = "test-uri" + val keepaliveMillis = 30000L + val startTime = TimeUnit.MILLISECONDS.toNanos(100) + val endTime = TimeUnit.MILLISECONDS.toNanos(110) + whenever(pingSenderConfig.sendForcePing).thenReturn(true) + whenever(comms.client).thenReturn(mqttClient) + whenever(mqttClient.serverURI).thenReturn(testUri) + whenever(comms.keepAlive).thenReturn(keepaliveMillis) + whenever(comms.checkForActivity(true)).thenReturn(mqttToken) whenever(clock.nanoTime()).thenReturn(startTime, endTime) pingSender.PingTask().run() @@ -115,10 +143,11 @@ class TimerPingSenderTest { val keepaliveMillis = 30000L val startTime = TimeUnit.MILLISECONDS.toNanos(100) val endTime = TimeUnit.MILLISECONDS.toNanos(110) + whenever(pingSenderConfig.sendForcePing).thenReturn(false) whenever(comms.client).thenReturn(mqttClient) whenever(mqttClient.serverURI).thenReturn(testUri) whenever(comms.keepAlive).thenReturn(keepaliveMillis) - whenever(comms.checkForActivity()).thenReturn(mqttToken) + whenever(comms.checkForActivity(false)).thenReturn(mqttToken) whenever(clock.nanoTime()).thenReturn(startTime, endTime) pingSender.PingTask().run() diff --git a/pingsender/workmanager-2.6.0-pingsender/api/workmanager-2.6.0-pingsender.api b/pingsender/workmanager-2.6.0-pingsender/api/workmanager-2.6.0-pingsender.api index b9909fe7..b1f33754 100644 --- a/pingsender/workmanager-2.6.0-pingsender/api/workmanager-2.6.0-pingsender.api +++ b/pingsender/workmanager-2.6.0-pingsender/api/workmanager-2.6.0-pingsender.api @@ -1,11 +1,13 @@ public final class com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig { public fun ()V - public fun (J)V - public synthetic fun (JILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (JZ)V + public synthetic fun (JZILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()J - public final fun copy (J)Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig; - public static synthetic fun copy$default (Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig;JILjava/lang/Object;)Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig; + public final fun component2 ()Z + public final fun copy (JZ)Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig; + public static synthetic fun copy$default (Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig;JZILjava/lang/Object;)Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig; public fun equals (Ljava/lang/Object;)Z + public final fun getSendForcePing ()Z public final fun getTimeoutSeconds ()J public fun hashCode ()I public fun toString ()Ljava/lang/String; diff --git a/pingsender/workmanager-2.6.0-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSender.kt b/pingsender/workmanager-2.6.0-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSender.kt index 0ff07957..4815b67d 100644 --- a/pingsender/workmanager-2.6.0-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSender.kt +++ b/pingsender/workmanager-2.6.0-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSender.kt @@ -54,7 +54,7 @@ internal class WorkManagerPingSender( val keepAliveMillis = comms.keepAlive pingSenderEvents.mqttPingInitiated(serverUri, keepAliveMillis.fromMillisToSeconds()) - val token = comms.checkForActivity() + val token = comms.checkForActivity(pingSenderConfig.sendForcePing) if (token == null) { logger.d(TAG, "Mqtt Ping Token null") pingSenderEvents.pingMqttTokenNull(serverUri, keepAliveMillis.fromMillisToSeconds()) diff --git a/pingsender/workmanager-2.6.0-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig.kt b/pingsender/workmanager-2.6.0-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig.kt index 6da389fb..ae1dbbd3 100644 --- a/pingsender/workmanager-2.6.0-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig.kt +++ b/pingsender/workmanager-2.6.0-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig.kt @@ -1,7 +1,8 @@ package com.gojek.workmanager.pingsender data class WorkManagerPingSenderConfig( - val timeoutSeconds: Long = DEFAULT_PING_TIMEOUT_SECS + val timeoutSeconds: Long = DEFAULT_PING_TIMEOUT_SECS, + val sendForcePing: Boolean = false ) internal const val DEFAULT_PING_TIMEOUT_SECS = 30L diff --git a/pingsender/workmanager-2.6.0-pingsender/src/test/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderTest.kt b/pingsender/workmanager-2.6.0-pingsender/src/test/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderTest.kt index f6b24bae..ae97ed1d 100644 --- a/pingsender/workmanager-2.6.0-pingsender/src/test/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderTest.kt +++ b/pingsender/workmanager-2.6.0-pingsender/src/test/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderTest.kt @@ -61,10 +61,11 @@ class WorkManagerPingSenderTest { val mqttClient = mock() val testUri = "test-uri" val keepaliveMillis = 30000L + whenever(pingSenderConfig.sendForcePing).thenReturn(false) whenever(comms.client).thenReturn(mqttClient) whenever(mqttClient.serverURI).thenReturn(testUri) whenever(comms.keepAlive).thenReturn(keepaliveMillis) - whenever(comms.checkForActivity()).thenReturn(null) + whenever(comms.checkForActivity(false)).thenReturn(null) pingSender.sendPing { // do nothing @@ -82,10 +83,40 @@ class WorkManagerPingSenderTest { val keepaliveMillis = 30000L val startTime = TimeUnit.MILLISECONDS.toNanos(100) val endTime = TimeUnit.MILLISECONDS.toNanos(110) + whenever(pingSenderConfig.sendForcePing).thenReturn(false) whenever(comms.client).thenReturn(mqttClient) whenever(mqttClient.serverURI).thenReturn(testUri) whenever(comms.keepAlive).thenReturn(keepaliveMillis) - whenever(comms.checkForActivity()).thenReturn(mqttToken) + whenever(comms.checkForActivity(false)).thenReturn(mqttToken) + whenever(clock.nanoTime()).thenReturn(startTime, endTime) + + var success: Boolean? = null + pingSender.sendPing { + success = it + } + + verify(pingSenderEvents).mqttPingInitiated(testUri, keepaliveMillis / 1000) + + val argumentCaptor = argumentCaptor() + verify(mqttToken).actionCallback = argumentCaptor.capture() + argumentCaptor.lastValue.onSuccess(mqttToken) + assertTrue(success!!) + verify(pingSenderEvents).pingEventSuccess(testUri, 10, keepaliveMillis / 1000) + } + + @Test + fun `test sendPing when ping can be sent successfully with sendForcePing=true`() { + val mqttClient = mock() + val mqttToken = mock() + val testUri = "test-uri" + val keepaliveMillis = 30000L + val startTime = TimeUnit.MILLISECONDS.toNanos(100) + val endTime = TimeUnit.MILLISECONDS.toNanos(110) + whenever(pingSenderConfig.sendForcePing).thenReturn(true) + whenever(comms.client).thenReturn(mqttClient) + whenever(mqttClient.serverURI).thenReturn(testUri) + whenever(comms.keepAlive).thenReturn(keepaliveMillis) + whenever(comms.checkForActivity(true)).thenReturn(mqttToken) whenever(clock.nanoTime()).thenReturn(startTime, endTime) var success: Boolean? = null @@ -110,10 +141,11 @@ class WorkManagerPingSenderTest { val keepaliveMillis = 30000L val startTime = TimeUnit.MILLISECONDS.toNanos(100) val endTime = TimeUnit.MILLISECONDS.toNanos(110) + whenever(pingSenderConfig.sendForcePing).thenReturn(false) whenever(comms.client).thenReturn(mqttClient) whenever(mqttClient.serverURI).thenReturn(testUri) whenever(comms.keepAlive).thenReturn(keepaliveMillis) - whenever(comms.checkForActivity()).thenReturn(mqttToken) + whenever(comms.checkForActivity(false)).thenReturn(mqttToken) whenever(clock.nanoTime()).thenReturn(startTime, endTime) var success: Boolean? = null diff --git a/pingsender/workmanager-pingsender/api/workmanager-pingsender.api b/pingsender/workmanager-pingsender/api/workmanager-pingsender.api index b9909fe7..b1f33754 100644 --- a/pingsender/workmanager-pingsender/api/workmanager-pingsender.api +++ b/pingsender/workmanager-pingsender/api/workmanager-pingsender.api @@ -1,11 +1,13 @@ public final class com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig { public fun ()V - public fun (J)V - public synthetic fun (JILkotlin/jvm/internal/DefaultConstructorMarker;)V + public fun (JZ)V + public synthetic fun (JZILkotlin/jvm/internal/DefaultConstructorMarker;)V public final fun component1 ()J - public final fun copy (J)Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig; - public static synthetic fun copy$default (Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig;JILjava/lang/Object;)Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig; + public final fun component2 ()Z + public final fun copy (JZ)Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig; + public static synthetic fun copy$default (Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig;JZILjava/lang/Object;)Lcom/gojek/workmanager/pingsender/WorkManagerPingSenderConfig; public fun equals (Ljava/lang/Object;)Z + public final fun getSendForcePing ()Z public final fun getTimeoutSeconds ()J public fun hashCode ()I public fun toString ()Ljava/lang/String; diff --git a/pingsender/workmanager-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSender.kt b/pingsender/workmanager-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSender.kt index 0ff07957..4815b67d 100644 --- a/pingsender/workmanager-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSender.kt +++ b/pingsender/workmanager-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSender.kt @@ -54,7 +54,7 @@ internal class WorkManagerPingSender( val keepAliveMillis = comms.keepAlive pingSenderEvents.mqttPingInitiated(serverUri, keepAliveMillis.fromMillisToSeconds()) - val token = comms.checkForActivity() + val token = comms.checkForActivity(pingSenderConfig.sendForcePing) if (token == null) { logger.d(TAG, "Mqtt Ping Token null") pingSenderEvents.pingMqttTokenNull(serverUri, keepAliveMillis.fromMillisToSeconds()) diff --git a/pingsender/workmanager-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig.kt b/pingsender/workmanager-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig.kt index 6da389fb..ae1dbbd3 100644 --- a/pingsender/workmanager-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig.kt +++ b/pingsender/workmanager-pingsender/src/main/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderConfig.kt @@ -1,7 +1,8 @@ package com.gojek.workmanager.pingsender data class WorkManagerPingSenderConfig( - val timeoutSeconds: Long = DEFAULT_PING_TIMEOUT_SECS + val timeoutSeconds: Long = DEFAULT_PING_TIMEOUT_SECS, + val sendForcePing: Boolean = false ) internal const val DEFAULT_PING_TIMEOUT_SECS = 30L diff --git a/pingsender/workmanager-pingsender/src/test/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderTest.kt b/pingsender/workmanager-pingsender/src/test/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderTest.kt index f6b24bae..ae97ed1d 100644 --- a/pingsender/workmanager-pingsender/src/test/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderTest.kt +++ b/pingsender/workmanager-pingsender/src/test/java/com/gojek/workmanager/pingsender/WorkManagerPingSenderTest.kt @@ -61,10 +61,11 @@ class WorkManagerPingSenderTest { val mqttClient = mock() val testUri = "test-uri" val keepaliveMillis = 30000L + whenever(pingSenderConfig.sendForcePing).thenReturn(false) whenever(comms.client).thenReturn(mqttClient) whenever(mqttClient.serverURI).thenReturn(testUri) whenever(comms.keepAlive).thenReturn(keepaliveMillis) - whenever(comms.checkForActivity()).thenReturn(null) + whenever(comms.checkForActivity(false)).thenReturn(null) pingSender.sendPing { // do nothing @@ -82,10 +83,40 @@ class WorkManagerPingSenderTest { val keepaliveMillis = 30000L val startTime = TimeUnit.MILLISECONDS.toNanos(100) val endTime = TimeUnit.MILLISECONDS.toNanos(110) + whenever(pingSenderConfig.sendForcePing).thenReturn(false) whenever(comms.client).thenReturn(mqttClient) whenever(mqttClient.serverURI).thenReturn(testUri) whenever(comms.keepAlive).thenReturn(keepaliveMillis) - whenever(comms.checkForActivity()).thenReturn(mqttToken) + whenever(comms.checkForActivity(false)).thenReturn(mqttToken) + whenever(clock.nanoTime()).thenReturn(startTime, endTime) + + var success: Boolean? = null + pingSender.sendPing { + success = it + } + + verify(pingSenderEvents).mqttPingInitiated(testUri, keepaliveMillis / 1000) + + val argumentCaptor = argumentCaptor() + verify(mqttToken).actionCallback = argumentCaptor.capture() + argumentCaptor.lastValue.onSuccess(mqttToken) + assertTrue(success!!) + verify(pingSenderEvents).pingEventSuccess(testUri, 10, keepaliveMillis / 1000) + } + + @Test + fun `test sendPing when ping can be sent successfully with sendForcePing=true`() { + val mqttClient = mock() + val mqttToken = mock() + val testUri = "test-uri" + val keepaliveMillis = 30000L + val startTime = TimeUnit.MILLISECONDS.toNanos(100) + val endTime = TimeUnit.MILLISECONDS.toNanos(110) + whenever(pingSenderConfig.sendForcePing).thenReturn(true) + whenever(comms.client).thenReturn(mqttClient) + whenever(mqttClient.serverURI).thenReturn(testUri) + whenever(comms.keepAlive).thenReturn(keepaliveMillis) + whenever(comms.checkForActivity(true)).thenReturn(mqttToken) whenever(clock.nanoTime()).thenReturn(startTime, endTime) var success: Boolean? = null @@ -110,10 +141,11 @@ class WorkManagerPingSenderTest { val keepaliveMillis = 30000L val startTime = TimeUnit.MILLISECONDS.toNanos(100) val endTime = TimeUnit.MILLISECONDS.toNanos(110) + whenever(pingSenderConfig.sendForcePing).thenReturn(false) whenever(comms.client).thenReturn(mqttClient) whenever(mqttClient.serverURI).thenReturn(testUri) whenever(comms.keepAlive).thenReturn(keepaliveMillis) - whenever(comms.checkForActivity()).thenReturn(mqttToken) + whenever(comms.checkForActivity(false)).thenReturn(mqttToken) whenever(clock.nanoTime()).thenReturn(startTime, endTime) var success: Boolean? = null