Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tests #15

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ plugins {
`project-report`
// `build-dashboard` // incompatible with Gradle CC
idea
id("org.jetbrains.kotlinx.kover")
}

group = "dev.adamko.kotka"
Expand Down
8 changes: 8 additions & 0 deletions buildSrc/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ dependencies {

// https://github.com/gradle/gradle/issues/15383#issuecomment-779893192
implementation(files(libs.javaClass.superclass.protectionDomain.codeSource.location))

val kotlinxKoverVersion = "0.7.5"
implementation("org.jetbrains.kotlinx:kover-gradle-plugin:${kotlinxKoverVersion}")

val kotlinxKnitVersion = "0.5.0"
implementation("org.jetbrains.kotlinx:kotlinx-knit:${kotlinxKnitVersion}")

implementation("dev.jacomet.gradle.plugins:logging-capabilities:0.10.0")
}

kotlin {
Expand Down
2 changes: 1 addition & 1 deletion buildSrc/settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ dependencyResolutionManagement {
repositoriesMode.set(RepositoriesMode.PREFER_SETTINGS)

repositories {
gradlePluginPortal()
mavenCentral()
gradlePluginPortal()
}

versionCatalogs {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,14 @@ package buildsrc.convention
import org.gradle.kotlin.dsl.support.kotlinCompilerOptions
import org.jetbrains.kotlin.gradle.dsl.KotlinJvmProjectExtension
import org.jetbrains.kotlin.gradle.dsl.KotlinProjectExtension
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion.KOTLIN_1_5
import org.jetbrains.kotlin.gradle.dsl.KotlinVersion.KOTLIN_1_8
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile


plugins {
id("buildsrc.convention.base")
kotlin("jvm")
`java-library`
id("org.jetbrains.kotlinx.kover")
}

kotlin {
Expand Down
174 changes: 174 additions & 0 deletions docs/basics.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
<!--- TEST_NAME BasicsTest -->

**Table of contents**

<!--- TOC -->

* [Naming operators](#naming-operators)
* [Naming operators 2](#naming-operators-2)

<!--- END -->


<!--- INCLUDE .*\.kt
import dev.adamko.kotka.extensions.*
import org.apache.kafka.streams.*
import dev.adamko.kotka.extensions.streams.*
import org.apache.kafka.streams.kstream.*

fun main() {
----- SUFFIX .*\.kt

println("kafkaStreamBuilder described:")
println(kafkaStreamBuilder.build().describe())
println("~~~~~~")
println("kotkaStreamBuilder described:")
println(kotkaStreamBuilder.build().describe())
println("~~~~~~")
}
-->

### Naming operators

https://kafka.apache.org/documentation/streams/developer-guide/dsl-topology-naming.html

* `Grouped`
* `StreamJoined`
* `Joined`
* `StreamJoined`
* `Materialized`
* `Named`

<table>
<tr>
<th>Plain Kafka Streams</th>
<th>Kotka Streams</th>
</tr>
<tr>
<td>

```kotlin
val kafkaStreamBuilder = StreamsBuilder()
val kafkaTransactions: KStream<String, String> = kafkaStreamBuilder
.stream("input", Consumed.`as`("Customer_transactions_input_topic"))

kafkaTransactions
.filter(
{ _, value -> value != "invalid_txn" },
Named.`as`("filter_out_invalid_txns")
)
.mapValues(
{ _, value -> value.take(6) },
Named.`as`("Map_values_to_first_6_characters")
)
.to("output", Produced.`as`("Mapped_transactions_output_topic"))
```

</td>
<td>

```kotlin
val kotkaStreamBuilder = StreamsBuilder()
val kotkaTransactions: KStream<String, String> = kotkaStreamBuilder
// no need for backticks
.stream("input", consumedAs("Customer_transactions_input_topic"))

kotkaTransactions
// tasks can be named directly using a string,
// and the lambda expression can be placed outside the parentheses
.filter(name = "filter_out_invalid_txns") { _, value ->
value != "invalid_txn"
}
.mapValues(name = "Map_values_to_first_6_characters") { _, value ->
value.take(6)
}
.to("output", producedAs("Mapped_transactions_output_topic"))
```


</td>
</tr>
</table>

> You can get the full code [here](./code/example/example-basics-naming-operators-01.kt).

Both examples produce the same topology:

```text
Topologies:
Sub-topology: 0
Source: Customer_transactions_input_topic (topics: [input])
--> filter_out_invalid_txns
Processor: filter_out_invalid_txns (stores: [])
--> Map_values_to_first_6_characters
<-- Customer_transactions_input_topic
Processor: Map_values_to_first_6_characters (stores: [])
--> Mapped_transactions_output_topic
<-- filter_out_invalid_txns
Sink: Mapped_transactions_output_topic (topic: output)
<-- Map_values_to_first_6_characters
```

<!--- TEST -->

### Naming operators 2



```kotlin
val kafkaStreamBuilder = StreamsBuilder()
val kafkaTransactions: KStream<String, String> = kafkaStreamBuilder
.stream("input", Consumed.`as`("Customer_transactions_input_topic"))

kafkaTransactions
.filter(
{ _, v -> v != "invalid_txn" },
Named.`as`("filter_out_invalid_txns")
)
.mapValues(
{ _, v -> v.take(6) },
Named.`as`("Map_values_to_first_6_characters")
)
.to("output", Produced.`as`("Mapped_transactions_output_topic"))
```


```kotlin
val kotkaStreamBuilder = StreamsBuilder()
val kotkaTransactions: KStream<String, String> = kotkaStreamBuilder
// no need for backticks
.stream("input", consumedAs("Customer_transactions_input_topic"))

kotkaTransactions
// tasks can be named directly using a string,
// and the lambda expression can be placed outside the parentheses
.filter(name = "filter_out_invalid_txns") { _, value ->
value != "invalid_txn"
}
.mapValues(name = "Map_values_to_first_6_characters") { _, value ->
value.take(6)
}
.to("output", producedAs("Mapped_transactions_output_topic"))
```


> You can get the full code [here](./code/example/example-basics-naming-operators-02.kt).

Both examples produce the same topology:

```text
Topologies:
Sub-topology: 0
Source: Customer_transactions_input_topic (topics: [input])
--> filter_out_invalid_txns
Processor: filter_out_invalid_txns (stores: [])
--> Map_values_to_first_6_characters
<-- Customer_transactions_input_topic
Processor: Map_values_to_first_6_characters (stores: [])
--> Mapped_transactions_output_topic
<-- filter_out_invalid_txns
Sink: Mapped_transactions_output_topic (topic: output)
<-- Map_values_to_first_6_characters
```

<!--- TEST -->
57 changes: 57 additions & 0 deletions docs/code/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
plugins {
buildsrc.convention.`kotlin-jvm`
id("org.jetbrains.kotlinx.knit")
}


dependencies {
implementation(projects.modules.kotkaStreamsExtensions)

testImplementation(kotlin("test"))

implementation(libs.slf4j.api)
implementation(libs.slf4j.simple)

implementation(libs.kotlinx.knit)
testImplementation(libs.kotlinx.knitTest)

testImplementation(platform(libs.kotest.bom))
testImplementation(libs.kotest.assertionsCore)
}


sourceSets.test {
java.srcDirs(
layout.projectDirectory.dir("example"),
layout.projectDirectory.dir("test"),
)
}

knit {
val docsDir = rootProject.layout.projectDirectory.dir("docs")
rootDir = docsDir.asFile
files = project.fileTree(docsDir) {
include("*.md")
}
}

tasks.test {
dependsOn(tasks.knit)
dependsOn(tasks.processResources)
// finalizedBy(tasks.knitCheck)
}

tasks.compileKotlin { mustRunAfter(tasks.knit) }

tasks.knitCheck {
dependsOn(tasks.test)
}

tasks.knit {
inputs.files(
layout.projectDirectory.files(
"knit-tests.ftl",
"knit-includes.ftl",
)
).withPropertyName("knitTemplates")
}
49 changes: 49 additions & 0 deletions docs/code/example/example-basics-naming-operators-01.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// This file was automatically generated from basics.md by Knit tool. Do not edit.
@file:Suppress("PackageDirectoryMismatch", "unused")
package dev.adamko.kotka.example.exampleBasicsNamingOperators01

import dev.adamko.kotka.extensions.*
import org.apache.kafka.streams.*
import dev.adamko.kotka.extensions.streams.*
import org.apache.kafka.streams.kstream.*

fun main() {

val kafkaStreamBuilder = StreamsBuilder()
val kafkaTransactions: KStream<String, String> = kafkaStreamBuilder
.stream("input", Consumed.`as`("Customer_transactions_input_topic"))

kafkaTransactions
.filter(
{ _, value -> value != "invalid_txn" },
Named.`as`("filter_out_invalid_txns")
)
.mapValues(
{ _, value -> value.take(6) },
Named.`as`("Map_values_to_first_6_characters")
)
.to("output", Produced.`as`("Mapped_transactions_output_topic"))

val kotkaStreamBuilder = StreamsBuilder()
val kotkaTransactions: KStream<String, String> = kotkaStreamBuilder
// no need for backticks
.stream("input", consumedAs("Customer_transactions_input_topic"))

kotkaTransactions
// tasks can be named directly using a string,
// and the lambda expression can be placed outside the parentheses
.filter(name = "filter_out_invalid_txns") { _, value ->
value != "invalid_txn"
}
.mapValues(name = "Map_values_to_first_6_characters") { _, value ->
value.take(6)
}
.to("output", producedAs("Mapped_transactions_output_topic"))

println("kafkaStreamBuilder described:")
println(kafkaStreamBuilder.build().describe())
println("~~~~~~")
println("kotkaStreamBuilder described:")
println(kotkaStreamBuilder.build().describe())
println("~~~~~~")
}
49 changes: 49 additions & 0 deletions docs/code/example/example-basics-naming-operators-02.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// This file was automatically generated from basics.md by Knit tool. Do not edit.
@file:Suppress("PackageDirectoryMismatch", "unused")
package dev.adamko.kotka.example.exampleBasicsNamingOperators02

import dev.adamko.kotka.extensions.*
import org.apache.kafka.streams.*
import dev.adamko.kotka.extensions.streams.*
import org.apache.kafka.streams.kstream.*

fun main() {

val kafkaStreamBuilder = StreamsBuilder()
val kafkaTransactions: KStream<String, String> = kafkaStreamBuilder
.stream("input", Consumed.`as`("Customer_transactions_input_topic"))

kafkaTransactions
.filter(
{ _, v -> v != "invalid_txn" },
Named.`as`("filter_out_invalid_txns")
)
.mapValues(
{ _, v -> v.take(6) },
Named.`as`("Map_values_to_first_6_characters")
)
.to("output", Produced.`as`("Mapped_transactions_output_topic"))

val kotkaStreamBuilder = StreamsBuilder()
val kotkaTransactions: KStream<String, String> = kotkaStreamBuilder
// no need for backticks
.stream("input", consumedAs("Customer_transactions_input_topic"))

kotkaTransactions
// tasks can be named directly using a string,
// and the lambda expression can be placed outside the parentheses
.filter(name = "filter_out_invalid_txns") { _, value ->
value != "invalid_txn"
}
.mapValues(name = "Map_values_to_first_6_characters") { _, value ->
value.take(6)
}
.to("output", producedAs("Mapped_transactions_output_topic"))

println("kafkaStreamBuilder described:")
println(kafkaStreamBuilder.build().describe())
println("~~~~~~")
println("kotkaStreamBuilder described:")
println(kotkaStreamBuilder.build().describe())
println("~~~~~~")
}
6 changes: 6 additions & 0 deletions docs/code/knit-include.ftl
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
<#-- @ftlvariable name="knit.name" type="String" -->
<#-- @ftlvariable name="knit.package" type="String" -->
<#-- @ftlvariable name="file.name" type="String" -->
// This file was automatically generated from ${file.name} by Knit tool. Do not edit.
@file:Suppress("PackageDirectoryMismatch", "unused")
package ${knit.package}.${knit.name}
Loading