Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Repository Guidelines

## Project Structure & Module Organization
- Core libraries live in module directories such as `core`, `core-reactor`, and `core-kotlin-coroutine`; each follows the Gradle layout `src/main` and `src/test`.
- Spring adapters sit under `core-spring*` modules, while runnable samples are in `req-shield-*example` projects.
- Shared utilities and constants are centralized in `support`. Generated build outputs stay under each module's `build/` folder.

## Build, Test, and Development Commands
- `./gradlew build` compiles all modules, runs unit tests, and assembles artifacts; pass `--parallel` for faster local feedback.
- `./gradlew test` executes Kotlin/JVM unit tests across every enabled module.
- `./gradlew ktlintCheck` enforces the project's formatting contract before you open a PR.
- Use `./gradlew :req-shield-spring-boot3-example:bootRun` (or another sample module) to manually exercise integration paths.

## Coding Style & Naming Conventions
- Kotlin sources use 4-space indentation, `UpperCamelCase` for types, and `lowerCamelCase` for functions and properties.
- Keep package names lowercase and aligned with module boundaries (e.g., `com.linecorp.reqshield.core`).
- Always add the Apache 2.0 copyright header shown in `CONTRIBUTING.md` to new files.
- Prefer early-return patterns and meaningful exception messages; align with the `ErrorCode` enums already defined.

## Testing Guidelines
- Write tests with JUnit 5 (`org.junit.jupiter`) and place them under `src/test/kotlin` mirroring the `src/main` package.
- Use descriptive method names such as `shouldCollapseConcurrentRequests()` and cover both success and failure paths.
- When adding integration behaviour, extend the corresponding example module and run `./gradlew test` before submission.

## Commit & Pull Request Guidelines
- Follow the repository's history of concise, imperative commits (e.g., `Add cache invalidation helper`).
- Reference related issues in the body, summarise motivation, modifications, and results, and include screenshots/logs for behaviour changes.
- Verify CLS (Contributor License Agreement) status, ensure CI passes locally, and request review from a maintainer familiar with your module.
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,35 @@ A lib that regulates the cache-based requests an application receives in terms o
`implementation("com.linecorp.cse.reqshield:core-spring-webflux:{version}")`<br>
`implementation("com.linecorp.cse.reqshield:core-spring-webflux-kotlin-coroutine:{version}")`<br>

## Testing & Integration Tips

### Integration tests with Redis (Testcontainers)

- Some modules provide optional Redis-backed integration tests leveraging Testcontainers.
- Enable them via an environment variable to avoid Docker dependency by default:
- macOS/Linux: `RUN_REDIS_IT=true ./gradlew :core-spring-webflux:test :core-spring-webflux-kotlin-coroutine:test`
- Windows (PowerShell): `$env:RUN_REDIS_IT='true'; ./gradlew :core-spring-webflux:test :core-spring-webflux-kotlin-coroutine:test`
- If Docker is not available, these tests stay skipped. In-memory integration tests still validate core semantics (request collapsing and eviction).

### WebFlux null handling

- `@ReqShieldCacheable(nullHandling = ...)` controls how `null` values are emitted in WebFlux:
- `EMIT_EMPTY` (default): map `null` to `Mono.empty()`.
- `ERROR`: throw an `IllegalStateException` if a `null` value is produced.

### Global lock guidance

- When `isLocalLock = false`, you must provide real global lock/unlock implementations.
- Recommended approach with Redis:
- Lock: `SETNX lock:{key} 1` + `PEXPIRE lock:{key} {ttlMillis}`
- Unlock: `DEL lock:{key}`
- The provided defaults return `true` and are only suitable for local/dev usage.

### Reactor Scheduler tuning

- Reactor-based modules accept a `Scheduler` (e.g., `boundedElastic`) through configuration.
- Spring WebFlux adapter exposes a `reqShieldScheduler` bean you can override for tuning thread usage.

## Contributing

Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,35 +32,49 @@ import kotlin.coroutines.CoroutineContext
private val log = LoggerFactory.getLogger(KeyLocalLock::class.java)

class KeyLocalLock(private val lockTimeoutMillis: Long) : KeyLock, CoroutineScope {
private data class LockInfo(val semaphore: Semaphore, val createdAt: Long)
private data class LockInfo(val semaphore: Semaphore, val expiresAt: Long)

private val lockMap = ConcurrentHashMap<String, LockInfo>()
companion object {
private val lockMap = ConcurrentHashMap<String, LockInfo>()

@Volatile
private var monitorJob: Job? = null

private fun ensureMonitorStarted() {
if (monitorJob?.isActive == true) return
synchronized(this) {
if (monitorJob?.isActive == true) return
monitorJob =
CoroutineScope(Dispatchers.IO).launch {
while (isActive) {
runCatching {
val now = System.currentTimeMillis()
lockMap.entries.removeIf { now > it.value.expiresAt }
delay(LOCK_MONITOR_INTERVAL_MILLIS)
}.onFailure { e ->
log.error("Error in lock lifecycle monitoring : {}", e.message)
}
}
}
}
}
}

private val job = Job()
override val coroutineContext: CoroutineContext
get() = Dispatchers.IO + job

init {
launch {
while (isActive) {
runCatching {
val now = System.currentTimeMillis()
lockMap.entries.removeIf { now - it.value.createdAt > lockTimeoutMillis } // 특정 시간이 지나면 lock 여부와 상관없이 map에서 삭제한다.
delay(LOCK_MONITOR_INTERVAL_MILLIS)
}.onFailure { e ->
log.error("Error in lock lifecycle monitoring : {}", e.message)
}
}
}
ensureMonitorStarted()
}

override suspend fun tryLock(
key: String,
lockType: LockType,
): Boolean {
val completeKey = "${key}_${lockType.name}"
val lockInfo = lockMap.computeIfAbsent(completeKey) { LockInfo(Semaphore(1), nowToEpochTime()) }

val now = nowToEpochTime()
val lockInfo = lockMap.computeIfAbsent(completeKey) { LockInfo(Semaphore(1), now + lockTimeoutMillis) }
return lockInfo.semaphore.tryAcquire()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.Ignore

@EnabledIfEnvironmentVariable(named = "RUN_REDIS_IT", matches = "true")
class KeyGlobalLockTest :
AbstractRedisTest(),
BaseKeyLockTest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,36 @@ import org.junit.jupiter.api.Test
import java.util.concurrent.atomic.AtomicInteger

class KeyLocalLockTest : BaseKeyLockTest {
@Test
fun `should share global lockMap across multiple instances`() =
runBlocking {
val instance1 = KeyLocalLock(lockTimeoutMillis)
val instance2 = KeyLocalLock(lockTimeoutMillis)
val key = "shared-key"
val lockType = LockType.CREATE

assertTrue(instance1.tryLock(key, lockType))
assertTrue(!instance2.tryLock(key, lockType))

instance1.unLock(key, lockType)
}

@Test
fun `should maintain request collapsing across multiple instances`() =
runBlocking {
val instance1 = KeyLocalLock(lockTimeoutMillis)
val instance2 = KeyLocalLock(lockTimeoutMillis)
val instance3 = KeyLocalLock(lockTimeoutMillis)
val key = "collapsing-key"
val lockType = LockType.CREATE

val acquired = listOf(instance1, instance2, instance3).map { it.tryLock(key, lockType) }.count { it }
assertEquals(1, acquired)

// cleanup whoever acquired
listOf(instance1, instance2, instance3).forEach { it.unLock(key, lockType) }
}

@Test
override fun testConcurrencyWithOneKey() =
runBlocking {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,34 @@ class KeyLocalLock(
) : KeyLock {
private data class LockInfo(
val semaphore: Semaphore,
val createdAt: Long,
val expiresAt: Long,
)

private val lockMap = ConcurrentHashMap<String, LockInfo>()
companion object {
private val lockMap = ConcurrentHashMap<String, LockInfo>()

@Volatile
private var monitoringStarted: Boolean = false

private fun startMonitoringOnce() {
if (monitoringStarted) return
synchronized(this) {
if (monitoringStarted) return
Flux
.interval(Duration.ofMillis(LOCK_MONITOR_INTERVAL_MILLIS), Schedulers.single())
.doOnNext {
val now = System.currentTimeMillis()
lockMap.entries.removeIf { now > it.value.expiresAt }
}.doOnError { e ->
log.error("Error in lock lifecycle monitoring : {}", e.message)
}.subscribe()
monitoringStarted = true
}
}
}

init {
Flux
.interval(Duration.ofMillis(LOCK_MONITOR_INTERVAL_MILLIS), Schedulers.single())
.doOnNext {
val now = System.currentTimeMillis()
lockMap.entries.removeIf { now - it.value.createdAt > lockTimeoutMillis }
}.doOnError { e ->
log.error("Error in lock lifecycle monitoring : {}", e.message)
}.subscribe()
startMonitoringOnce()
}

override fun tryLock(
Expand All @@ -55,7 +69,11 @@ class KeyLocalLock(
): Mono<Boolean> =
Mono.fromCallable {
val completeKey = "${key}_${lockType.name}"
val lockInfo = lockMap.computeIfAbsent(completeKey) { LockInfo(Semaphore(1), nowToEpochTime()) }
val now = nowToEpochTime()
val lockInfo =
lockMap.computeIfAbsent(completeKey) {
LockInfo(Semaphore(1), now + lockTimeoutMillis)
}
lockInfo.semaphore.tryAcquire()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import com.linecorp.cse.reqshield.support.model.ReqShieldData
import com.linecorp.cse.reqshield.support.utils.decideToUpdateCache
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.util.retry.Retry
import java.time.Duration
import java.util.concurrent.Callable
Expand Down Expand Up @@ -73,13 +72,13 @@ class ReqShield<T>(
fun processMono(): Mono<ReqShieldData<T>> =
executeCallable({ callable.call() }, true, key, lockType)
.map { data -> buildReqShieldData(data, timeToLiveMillis) }
.doOnNext { reqShieldData ->
.flatMap { reqShieldData ->
setReqShieldData(
reqShieldConfig.setCacheFunction,
key,
reqShieldData,
lockType,
)
).thenReturn(reqShieldData)
}.switchIfEmpty(
Mono.defer {
val reqShieldData = buildReqShieldData(null, timeToLiveMillis)
Expand All @@ -88,21 +87,20 @@ class ReqShield<T>(
key,
reqShieldData,
lockType,
)
Mono.just(reqShieldData)
).thenReturn(reqShieldData)
},
)

if (reqShieldConfig.reqShieldWorkMode == ReqShieldWorkMode.ONLY_CREATE_CACHE) {
processMono()
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(reqShieldConfig.scheduler)
.subscribe()
} else {
reqShieldConfig.keyLock
.tryLock(key, lockType)
.filter { it }
.flatMap { processMono() }
.subscribeOn(Schedulers.boundedElastic())
.subscribeOn(reqShieldConfig.scheduler)
.subscribe()
}
}
Expand Down Expand Up @@ -138,15 +136,12 @@ class ReqShield<T>(
executeCallable({ callable.call() }, true, key, lockType)
.map { data -> buildReqShieldData(data, timeToLiveMillis) }
.flatMap { reqShieldData ->

setReqShieldData(
reqShieldConfig.setCacheFunction,
key,
reqShieldData,
lockType,
)

Mono.just(reqShieldData)
).thenReturn(reqShieldData)
}.switchIfEmpty(
Mono.defer {
val reqShieldData = buildReqShieldData(null, timeToLiveMillis)
Expand All @@ -156,9 +151,7 @@ class ReqShield<T>(
key,
reqShieldData,
lockType,
)

Mono.just(reqShieldData)
).thenReturn(reqShieldData)
},
)

Expand Down Expand Up @@ -191,7 +184,7 @@ class ReqShield<T>(
Mono.just(reqShieldData)
},
),
).subscribeOn(Schedulers.boundedElastic())
).subscribeOn(reqShieldConfig.scheduler)

private fun buildReqShieldData(
value: T?,
Expand All @@ -207,18 +200,14 @@ class ReqShield<T>(
key: String,
reqShieldData: ReqShieldData<T>,
lockType: LockType,
) {
executeSetCacheFunction(cacheSetter, key, reqShieldData, lockType).subscribe()
}
): Mono<Boolean> = executeSetCacheFunction(cacheSetter, key, reqShieldData, lockType)

private fun executeGetCacheFunction(
getFunction: (String) -> Mono<ReqShieldData<T>?>,
key: String,
): Mono<ReqShieldData<T>?> =
getFunction(key)
.doOnError { e ->
throw ClientException(ErrorCode.GET_CACHE_ERROR, originErrorMessage = e.message)
}
.onErrorMap { e -> ClientException(ErrorCode.GET_CACHE_ERROR, originErrorMessage = e.message) }

private fun executeSetCacheFunction(
setFunction: (String, ReqShieldData<T>, Long) -> Mono<Boolean>,
Expand All @@ -227,9 +216,8 @@ class ReqShield<T>(
lockType: LockType,
): Mono<Boolean> =
setFunction(key, value, value.timeToLiveMillis)
.doOnError { e ->
throw ClientException(ErrorCode.SET_CACHE_ERROR, originErrorMessage = e.message)
}.doFinally {
.onErrorMap { e -> ClientException(ErrorCode.SET_CACHE_ERROR, originErrorMessage = e.message) }
.doFinally {
if (shouldAttemptUnlock(lockType)) {
reqShieldConfig.keyLock
.unLock(key, lockType)
Expand All @@ -240,7 +228,7 @@ class ReqShield<T>(
),
).subscribe()
}
}.subscribeOn(Schedulers.boundedElastic())
}.subscribeOn(reqShieldConfig.scheduler)

private fun executeCallable(
callable: Callable<Mono<T?>>,
Expand All @@ -254,7 +242,9 @@ class ReqShield<T>(
if (isUnlockWhenException && key != null && lockType != null) {
reqShieldConfig.keyLock.unLock(key, lockType).subscribe()
}
throw ClientException(ErrorCode.SUPPLIER_ERROR, originErrorMessage = e.message)
}
.onErrorMap { e ->
ClientException(ErrorCode.SUPPLIER_ERROR, originErrorMessage = e.message)
}

private fun shouldAttemptUnlock(lockType: LockType): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,15 @@ import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Assertions.assertTrue
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import reactor.test.StepVerifier
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger
import kotlin.test.Ignore

@EnabledIfEnvironmentVariable(named = "RUN_REDIS_IT", matches = "true")
class KeyGlobalLockTest :
AbstractRedisTest(),
BaseKeyLockTest {
Expand Down
Loading
Loading