Skip to content

IDE sample of "unsupported sources"->DataFrame #1231

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ allprojects {
logger.warn("Could not set ktlint config on :${this.name}")
}

// set the java toolchain version to 11 for all subprojects for CI stability
// set the java toolchain version to 21 for all subprojects for CI stability
extensions.findByType<KotlinJvmProjectExtension>()?.jvmToolchain(21)

// Attempts to configure buildConfig for each sub-project that uses it
Expand Down
12 changes: 12 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@
* [json](idea-examples/json) Using OpenAPI support in DataFrame's Gradle and KSP plugins to access data from [API guru](https://apis.guru/) in a type-safe manner
* [imdb sql database](https://github.com/zaleslaw/KotlinDataFrame-SQL-Examples) This project prominently showcases how to convert data from an SQL table to a Kotlin DataFrame
and how to transform the result of an SQL query into a DataFrame.
* [unsupported-data-sources](idea-examples/unsupported-data-sources) Showcases how to use DataFrame with
(as of yet) unsupported data libraries such as [Spark](https://spark.apache.org/) and [Exposed](https://github.com/JetBrains/Exposed).
The examples show how to convert to and from Kotlin DataFrame and each library's respective tables.
  * **JetBrains Exposed**: See the [exposed folder](./idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/exposed)
for an example of using Kotlin DataFrame with [Exposed](https://github.com/JetBrains/Exposed).
  * **Apache Spark**: See the [spark folder](./idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/spark)
for an example of using Kotlin DataFrame with [Spark](https://spark.apache.org/).
  * **Spark (with Kotlin Spark API)**: See the [kotlinSpark folder](./idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/kotlinSpark)
for an example of using Kotlin DataFrame with the [Kotlin Spark API](https://github.com/JetBrains/kotlin-spark-api).
  * **Multik**: See the [multik folder](./idea-examples/unsupported-data-sources/src/main/kotlin/org/jetbrains/kotlinx/dataframe/examples/multik)
for an example of using Kotlin DataFrame with [Multik](https://github.com/Kotlin/multik).


### Notebook examples

Expand Down
73 changes: 73 additions & 0 deletions examples/idea-examples/unsupported-data-sources/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
plugins {
    application
    kotlin("jvm")

    // Kotlin DataFrame Gradle plugin: generates data-schema accessors at build time
    id("org.jetbrains.kotlinx.dataframe")

    // only mandatory if `kotlin.dataframe.add.ksp=false` in gradle.properties
    id("com.google.devtools.ksp")
}

repositories {
    mavenLocal() // in case of local dataframe development
    mavenCentral()
}

dependencies {
    // When using a published DataFrame version, replace the project dependency below with:
    // implementation("org.jetbrains.kotlinx:dataframe:X.Y.Z")
    implementation(project(":"))

    // exposed + sqlite database support
    implementation(libs.sqlite)
    implementation(libs.exposed.core)
    implementation(libs.exposed.kotlin.datetime)
    implementation(libs.exposed.jdbc)
    implementation(libs.exposed.json)
    implementation(libs.exposed.money)

    // (kotlin) spark support
    // NOTE(review): spark itself is compileOnly — presumably provided at runtime by the
    // Java-11 launchers below or by the Kotlin Spark API dependency; confirm.
    implementation(libs.kotlin.spark)
    compileOnly(libs.spark)
    implementation(libs.log4j.core)
    implementation(libs.log4j.api)

    // multik support
    implementation(libs.multik.core)
    implementation(libs.multik.default)
}

/**
 * Registers a run task for the kotlinSpark/typedDataset example.
 * Spark requires Java 11, so the task forces a Java 11 launcher.
 */
val runKotlinSparkTypedDataset by tasks.registering(JavaExec::class) {
    mainClass.set("org.jetbrains.kotlinx.dataframe.examples.kotlinSpark.TypedDatasetKt")
    classpath = sourceSets["main"].runtimeClasspath
    javaLauncher.set(
        javaToolchains.launcherFor {
            languageVersion.set(JavaLanguageVersion.of(11))
        },
    )
}

/**
 * Registers a run task for the kotlinSpark/untypedDataset example.
 * Spark requires Java 11, so the task forces a Java 11 launcher.
 */
val runKotlinSparkUntypedDataset by tasks.registering(JavaExec::class) {
    mainClass.set("org.jetbrains.kotlinx.dataframe.examples.kotlinSpark.UntypedDatasetKt")
    classpath = sourceSets["main"].runtimeClasspath
    javaLauncher.set(
        javaToolchains.launcherFor {
            languageVersion.set(JavaLanguageVersion.of(11))
        },
    )
}

/**
 * Registers a run task for the spark/typedDataset example.
 * Spark requires Java 11, so the task forces a Java 11 launcher.
 */
val runSparkTypedDataset by tasks.registering(JavaExec::class) {
    mainClass.set("org.jetbrains.kotlinx.dataframe.examples.spark.TypedDatasetKt")
    classpath = sourceSets["main"].runtimeClasspath
    javaLauncher.set(
        javaToolchains.launcherFor {
            languageVersion.set(JavaLanguageVersion.of(11))
        },
    )
}

/**
 * Registers a run task for the spark/untypedDataset example.
 * Spark requires Java 11, so the task forces a Java 11 launcher.
 */
val runSparkUntypedDataset by tasks.registering(JavaExec::class) {
    mainClass.set("org.jetbrains.kotlinx.dataframe.examples.spark.UntypedDatasetKt")
    classpath = sourceSets["main"].runtimeClasspath
    javaLauncher.set(
        javaToolchains.launcherFor {
            languageVersion.set(JavaLanguageVersion.of(11))
        },
    )
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package org.jetbrains.kotlinx.dataframe.examples.exposed

import org.jetbrains.exposed.v1.core.BiCompositeColumn
import org.jetbrains.exposed.v1.core.Column
import org.jetbrains.exposed.v1.core.Expression
import org.jetbrains.exposed.v1.core.ExpressionAlias
import org.jetbrains.exposed.v1.core.ResultRow
import org.jetbrains.exposed.v1.core.Table
import org.jetbrains.exposed.v1.jdbc.Query
import org.jetbrains.kotlinx.dataframe.AnyFrame
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.api.convertTo
import org.jetbrains.kotlinx.dataframe.api.toDataFrame
import org.jetbrains.kotlinx.dataframe.codeGen.NameNormalizer
import org.jetbrains.kotlinx.dataframe.impl.schema.DataFrameSchemaImpl
import org.jetbrains.kotlinx.dataframe.schema.ColumnSchema
import org.jetbrains.kotlinx.dataframe.schema.DataFrameSchema
import kotlin.reflect.KProperty1
import kotlin.reflect.full.isSubtypeOf
import kotlin.reflect.full.memberProperties
import kotlin.reflect.typeOf

/**
 * Reads all columns of any [Iterable][Iterable]`<`[ResultRow][ResultRow]`>`, like [Query][Query],
 * from Exposed row by row into a [DataFrame] and then converts it to the data-schema type [T].
 *
 * In notebooks, the untyped overload works just as well due to runtime inference :)
 */
inline fun <reified T : Any> Iterable<ResultRow>.convertToDataFrame(): DataFrame<T> {
    val untyped: AnyFrame = convertToDataFrame()
    return untyped.convertTo<T>()
}

/**
 * Reads all columns of any [Iterable][Iterable]`<`[ResultRow][ResultRow]`>`, like [Query][Query],
 * from Exposed row by row, collecting the values per column name, and turns the
 * resulting [Map] of column-name -> values into an untyped [DataFrame].
 */
@JvmName("convertToAnyFrame")
fun Iterable<ResultRow>.convertToDataFrame(): AnyFrame {
    // column name -> all values seen for that column, in row order
    val columns = mutableMapOf<String, MutableList<Any?>>()
    forEach { row ->
        row.fieldIndex.keys.forEach { expression ->
            val values = columns.getOrPut(expression.readableName, ::mutableListOf)
            values += row[expression]
        }
    }
    return columns.toDataFrame()
}

/**
 * A simple, human-readable column name for [this] [Expression].
 *
 * Handles plain columns, aliases, and composite columns; may need to be
 * expanded with more [Expression] subtypes. Falls back to [toString].
 */
val Expression<*>.readableName: String
    get() {
        if (this is Column<*>) return name
        if (this is ExpressionAlias<*>) return alias
        if (this is BiCompositeColumn<*, *, *>) {
            return getRealColumns().joinToString(separator = "_") { it.readableName }
        }
        return toString()
    }

/**
 * Creates a [DataFrameSchema] from the declared [Table] instance by reflecting over
 * its `Column<*>`-typed properties.
 *
 * @param columnNameToAccessor Optional [MutableMap] which will be filled with entries mapping
 *   the SQL column name to the accessor name from the [Table].
 *   This can be used to define a [NameNormalizer] later.
 * @throws IllegalStateException if a column property's value type cannot be determined
 *   (e.g. a star-projected `Column<*>` declaration).
 */
@Suppress("UNCHECKED_CAST")
fun Table.toDataFrameSchema(columnNameToAccessor: MutableMap<String, String> = mutableMapOf()): DataFrameSchema {
    val columns = this::class.memberProperties
        .filter { it.returnType.isSubtypeOf(typeOf<Column<*>>()) }
        .associate { prop ->
            prop as KProperty1<Table, Column<*>>

            // retrieve the actual SQL column name from the Column instance
            val columnName = prop.get(this).name
            // store the actual column name together with the accessor name in the map
            columnNameToAccessor[columnName] = prop.name

            // get the column type T from `val a: Column<T>`;
            // fail with a clear message instead of a bare NPE (was `.first().type!!`)
            val type = checkNotNull(prop.returnType.arguments.firstOrNull()?.type) {
                "Could not determine the value type of column '$columnName' (accessor '${prop.name}')"
            }

            columnName to ColumnSchema.Value(type)
        }
    return DataFrameSchemaImpl(columns)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package org.jetbrains.kotlinx.dataframe.examples.exposed

import org.jetbrains.exposed.v1.core.Column
import org.jetbrains.exposed.v1.core.StdOutSqlLogger
import org.jetbrains.exposed.v1.jdbc.Database
import org.jetbrains.exposed.v1.jdbc.SchemaUtils
import org.jetbrains.exposed.v1.jdbc.addLogger
import org.jetbrains.exposed.v1.jdbc.batchInsert
import org.jetbrains.exposed.v1.jdbc.deleteAll
import org.jetbrains.exposed.v1.jdbc.selectAll
import org.jetbrains.exposed.v1.jdbc.transactions.transaction
import org.jetbrains.kotlinx.dataframe.api.asSequence
import org.jetbrains.kotlinx.dataframe.api.count
import org.jetbrains.kotlinx.dataframe.api.describe
import org.jetbrains.kotlinx.dataframe.api.groupBy
import org.jetbrains.kotlinx.dataframe.api.print
import org.jetbrains.kotlinx.dataframe.api.sortByDesc
import org.jetbrains.kotlinx.dataframe.size
import java.io.File

/**
 * Describes a simple bridge between [Exposed](https://www.jetbrains.com/exposed/) and DataFrame!
 */
fun main() {
    // locate our SQLite database on the classpath and connect Exposed to it
    val resourceDb = "chinook.db"
    val dbPath = File(object {}.javaClass.classLoader.getResource(resourceDb)!!.toURI()).absolutePath
    val db = Database.connect(url = "jdbc:sqlite:$dbPath", driver = "org.sqlite.JDBC")

    // read the database into a DataFrame
    val customers = transaction(db) {
        addLogger(StdOutSqlLogger)

        // tables in Exposed need to be defined, see tables.kt
        SchemaUtils.create(Customers, Artists, Albums)

        // Perform the specific query you want to read into the DataFrame.
        // Note: DataFrames are in-memory structures, so don't make it too large if you don't have the RAM ;)
        val query = Customers.selectAll() // .where { Customers.company.isNotNull() }

        // read and convert the query to a typed DataFrame;
        // convertToDataFrame<>() comes from compatibilityLayer.kt and CustomersDf from tables.kt
        query.convertToDataFrame<CustomersDf>()
    }

    println(customers.size())

    // with a DataFrame in hand, all DataFrame operations are available,
    // for instance, counting how often each country occurs...
    customers.groupBy { country }.count()
        .sortByDesc { "count"<Int>() }
        .print(columnTypes = true, borders = true)

    // ...or computing general statistics
    customers.describe()
        .print(columnTypes = true, borders = true)

    // or make plots using Kandy! It's all up to you

    // writing a DataFrame back into an SQL database with Exposed can also be done!
    transaction(db) {
        addLogger(StdOutSqlLogger)

        // first delete the original contents
        Customers.deleteAll()

        // batch insert our rows back into the SQL database
        Customers.batchInsert(customers.asSequence()) { dfRow ->
            for (column in Customers.columns) {
                this[column as Column<Any?>] = dfRow[column.name]
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package org.jetbrains.kotlinx.dataframe.examples.exposed

import org.jetbrains.exposed.v1.core.Column
import org.jetbrains.exposed.v1.core.Table
import org.jetbrains.kotlinx.dataframe.annotations.ColumnName
import org.jetbrains.kotlinx.dataframe.annotations.DataSchema
import org.jetbrains.kotlinx.dataframe.api.generateDataClasses
import org.jetbrains.kotlinx.dataframe.api.print
import org.jetbrains.kotlinx.dataframe.codeGen.NameNormalizer

// Exposed table definition mirroring the `albums` table of the chinook database.
object Albums : Table() {
    val albumId = integer("AlbumId").autoIncrement()
    val title = varchar("Title", 160)
    val artistId = integer("ArtistId")

    override val primaryKey = PrimaryKey(albumId)
}

// Exposed table definition mirroring the `artists` table of the chinook database.
object Artists : Table() {
    val artistId = integer("ArtistId").autoIncrement()
    val name = varchar("Name", 120)

    override val primaryKey = PrimaryKey(artistId)
}

// Exposed table definition mirroring the `customers` table of the chinook database.
// Most columns are nullable; only the id, names, and email are required.
object Customers : Table() {
    val customerId = integer("CustomerId").autoIncrement()
    val firstName = varchar("FirstName", 40)
    val lastName = varchar("LastName", 20)
    val company = varchar("Company", 80).nullable()
    val address = varchar("Address", 70).nullable()
    val city = varchar("City", 40).nullable()
    val state = varchar("State", 40).nullable()
    val country = varchar("Country", 40).nullable()
    val postalCode = varchar("PostalCode", 10).nullable()
    val phone = varchar("Phone", 24).nullable()
    val fax = varchar("Fax", 24).nullable()
    val email = varchar("Email", 60)
    val supportRepId = integer("SupportRepId").nullable()

    override val primaryKey = PrimaryKey(customerId)
}

/**
 * Exposed requires you to provide [Table] instances to
 * provide type-safe access to your columns and data.
 *
 * While DataFrame can infer types at runtime, which is enough for Kotlin Notebook,
 * to get type safe access at compile time, we need to define a [@DataSchema][DataSchema].
 *
 * This is what we created the [toDataFrameSchema] function for!
 */
fun main() {
    // toDataFrameSchema() fills this with SQL-column-name -> Table-accessor-name entries
    val accessorByColumnName = mutableMapOf<String, String>()
    val schema = Customers.toDataFrameSchema(accessorByColumnName)

    // uncomment to verify the schema was converted correctly:
    // schema.print()

    // Print a @DataSchema data class that can be copy-pasted into the code.
    // The NameNormalizer makes DataFrame generate the same accessor names as the Table
    // while keeping the correct SQL column names.
    schema
        .generateDataClasses(
            name = "CustomersDf",
            nameNormalizer = NameNormalizer { accessorByColumnName[it] ?: it },
        )
        .print()
}

/**
 * Type-safe DataFrame schema for the Customers table,
 * generated by `Customers.toDataFrameSchema()` (see [main]) and pasted here.
 * The same can be done for the other tables.
 *
 * Each [ColumnName] maps a Kotlin accessor back to its original SQL column name.
 */
@DataSchema
data class CustomersDf(
    @ColumnName("Address")
    val address: String?,
    @ColumnName("City")
    val city: String?,
    @ColumnName("Company")
    val company: String?,
    @ColumnName("Country")
    val country: String?,
    @ColumnName("CustomerId")
    val customerId: Int,
    @ColumnName("Email")
    val email: String,
    @ColumnName("Fax")
    val fax: String?,
    @ColumnName("FirstName")
    val firstName: String,
    @ColumnName("LastName")
    val lastName: String,
    @ColumnName("Phone")
    val phone: String?,
    @ColumnName("PostalCode")
    val postalCode: String?,
    @ColumnName("State")
    val state: String?,
    @ColumnName("SupportRepId")
    val supportRepId: Int?,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
@file:Suppress("ktlint:standard:no-empty-file")

package org.jetbrains.kotlinx.dataframe.examples.kotlinSpark

/*
 * See ../spark/compatibilityLayer.kt for the implementation.
 * It is identical with and without the Kotlin Spark API.
 */
Loading