Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ okeydoke = "2.0.3"
strikt = "0.34.1"
version-catalog-update = "1.0.1"
versions = "0.53.0"
testcontainers = "2.0.2"
postgres-driver = "42.7.7"
mariadb-driver = "3.5.7"

[libraries]
hamkrest = { module = "com.natpryce:hamkrest", version.ref = "hamkrest" }
Expand All @@ -40,6 +43,12 @@ kotlinx-serialization-json = { module = "org.jetbrains.kotlinx:kotlinx-serializa
okeydoke = { module = "com.oneeyedmen:okeydoke", version.ref = "okeydoke" }
strikt-core = { module = "io.strikt:strikt-core", version.ref = "strikt" }
strikt-jvm = { module = "io.strikt:strikt-jvm", version.ref = "strikt" }
testcontainers-bom = { module = "org.testcontainers:testcontainers-bom", version.ref = "testcontainers" }
testcontainers-junit5 = { module = "org.testcontainers:testcontainers-junit-jupiter", version.ref = "testcontainers" }
testcontainers-postgres = { module = "org.testcontainers:testcontainers-postgresql", version.ref = "testcontainers" }
testcontainers-mariadb = { module = "org.testcontainers:testcontainers-mariadb", version.ref = "testcontainers" }
postgres-driver = { module = "org.postgresql:postgresql", version.ref = "postgres-driver" }
mariadb-driver = { module = "org.mariadb.jdbc:mariadb-java-client", version.ref = "mariadb-driver" }

[bundles]
jmh = [
Expand All @@ -62,6 +71,21 @@ strikt = [
"strikt-jvm",
]

testcontainers = [
"testcontainers-bom",
"testcontainers-junit5"
]

testcontainers-postgres = [
"testcontainers-postgres",
"postgres-driver"
]

testcontainers-mariadb = [
"testcontainers-mariadb",
"mariadb-driver"
]

[plugins]
jmh = { id = "me.champeau.jmh", version.ref = "champeau-jmh" }
jmhreport = { id = "io.morethan.jmhreport", version.ref = "jmhreport" }
Expand Down
1 change: 1 addition & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -36,4 +36,5 @@ include("ropes4k")
include("state4k")
include("time4k")
include("tuples4k")
include("tx4k")
include("values4k")
18 changes: 18 additions & 0 deletions tx4k/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import org.jetbrains.kotlin.gradle.tasks.KotlinJvmCompile

description = "ForkHandles Transactor library"

dependencies {
testImplementation(kotlin("test-junit5"))
testImplementation(libs.bundles.junit)
testImplementation(libs.bundles.testcontainers)
testImplementation(libs.bundles.testcontainers.postgres)
testImplementation(libs.bundles.testcontainers.mariadb)
}


tasks.withType<KotlinJvmCompile>().configureEach {
compilerOptions {
freeCompilerArgs.set(freeCompilerArgs.get() + "-Xinline-classes")
}
}
61 changes: 61 additions & 0 deletions tx4k/src/main/kotlin/dev/forkhandles/tx/Transactor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package dev.forkhandles.tx

import java.time.Duration
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
import kotlin.contracts.contract

abstract class Transactor<Resource,out API> {
abstract fun createResource(): Resource
abstract fun configureResource(resource: Resource)
abstract fun destroyResource(resource: Resource)

abstract fun createApi(resource: Resource): API

abstract fun startTransaction(resource: Resource)
abstract fun commitTransaction(resource: Resource)
abstract fun rollbackTransaction(resource: Resource)

abstract fun canRetry(e: Exception): Boolean
abstract fun retryBackoff(attempt: Int): Duration?

// Inline so that the `work` lambda can do an early return
@OptIn(ExperimentalContracts::class)
inline fun <Result> perform(work: (API) -> Result): Result {
contract {
callsInPlace(work, InvocationKind.AT_LEAST_ONCE)
}

val resource = createResource()
try {
configureResource(resource)
val api = createApi(resource)

var attempts = 0
while (true) try {
startTransaction(resource)
val result = work(api)
commitTransaction(resource)
return result
}
catch (e: Exception) {
rollbackTransaction(resource)
if (canRetry(e)) {
attempts++
when (val backoff = retryBackoff(attempts)) {
null -> throw SerialisabilityFailure(e)
else -> Thread.sleep(backoff)
}
} else {
throw e
}
}
} finally {
destroyResource(resource)
}
}
}

class SerialisabilityFailure(cause: Exception) : Exception(cause)

typealias Transactional<API> = Transactor<*, API>
49 changes: 49 additions & 0 deletions tx4k/src/main/kotlin/dev/forkhandles/tx/jdbc/JdbcTransactor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package dev.forkhandles.tx.jdbc

import dev.forkhandles.tx.RetryPolicy
import dev.forkhandles.tx.Transactor
import dev.forkhandles.tx.increasingBackoff
import dev.forkhandles.tx.withAdditiveJitter
import dev.forkhandles.tx.maxAttempts
import java.sql.Connection
import java.sql.SQLException
import java.time.Duration


class JdbcTransactor<out API>(
private val createConnection: () -> Connection,
private val createWrapper: (Connection) -> API,
private val retryPolicy: RetryPolicy =
increasingBackoff(Duration.ofMillis(50)).withAdditiveJitter().maxAttempts(5),
private val retryableFailurePolicy: (Exception) -> Boolean =
::jdbcStandardRetryability
) : Transactor<Connection, API>() {
override fun createResource(): Connection = createConnection()

override fun configureResource(resource: Connection) {
resource.autoCommit = false
resource.transactionIsolation = Connection.TRANSACTION_SERIALIZABLE
}

override fun destroyResource(resource: Connection) = resource.close()

override fun createApi(resource: Connection): API =
createWrapper(resource)

override fun startTransaction(resource: Connection) {
// Nothing required for JDBC
}

override fun rollbackTransaction(resource: Connection) = resource.rollback()
override fun commitTransaction(resource: Connection) = resource.commit()

override fun canRetry(e: Exception): Boolean =
retryableFailurePolicy(e)

override fun retryBackoff(attempt: Int): Duration? =
retryPolicy(attempt)
}


fun jdbcStandardRetryability(e: Exception): Boolean =
e is SQLException && (e.sqlState == "40001" || e.sqlState == "40P01")
46 changes: 46 additions & 0 deletions tx4k/src/main/kotlin/dev/forkhandles/tx/mem/InMemoryTransactor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
@file:OptIn(ExperimentalAtomicApi::class)

package dev.forkhandles.tx.mem

import dev.forkhandles.tx.RetryPolicy
import dev.forkhandles.tx.Transactor
import dev.forkhandles.tx.increasingBackoff
import dev.forkhandles.tx.withAdditiveJitter
import dev.forkhandles.tx.maxAttempts
import java.time.Duration
import kotlin.concurrent.atomics.AtomicReference
import kotlin.concurrent.atomics.ExperimentalAtomicApi
import kotlin.Unit as noop

class InMemoryTransaction<State>(val initialState: State) {
var state: State = initialState
}

class InMemoryTransactor<State, out API>(
initialState: State,
private val createRepository: (InMemoryTransaction<State>) -> API,
private val retryPolicy: RetryPolicy =
increasingBackoff(Duration.ofMillis(1)).withAdditiveJitter().maxAttempts(5)
) : Transactor<InMemoryTransaction<State>, API>() {
val state = AtomicReference(initialState)

override fun createResource() = InMemoryTransaction(state.load())
override fun configureResource(resource: InMemoryTransaction<State>) = noop
override fun destroyResource(resource: InMemoryTransaction<State>) = noop

override fun createApi(resource: InMemoryTransaction<State>) =
createRepository(resource)

override fun startTransaction(resource: InMemoryTransaction<State>) = noop
override fun rollbackTransaction(resource: InMemoryTransaction<State>) = noop
override fun commitTransaction(resource: InMemoryTransaction<State>) {
if (!state.compareAndSet(expectedValue = resource.initialState, newValue = resource.state)) {
throw RetryException()
}
}

override fun canRetry(e: Exception) = e is RetryException
override fun retryBackoff(attempt: Int) = retryPolicy(attempt)

internal class RetryException : Exception()
}
41 changes: 41 additions & 0 deletions tx4k/src/main/kotlin/dev/forkhandles/tx/retry.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package dev.forkhandles.tx

import java.time.Duration
import kotlin.random.Random

typealias RetryPolicy = (attempt: Int) -> Duration?

/**
* A very simple backoff strategy that always backs off the same amount of time.
* This is not suitable for production workloads.
*/
fun linearBackoff(backoff: Duration): RetryPolicy =
fun(attempt: Int) =
backoff

/**
* A backoff strategy that increases the retry delay with each attempt.
*/
fun increasingBackoff(step: Duration): RetryPolicy =
fun(attempt: Int) =
step.multipliedBy(attempt.toLong())

/**
* Limit the number of retries.
*/
fun RetryPolicy.maxAttempts(max: Int): RetryPolicy =
fun(attempt: Int): Duration? {
require(attempt > 0) { "attempt must be > 0" }
return if (attempt <= max) this(attempt) else null
}

/**
* Applies jitter to a retry policy.
*/
fun RetryPolicy.withAdditiveJitter(maxFraction: Double = 0.1, random: Random = Random.Default): RetryPolicy =
fun(attempt: Int): Duration? =
this(attempt)?.let { baseDelay ->
val jitterScale = maxFraction * random.nextDouble(-1.0, 1.0)
val jitter = baseDelay.toMillis() * jitterScale
baseDelay + Duration.ofMillis(jitter.toLong())
}
27 changes: 27 additions & 0 deletions tx4k/src/test/kotlin/dev/forkhandles/tx/BackoffJitterTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package dev.forkhandles.tx

import java.time.Duration
import kotlin.test.Test
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue

class BackoffJitterTest {
@Test
fun `jittered policy adds jitter to base delay`() {
val baseDelay = Duration.ofSeconds(1)
val jitteredPolicy = linearBackoff(baseDelay).withAdditiveJitter(0.125)

repeat(1000) {
val jitteredDelay = assertNotNull(jitteredPolicy(it))
assertTrue(jitteredDelay in (Duration.ofMillis(875) .. Duration.ofMillis(1125)))
}
}

@Test
fun `jittered policy signals end of retry`() {
val jitteredPolicy = { _ : Int -> null }.withAdditiveJitter(0.125)

assertNull(jitteredPolicy(1))
}
}
6 changes: 6 additions & 0 deletions tx4k/src/test/kotlin/dev/forkhandles/tx/Counter.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package dev.forkhandles.tx

interface Counter {
fun incrementBy(n: Int)
fun count(): Int
}
23 changes: 23 additions & 0 deletions tx4k/src/test/kotlin/dev/forkhandles/tx/IncreasingBackoffTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package dev.forkhandles.tx

import java.time.Duration
import kotlin.test.Test
import kotlin.test.assertEquals


class IncreasingBackoffTest {
@Test
fun `total backoff`() {
val policy = increasingBackoff(Duration.ofSeconds(1))

assertEquals(Duration.ofSeconds(1), totalBackoff(policy, 1), "attempt 1")
assertEquals(Duration.ofSeconds(3), totalBackoff(policy, 2), "attempt 2")
assertEquals(Duration.ofSeconds(6), totalBackoff(policy, 3), "attempt 3")
assertEquals(Duration.ofSeconds(10), totalBackoff(policy, 4), "attempt 4")
assertEquals(Duration.ofSeconds(15), totalBackoff(policy, 5), "attempt 5")
}

private fun totalBackoff(policy: RetryPolicy, maxAttempt: Int): Duration =
(1..maxAttempt)
.fold(Duration.ZERO) { acc, attempt -> acc + policy(attempt) }
}
Loading