diff --git a/build.gradle.kts b/build.gradle.kts
index 116e4e7..512a102 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -7,6 +7,7 @@ plugins {
`project-report`
// `build-dashboard` // incompatible with Gradle CC
idea
+ id("org.jetbrains.kotlinx.kover")
}
group = "dev.adamko.kotka"
diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts
index 91c0cf0..9f2c787 100644
--- a/buildSrc/build.gradle.kts
+++ b/buildSrc/build.gradle.kts
@@ -10,6 +10,14 @@ dependencies {
// https://github.com/gradle/gradle/issues/15383#issuecomment-779893192
implementation(files(libs.javaClass.superclass.protectionDomain.codeSource.location))
+
+ val kotlinxKoverVersion = "0.7.5"
+ implementation("org.jetbrains.kotlinx:kover-gradle-plugin:${kotlinxKoverVersion}")
+
+ val kotlinxKnitVersion = "0.5.0"
+ implementation("org.jetbrains.kotlinx:kotlinx-knit:${kotlinxKnitVersion}")
+
+ implementation("dev.jacomet.gradle.plugins:logging-capabilities:0.10.0")
}
kotlin {
diff --git a/buildSrc/settings.gradle.kts b/buildSrc/settings.gradle.kts
index 086d386..fc270e7 100644
--- a/buildSrc/settings.gradle.kts
+++ b/buildSrc/settings.gradle.kts
@@ -12,8 +12,8 @@ dependencyResolutionManagement {
repositoriesMode.set(RepositoriesMode.PREFER_SETTINGS)
repositories {
- gradlePluginPortal()
mavenCentral()
+ gradlePluginPortal()
}
versionCatalogs {
diff --git a/buildSrc/src/main/kotlin/buildsrc/convention/kotlin-jvm.gradle.kts b/buildSrc/src/main/kotlin/buildsrc/convention/kotlin-jvm.gradle.kts
index 753c28b..fbed32e 100644
--- a/buildSrc/src/main/kotlin/buildsrc/convention/kotlin-jvm.gradle.kts
+++ b/buildSrc/src/main/kotlin/buildsrc/convention/kotlin-jvm.gradle.kts
@@ -3,8 +3,6 @@ package buildsrc.convention
import org.gradle.kotlin.dsl.support.kotlinCompilerOptions
import org.jetbrains.kotlin.gradle.dsl.KotlinJvmProjectExtension
import org.jetbrains.kotlin.gradle.dsl.KotlinProjectExtension
-import org.jetbrains.kotlin.gradle.dsl.KotlinVersion.KOTLIN_1_5
-import org.jetbrains.kotlin.gradle.dsl.KotlinVersion.KOTLIN_1_8
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
@@ -12,6 +10,7 @@ plugins {
id("buildsrc.convention.base")
kotlin("jvm")
`java-library`
+ id("org.jetbrains.kotlinx.kover")
}
kotlin {
diff --git a/docs/basics.md b/docs/basics.md
new file mode 100644
index 0000000..895af63
--- /dev/null
+++ b/docs/basics.md
@@ -0,0 +1,174 @@
+
+
+**Table of contents**
+
+
+
+ * [Naming operators](#naming-operators)
+ * [Naming operators 2](#naming-operators-2)
+
+
+
+
+
+
+### Naming operators
+
+https://kafka.apache.org/documentation/streams/developer-guide/dsl-topology-naming.html
+
+* `Grouped`
+* `StreamJoined`
+* `Joined`
+* `StreamJoined`
+* `Materialized`
+* `Named`
+
+
+
+Plain Kafka Streams |
+Kotka Streams |
+
+
+
+
+```kotlin
+val kafkaStreamBuilder = StreamsBuilder()
+val kafkaTransactions: KStream = kafkaStreamBuilder
+ .stream("input", Consumed.`as`("Customer_transactions_input_topic"))
+
+kafkaTransactions
+ .filter(
+ { _, value -> value != "invalid_txn" },
+ Named.`as`("filter_out_invalid_txns")
+ )
+ .mapValues(
+ { _, value -> value.take(6) },
+ Named.`as`("Map_values_to_first_6_characters")
+ )
+ .to("output", Produced.`as`("Mapped_transactions_output_topic"))
+```
+
+ |
+
+
+```kotlin
+val kotkaStreamBuilder = StreamsBuilder()
+val kotkaTransactions: KStream = kotkaStreamBuilder
+ // no need for backticks
+ .stream("input", consumedAs("Customer_transactions_input_topic"))
+
+kotkaTransactions
+ // tasks can be named directly using a string,
+ // and the lambda expression can be placed outside the parentheses
+ .filter(name = "filter_out_invalid_txns") { _, value ->
+ value != "invalid_txn"
+ }
+ .mapValues(name = "Map_values_to_first_6_characters") { _, value ->
+ value.take(6)
+ }
+ .to("output", producedAs("Mapped_transactions_output_topic"))
+```
+
+
+ |
+
+
+
+> You can get the full code [here](./code/example/example-basics-naming-operators-01.kt).
+
+Both examples produce the same topology:
+
+```text
+Topologies:
+ Sub-topology: 0
+ Source: Customer_transactions_input_topic (topics: [input])
+ --> filter_out_invalid_txns
+ Processor: filter_out_invalid_txns (stores: [])
+ --> Map_values_to_first_6_characters
+ <-- Customer_transactions_input_topic
+ Processor: Map_values_to_first_6_characters (stores: [])
+ --> Mapped_transactions_output_topic
+ <-- filter_out_invalid_txns
+ Sink: Mapped_transactions_output_topic (topic: output)
+ <-- Map_values_to_first_6_characters
+```
+
+
+
+### Naming operators 2
+
+
+
+```kotlin
+val kafkaStreamBuilder = StreamsBuilder()
+val kafkaTransactions: KStream = kafkaStreamBuilder
+ .stream("input", Consumed.`as`("Customer_transactions_input_topic"))
+
+kafkaTransactions
+ .filter(
+ { _, v -> v != "invalid_txn" },
+ Named.`as`("filter_out_invalid_txns")
+ )
+ .mapValues(
+ { _, v -> v.take(6) },
+ Named.`as`("Map_values_to_first_6_characters")
+ )
+ .to("output", Produced.`as`("Mapped_transactions_output_topic"))
+```
+
+
+```kotlin
+val kotkaStreamBuilder = StreamsBuilder()
+val kotkaTransactions: KStream = kotkaStreamBuilder
+ // no need for backticks
+ .stream("input", consumedAs("Customer_transactions_input_topic"))
+
+kotkaTransactions
+ // tasks can be named directly using a string,
+ // and the lambda expression can be placed outside the parentheses
+ .filter(name = "filter_out_invalid_txns") { _, value ->
+ value != "invalid_txn"
+ }
+ .mapValues(name = "Map_values_to_first_6_characters") { _, value ->
+ value.take(6)
+ }
+ .to("output", producedAs("Mapped_transactions_output_topic"))
+```
+
+
+> You can get the full code [here](./code/example/example-basics-naming-operators-02.kt).
+
+Both examples produce the same topology:
+
+```text
+Topologies:
+ Sub-topology: 0
+ Source: Customer_transactions_input_topic (topics: [input])
+ --> filter_out_invalid_txns
+ Processor: filter_out_invalid_txns (stores: [])
+ --> Map_values_to_first_6_characters
+ <-- Customer_transactions_input_topic
+ Processor: Map_values_to_first_6_characters (stores: [])
+ --> Mapped_transactions_output_topic
+ <-- filter_out_invalid_txns
+ Sink: Mapped_transactions_output_topic (topic: output)
+ <-- Map_values_to_first_6_characters
+```
+
+
diff --git a/docs/code/build.gradle.kts b/docs/code/build.gradle.kts
new file mode 100644
index 0000000..aea3941
--- /dev/null
+++ b/docs/code/build.gradle.kts
@@ -0,0 +1,57 @@
+plugins {
+ buildsrc.convention.`kotlin-jvm`
+ id("org.jetbrains.kotlinx.knit")
+}
+
+
+dependencies {
+ implementation(projects.modules.kotkaStreamsExtensions)
+
+ testImplementation(kotlin("test"))
+
+ implementation(libs.slf4j.api)
+ implementation(libs.slf4j.simple)
+
+ implementation(libs.kotlinx.knit)
+ testImplementation(libs.kotlinx.knitTest)
+
+ testImplementation(platform(libs.kotest.bom))
+ testImplementation(libs.kotest.assertionsCore)
+}
+
+
+sourceSets.test {
+ java.srcDirs(
+ layout.projectDirectory.dir("example"),
+ layout.projectDirectory.dir("test"),
+ )
+}
+
+knit {
+ val docsDir = rootProject.layout.projectDirectory.dir("docs")
+ rootDir = docsDir.asFile
+ files = project.fileTree(docsDir) {
+ include("*.md")
+ }
+}
+
+tasks.test {
+ dependsOn(tasks.knit)
+ dependsOn(tasks.processResources)
+// finalizedBy(tasks.knitCheck)
+}
+
+tasks.compileKotlin { mustRunAfter(tasks.knit) }
+
+tasks.knitCheck {
+ dependsOn(tasks.test)
+}
+
+tasks.knit {
+ inputs.files(
+ layout.projectDirectory.files(
+ "knit-tests.ftl",
+ "knit-includes.ftl",
+ )
+ ).withPropertyName("knitTemplates")
+}
diff --git a/docs/code/example/example-basics-naming-operators-01.kt b/docs/code/example/example-basics-naming-operators-01.kt
new file mode 100644
index 0000000..d22a9fe
--- /dev/null
+++ b/docs/code/example/example-basics-naming-operators-01.kt
@@ -0,0 +1,49 @@
+// This file was automatically generated from basics.md by Knit tool. Do not edit.
+@file:Suppress("PackageDirectoryMismatch", "unused")
+package dev.adamko.kotka.example.exampleBasicsNamingOperators01
+
+import dev.adamko.kotka.extensions.*
+import org.apache.kafka.streams.*
+import dev.adamko.kotka.extensions.streams.*
+import org.apache.kafka.streams.kstream.*
+
+fun main() {
+
+val kafkaStreamBuilder = StreamsBuilder()
+val kafkaTransactions: KStream = kafkaStreamBuilder
+ .stream("input", Consumed.`as`("Customer_transactions_input_topic"))
+
+kafkaTransactions
+ .filter(
+ { _, value -> value != "invalid_txn" },
+ Named.`as`("filter_out_invalid_txns")
+ )
+ .mapValues(
+ { _, value -> value.take(6) },
+ Named.`as`("Map_values_to_first_6_characters")
+ )
+ .to("output", Produced.`as`("Mapped_transactions_output_topic"))
+
+val kotkaStreamBuilder = StreamsBuilder()
+val kotkaTransactions: KStream = kotkaStreamBuilder
+ // no need for backticks
+ .stream("input", consumedAs("Customer_transactions_input_topic"))
+
+kotkaTransactions
+ // tasks can be named directly using a string,
+ // and the lambda expression can be placed outside the parentheses
+ .filter(name = "filter_out_invalid_txns") { _, value ->
+ value != "invalid_txn"
+ }
+ .mapValues(name = "Map_values_to_first_6_characters") { _, value ->
+ value.take(6)
+ }
+ .to("output", producedAs("Mapped_transactions_output_topic"))
+
+ println("kafkaStreamBuilder described:")
+ println(kafkaStreamBuilder.build().describe())
+ println("~~~~~~")
+ println("kotkaStreamBuilder described:")
+ println(kotkaStreamBuilder.build().describe())
+ println("~~~~~~")
+}
diff --git a/docs/code/example/example-basics-naming-operators-02.kt b/docs/code/example/example-basics-naming-operators-02.kt
new file mode 100644
index 0000000..17e320e
--- /dev/null
+++ b/docs/code/example/example-basics-naming-operators-02.kt
@@ -0,0 +1,49 @@
+// This file was automatically generated from basics.md by Knit tool. Do not edit.
+@file:Suppress("PackageDirectoryMismatch", "unused")
+package dev.adamko.kotka.example.exampleBasicsNamingOperators02
+
+import dev.adamko.kotka.extensions.*
+import org.apache.kafka.streams.*
+import dev.adamko.kotka.extensions.streams.*
+import org.apache.kafka.streams.kstream.*
+
+fun main() {
+
+val kafkaStreamBuilder = StreamsBuilder()
+val kafkaTransactions: KStream = kafkaStreamBuilder
+ .stream("input", Consumed.`as`("Customer_transactions_input_topic"))
+
+kafkaTransactions
+ .filter(
+ { _, v -> v != "invalid_txn" },
+ Named.`as`("filter_out_invalid_txns")
+ )
+ .mapValues(
+ { _, v -> v.take(6) },
+ Named.`as`("Map_values_to_first_6_characters")
+ )
+ .to("output", Produced.`as`("Mapped_transactions_output_topic"))
+
+val kotkaStreamBuilder = StreamsBuilder()
+val kotkaTransactions: KStream = kotkaStreamBuilder
+ // no need for backticks
+ .stream("input", consumedAs("Customer_transactions_input_topic"))
+
+kotkaTransactions
+ // tasks can be named directly using a string,
+ // and the lambda expression can be placed outside the parentheses
+ .filter(name = "filter_out_invalid_txns") { _, value ->
+ value != "invalid_txn"
+ }
+ .mapValues(name = "Map_values_to_first_6_characters") { _, value ->
+ value.take(6)
+ }
+ .to("output", producedAs("Mapped_transactions_output_topic"))
+
+ println("kafkaStreamBuilder described:")
+ println(kafkaStreamBuilder.build().describe())
+ println("~~~~~~")
+ println("kotkaStreamBuilder described:")
+ println(kotkaStreamBuilder.build().describe())
+ println("~~~~~~")
+}
diff --git a/docs/code/knit-include.ftl b/docs/code/knit-include.ftl
new file mode 100644
index 0000000..766a866
--- /dev/null
+++ b/docs/code/knit-include.ftl
@@ -0,0 +1,6 @@
+<#-- @ftlvariable name="knit.name" type="String" -->
+<#-- @ftlvariable name="knit.package" type="String" -->
+<#-- @ftlvariable name="file.name" type="String" -->
+// This file was automatically generated from ${file.name} by Knit tool. Do not edit.
+@file:Suppress("PackageDirectoryMismatch", "unused")
+package ${knit.package}.${knit.name}
diff --git a/docs/code/knit-test.ftl b/docs/code/knit-test.ftl
new file mode 100644
index 0000000..1f49afe
--- /dev/null
+++ b/docs/code/knit-test.ftl
@@ -0,0 +1,55 @@
+<#-- @ftlvariable name="test.name" type="java.lang.String" -->
+<#-- @ftlvariable name="test.package" type="java.lang.String" -->
+<#-- @ftlvariable name="test.language" type="java.lang.String" -->
+// Knit tool automatically generated this file from ${file.name}. Do not edit.
+@file:Suppress("JSUnusedLocalSymbols")
+package ${test.package}
+
+import io.kotest.matchers.*
+import kotlinx.knit.test.*
+import org.junit.jupiter.api.Test
+
+class ${test.name} {
+<#list cases as case><#assign method = test["mode.${case.param}"]!"custom">
+ @Test
+ fun test${case.name}() {
+
+ // language=${test.language}
+ val expected = """
+ <#list case.lines as line>
+ |${line}
+ #list>
+ """.trimMargin().trim()
+
+ val output = captureOutput("${case.name}") {
+ ${case.knit.package}.${case.knit.name}.main()
+ }
+ .joinToString("\n")
+
+ val kafkaStreamDescription = output
+ .substringAfter("kafkaStreamBuilder described:", "")
+ .substringBefore("~~~~~~", missingDelimiterValue = "")
+ .trim()
+ val kotkaStreamDescription = output
+ .substringAfter("kotkaStreamBuilder described:", "")
+ .substringBefore("~~~~~~", missingDelimiterValue = "")
+ .trim()
+
+ kafkaStreamDescription.shouldBe(expected)
+ kafkaStreamDescription.shouldBe(expected)
+ kafkaStreamDescription.shouldBe(kotkaStreamDescription)
+
+<#-- <#if method != "custom">.${method}(-->
+<#-- // language=${test.language}-->
+<#-- """-->
+<#-- """-->
+<#-- )-->
+<#--<#else>.also { lines ->-->
+<#-- check(${case.param})-->
+<#-- }-->
+<#--#if>-->
+ }
+<#sep>
+
+#list>
+}
diff --git a/docs/code/test/BasicsTest.kt b/docs/code/test/BasicsTest.kt
new file mode 100644
index 0000000..edff99e
--- /dev/null
+++ b/docs/code/test/BasicsTest.kt
@@ -0,0 +1,87 @@
+// Knit tool automatically generated this file from basics.md. Do not edit.
+@file:Suppress("JSUnusedLocalSymbols")
+package dev.adamko.kotka.example.test
+
+import io.kotest.matchers.*
+import kotlinx.knit.test.*
+import org.junit.jupiter.api.Test
+
+class BasicsTest {
+ @Test
+ fun testExampleBasicsNamingOperators01() {
+
+ // language=text
+ val expected = """
+ |Topologies:
+ | Sub-topology: 0
+ | Source: Customer_transactions_input_topic (topics: [input])
+ | --> filter_out_invalid_txns
+ | Processor: filter_out_invalid_txns (stores: [])
+ | --> Map_values_to_first_6_characters
+ | <-- Customer_transactions_input_topic
+ | Processor: Map_values_to_first_6_characters (stores: [])
+ | --> Mapped_transactions_output_topic
+ | <-- filter_out_invalid_txns
+ | Sink: Mapped_transactions_output_topic (topic: output)
+ | <-- Map_values_to_first_6_characters
+ """.trimMargin().trim()
+
+ val output = captureOutput("ExampleBasicsNamingOperators01") {
+ dev.adamko.kotka.example.exampleBasicsNamingOperators01.main()
+ }
+ .joinToString("\n")
+
+ val kafkaStreamDescription = output
+ .substringAfter("kafkaStreamBuilder described:", "")
+ .substringBefore("~~~~~~", missingDelimiterValue = "")
+ .trim()
+ val kotkaStreamDescription = output
+ .substringAfter("kotkaStreamBuilder described:", "")
+ .substringBefore("~~~~~~", missingDelimiterValue = "")
+ .trim()
+
+ kafkaStreamDescription.shouldBe(expected)
+ kafkaStreamDescription.shouldBe(expected)
+ kafkaStreamDescription.shouldBe(kotkaStreamDescription)
+
+ }
+
+ @Test
+ fun testExampleBasicsNamingOperators02() {
+
+ // language=text
+ val expected = """
+ |Topologies:
+ | Sub-topology: 0
+ | Source: Customer_transactions_input_topic (topics: [input])
+ | --> filter_out_invalid_txns
+ | Processor: filter_out_invalid_txns (stores: [])
+ | --> Map_values_to_first_6_characters
+ | <-- Customer_transactions_input_topic
+ | Processor: Map_values_to_first_6_characters (stores: [])
+ | --> Mapped_transactions_output_topic
+ | <-- filter_out_invalid_txns
+ | Sink: Mapped_transactions_output_topic (topic: output)
+ | <-- Map_values_to_first_6_characters
+ """.trimMargin().trim()
+
+ val output = captureOutput("ExampleBasicsNamingOperators02") {
+ dev.adamko.kotka.example.exampleBasicsNamingOperators02.main()
+ }
+ .joinToString("\n")
+
+ val kafkaStreamDescription = output
+ .substringAfter("kafkaStreamBuilder described:", "")
+ .substringBefore("~~~~~~", missingDelimiterValue = "")
+ .trim()
+ val kotkaStreamDescription = output
+ .substringAfter("kotkaStreamBuilder described:", "")
+ .substringBefore("~~~~~~", missingDelimiterValue = "")
+ .trim()
+
+ kafkaStreamDescription.shouldBe(expected)
+ kafkaStreamDescription.shouldBe(expected)
+ kafkaStreamDescription.shouldBe(kotkaStreamDescription)
+
+ }
+}
diff --git a/docs/knit.properties b/docs/knit.properties
new file mode 100644
index 0000000..2698000
--- /dev/null
+++ b/docs/knit.properties
@@ -0,0 +1,9 @@
+knit.dir=./code/example/
+test.dir=./code/test/
+knit.package=dev.adamko.kotka.example
+test.package=dev.adamko.kotka.example.test
+#
+test.template=./code/knit-test.ftl
+test.language=text
+knit.include=./code/knit-include.ftl
+test.mode.=joinToString(\"\\n\")\n .trim()\n .shouldBe
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 6c28465..5e65e10 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -9,7 +9,7 @@ jvm = "11"
kafka = "3.6.1" # https://kafka.apache.org/downloads
kotlinx-serialization = "1.6.2" # https://github.com/Kotlin/kotlinx.serialization/releases/tag/v1.3.3
-kotlinx-knit = "0.5.1" # https://github.com/Kotlin/kotlinx-knit/releases
+kotlinx-knit = "0.5.0" # https://github.com/Kotlin/kotlinx-knit/releases
kotlin-dokka = "1.9.10" # https://search.maven.org/artifact/org.jetbrains.dokka/dokka-gradle-plugin
slf4j = "2.0.11"
@@ -32,6 +32,7 @@ dokkatoo = "2.0.0"
kotlin-bom = { module = "org.jetbrains.kotlin:kotlin-bom", version.ref = "kotlin" }
kafka-streams = { module = "org.apache.kafka:kafka-streams", version.ref = "kafka" }
+kafka-streamsTestUtils = { module = "org.apache.kafka:kafka-streams-test-utils", version.ref = "kafka" }
kotlinxSerialization-bom = { module = "org.jetbrains.kotlinx:kotlinx-serialization-bom", version.ref = "kotlinx-serialization" }
kotlinxSerialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serialization-core" }
@@ -59,7 +60,6 @@ kotest-frameworkEngine = { module = "io.kotest:kotest-framework-engine" }
kotest-frameworkDatatest = { module = "io.kotest:kotest-framework-datatest" }
kotest-runnerJUnit5 = { module = "io.kotest:kotest-runner-junit5" }
-
# Knit
kotlinx-knit = { module = "org.jetbrains.kotlinx:kotlinx-knit", version.ref = "kotlinx-knit" }
kotlinx-knitTest = { module = "org.jetbrains.kotlinx:kotlinx-knit-test", version.ref = "kotlinx-knit" }
diff --git a/modules/kotka-streams-extensions/build.gradle.kts b/modules/kotka-streams-extensions/build.gradle.kts
index d740cf1..e90cc43 100644
--- a/modules/kotka-streams-extensions/build.gradle.kts
+++ b/modules/kotka-streams-extensions/build.gradle.kts
@@ -8,10 +8,14 @@ description = "Kotlin extensions for Kafka Streams"
dependencies {
+ api(libs.kafka.streams)
+
implementation(platform(projects.modules.versionsPlatform))
implementation(libs.kafka.streams)
+ testImplementation(libs.kafka.streamsTestUtils)
+
testImplementation(libs.kotest.runnerJUnit5)
testImplementation(libs.kotest.assertionsCore)
testImplementation(libs.kotest.property)
diff --git a/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/processor/RecordExtensionsTest.kt b/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/processor/RecordExtensionsTest.kt
index 8fea9b1..7357283 100644
--- a/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/processor/RecordExtensionsTest.kt
+++ b/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/processor/RecordExtensionsTest.kt
@@ -29,7 +29,7 @@ class RecordExtensionsTest : FunSpec({
test("destructuring declaration should return (key, value, timestamp)") {
- val (key, value, timestamp) = record
+ val (key: String, value: String, timestamp: Long) = record
key shouldBe "key 123"
key shouldBe record.key()
diff --git a/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/processor/RecordTests.kt b/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/processor/RecordTests.kt
new file mode 100644
index 0000000..ed5c07e
--- /dev/null
+++ b/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/processor/RecordTests.kt
@@ -0,0 +1,70 @@
+package dev.adamko.kotka.extensions.processor
+
+import io.kotest.core.spec.IsolationMode
+import io.kotest.core.spec.style.BehaviorSpec
+import io.kotest.matchers.shouldBe
+import io.mockk.confirmVerified
+import io.mockk.every
+import io.mockk.mockk
+import io.mockk.verify
+import org.apache.kafka.streams.processor.api.Record
+
+class RecordTests : BehaviorSpec({
+ isolationMode = IsolationMode.InstancePerLeaf // so the mockk is fresh for each test
+
+ Given("a Kafka Streams Processor API Record") {
+ val mockRecord: Record = mockk {
+ every { key() } returns "record-key"
+ every { value() } returns "record-value"
+ every { timestamp() } returns 12345L
+ }
+
+ When("the Record extension functions are used") {
+
+ Then("expect component1 returns the Record's key") {
+ mockRecord.component1()
+
+ verify(exactly = 1) { mockRecord.key() }
+ confirmVerified(mockRecord)
+ }
+
+ Then("expect component2 should return the Record's value") {
+ mockRecord.component2()
+
+ verify(exactly = 1) { mockRecord.value() }
+ confirmVerified(mockRecord)
+ }
+
+ Then("expect component3 should return the Record's timestamp") {
+ mockRecord.component3()
+
+ verify(exactly = 1) { mockRecord.timestamp() }
+ confirmVerified(mockRecord)
+ }
+
+ }
+
+ When("the Record is deconstructed") {
+ val (key, value, timestamp) = mockRecord
+
+ Then("expect the Record's key is extracted") {
+ key shouldBe "record-key"
+ }
+
+ Then("expect the Record's value is extracted") {
+ value shouldBe "record-value"
+ }
+
+ Then("expect the Record's timestamp is extracted") {
+ timestamp shouldBe 12345L
+ }
+
+ Then("expect the extension functions are used") {
+ verify(exactly = 1) { mockRecord.key() }
+ verify(exactly = 1) { mockRecord.value() }
+ verify(exactly = 1) { mockRecord.timestamp() }
+ confirmVerified(mockRecord)
+ }
+ }
+ }
+})
diff --git a/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/tables/KTableMapValuesTest.kt b/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/tables/KTableMapValuesTest.kt
new file mode 100644
index 0000000..8873d37
--- /dev/null
+++ b/modules/kotka-streams-extensions/src/test/kotlin/dev/adamko/kotka/extensions/tables/KTableMapValuesTest.kt
@@ -0,0 +1,24 @@
+package dev.adamko.kotka.extensions.tables
+
+import io.kotest.core.spec.style.BehaviorSpec
+import io.mockk.confirmVerified
+import io.mockk.mockk
+import io.mockk.verify
+import org.apache.kafka.streams.kstream.KTable
+import org.apache.kafka.streams.kstream.ValueMapperWithKey
+
+class KTableMapValuesTest : BehaviorSpec({
+
+ Given("a KTable") {
+ val mockKTable: KTable = mockk(relaxed = true) {}
+
+ When("mapValues, mapper only") {
+ mockKTable.mapValues(mapper = { key, value -> "dummy mapper $key $value" })
+ Then("expect MapValues method is called") {
+ verify(exactly = 1) { mockKTable.mapValues(any>()) }
+ confirmVerified(mockKTable)
+ }
+ }
+ }
+
+})
diff --git a/settings.gradle.kts b/settings.gradle.kts
index ca0b44d..2ed0bf2 100644
--- a/settings.gradle.kts
+++ b/settings.gradle.kts
@@ -20,6 +20,7 @@ dependencyResolutionManagement {
include(
":docs",
+ ":docs:code",
":modules:kotka-streams-extensions",
":modules:kotka-streams-framework",