Skip to content

Commit c6b51fb

Browse files
committed
Fixes request context lost in Infinispan cache get/getasync
1 parent e230153 commit c6b51fb

File tree

6 files changed

+133
-28
lines changed

6 files changed

+133
-28
lines changed

extensions/infinispan-cache/runtime/src/main/java/io/quarkus/cache/infinispan/runtime/InfinispanCacheImpl.java

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,11 @@
1111
import java.util.function.Function;
1212
import java.util.function.Predicate;
1313

14+
import org.eclipse.microprofile.context.ManagedExecutor;
15+
import org.eclipse.microprofile.context.ThreadContext;
1416
import org.infinispan.client.hotrod.RemoteCache;
1517
import org.infinispan.client.hotrod.impl.protocol.Codec27;
1618
import org.infinispan.commons.util.NullValue;
17-
import org.infinispan.commons.util.concurrent.CompletionStages;
1819
import org.reactivestreams.FlowAdapters;
1920

2021
import io.quarkus.arc.Arc;
@@ -36,6 +37,8 @@
3637
*/
3738
public class InfinispanCacheImpl extends AbstractCache implements Cache {
3839

40+
private final ThreadContext threadContext;
41+
private final ManagedExecutor managedExecutor;
3942
private final RemoteCache remoteCache;
4043
private final InfinispanCacheInfo cacheInfo;
4144
private final Map<Object, CompletableFuture> computationResults = new ConcurrentHashMap<>();
@@ -47,6 +50,8 @@ public InfinispanCacheImpl(InfinispanCacheInfo cacheInfo, RemoteCache remoteCach
4750
this.remoteCache = remoteCache;
4851
this.lifespan = cacheInfo.lifespan.map(l -> l.toMillis()).orElse(-1L);
4952
this.maxIdle = cacheInfo.maxIdle.map(m -> m.toMillis()).orElse(-1L);
53+
this.threadContext = Arc.container().select(ThreadContext.class).get();
54+
this.managedExecutor = Arc.container().select(ManagedExecutor.class).get();
5055
}
5156

5257
public InfinispanCacheImpl(InfinispanCacheInfo cacheInfo,
@@ -81,16 +86,11 @@ private <T> T decodeNull(Object value) {
8186

8287
@Override
8388
public <K, V> Uni<V> get(K key, Function<K, V> valueLoader) {
84-
return Uni.createFrom()
85-
.completionStage(() -> CompletionStages.handleAndCompose(remoteCache.getAsync(key), (v1, ex1) -> {
86-
if (ex1 != null) {
87-
return CompletableFuture.failedFuture(ex1);
88-
}
89-
89+
CompletableFuture<V> infinispanGet = threadContext.withContextCapture(remoteCache.getAsync(key))
90+
.thenApplyAsync(v1 -> {
9091
if (v1 != null) {
9192
return CompletableFuture.completedFuture(decodeNull(v1));
9293
}
93-
9494
CompletableFuture<V> resultAsync = new CompletableFuture<>();
9595
CompletableFuture<V> computedValue = computationResults.putIfAbsent(key, resultAsync);
9696
if (computedValue != null) {
@@ -111,18 +111,15 @@ public <K, V> Uni<V> get(K key, Function<K, V> valueLoader) {
111111
computationResults.remove(key);
112112
});
113113
return resultAsync;
114-
}));
114+
}, managedExecutor).thenCompose(f -> f);
115+
116+
return Uni.createFrom().completionStage(infinispanGet);
115117
}
116118

117119
@Override
118120
public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
119121
Context context = Vertx.currentContext();
120-
121-
return Uni.createFrom().completionStage(CompletionStages.handleAndCompose(remoteCache.getAsync(key), (v1, ex1) -> {
122-
if (ex1 != null) {
123-
return CompletableFuture.failedFuture(ex1);
124-
}
125-
122+
CompletableFuture<V> infinispanGet = threadContext.withContextCapture(remoteCache.getAsync(key)).thenApplyAsync(v1 -> {
126123
if (v1 != null) {
127124
return CompletableFuture.completedFuture(decodeNull(v1));
128125
}
@@ -152,7 +149,9 @@ public <K, V> Uni<V> getAsync(K key, Function<K, Uni<V>> valueLoader) {
152149
}
153150
});
154151
return resultAsync;
155-
})).emitOn(new Executor() {
152+
}, managedExecutor).thenCompose(f -> f);
153+
154+
return Uni.createFrom().completionStage(infinispanGet).emitOn(new Executor() {
156155
// We need make sure we go back to the original context when the cache value is computed.
157156
// Otherwise, we would always emit on the context having computed the value, which could
158157
// break the duplicated context isolation.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package io.quarkus.it.cache.infinispan;
2+
3+
import jakarta.inject.Inject;
4+
import jakarta.ws.rs.client.ClientRequestContext;
5+
import jakarta.ws.rs.ext.Provider;
6+
7+
@Provider
8+
public class ClientRequestFilter implements jakarta.ws.rs.client.ClientRequestFilter {
9+
10+
ClientRequestService requestService;
11+
12+
@Inject
13+
public ClientRequestFilter(ClientRequestService requestService) {
14+
this.requestService = requestService;
15+
}
16+
17+
@Override
18+
public void filter(ClientRequestContext requestContext) {
19+
if (requestService != null && requestService.data() != null) {
20+
requestContext.getHeaders().add("extra", requestService.data());
21+
}
22+
}
23+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.quarkus.it.cache.infinispan;
2+
3+
import jakarta.enterprise.context.RequestScoped;
4+
5+
@RequestScoped
6+
public class ClientRequestService {
7+
String data;
8+
9+
public String data() {
10+
return data;
11+
}
12+
13+
public ClientRequestService setData(String data) {
14+
this.data = data;
15+
return this;
16+
}
17+
}

integration-tests/infinispan-cache/src/main/java/io/quarkus/it/cache/infinispan/ExpensiveResource.java

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,39 +2,71 @@
22

33
import java.util.concurrent.atomic.AtomicInteger;
44

5+
import jakarta.inject.Inject;
6+
import jakarta.ws.rs.DELETE;
57
import jakarta.ws.rs.GET;
6-
import jakarta.ws.rs.POST;
78
import jakarta.ws.rs.Path;
89
import jakarta.ws.rs.PathParam;
910
import jakarta.ws.rs.QueryParam;
11+
import jakarta.ws.rs.core.Response;
1012

1113
import org.infinispan.protostream.GeneratedSchema;
1214
import org.infinispan.protostream.annotations.Proto;
1315
import org.infinispan.protostream.annotations.ProtoSchema;
1416

17+
import io.quarkus.cache.CacheInvalidate;
1518
import io.quarkus.cache.CacheInvalidateAll;
1619
import io.quarkus.cache.CacheKey;
1720
import io.quarkus.cache.CacheResult;
21+
import io.smallrye.mutiny.Uni;
1822

1923
@Path("/expensive-resource")
2024
public class ExpensiveResource {
2125

2226
private final AtomicInteger invocations = new AtomicInteger(0);
2327

28+
@Inject
29+
ClientRequestService requestService;
30+
2431
@GET
2532
@Path("/{keyElement1}/{keyElement2}/{keyElement3}")
2633
@CacheResult(cacheName = "expensiveResourceCache")
2734
public ExpensiveResponse getExpensiveResponse(@PathParam("keyElement1") @CacheKey String keyElement1,
28-
@PathParam("keyElement2") @CacheKey String keyElement2, @PathParam("keyElement3") @CacheKey String keyElement3,
35+
@PathParam("keyElement2") @CacheKey String keyElement2,
36+
@PathParam("keyElement3") @CacheKey String keyElement3,
2937
@QueryParam("foo") String foo) {
3038
invocations.incrementAndGet();
39+
requestService.setData("getExpensiveResponse " + foo);
3140
return new ExpensiveResponse(keyElement1 + " " + keyElement2 + " " + keyElement3 + " too!");
3241
}
3342

34-
@POST
43+
@GET
44+
@Path("/async/{keyElement1}/{keyElement2}/{keyElement3}")
45+
@CacheResult(cacheName = "expensiveResourceCache")
46+
public Uni<ExpensiveResponse> getExpensiveResponseAsync(@PathParam("keyElement1") @CacheKey String keyElement1,
47+
@PathParam("keyElement2") @CacheKey String keyElement2,
48+
@PathParam("keyElement3") @CacheKey String keyElement3,
49+
@QueryParam("foo") String foo) {
50+
invocations.incrementAndGet();
51+
requestService.setData("getExpensiveResponseAsync " + foo);
52+
return Uni.createFrom()
53+
.item(new ExpensiveResponse(keyElement1 + " " + keyElement2 + " " + keyElement3 + " async too!"));
54+
}
55+
56+
@DELETE
57+
@Path("/{keyElement1}/{keyElement2}/{keyElement3}")
58+
@CacheInvalidate(cacheName = "expensiveResourceCache")
59+
public Response resetExpensiveResponse(@PathParam("keyElement1") @CacheKey String keyElement1,
60+
@PathParam("keyElement2") @CacheKey String keyElement2, @PathParam("keyElement3") @CacheKey String keyElement3,
61+
@QueryParam("foo") String foo) {
62+
requestService.setData("invalidate");
63+
return Response.ok().build();
64+
}
65+
66+
@DELETE
3567
@CacheInvalidateAll(cacheName = "expensiveResourceCache")
3668
public void invalidateAll() {
37-
69+
requestService.setData("invalidateAll");
3870
}
3971

4072
@GET

integration-tests/infinispan-cache/src/main/java/io/quarkus/it/cache/infinispan/SunriseRestServerResource.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package io.quarkus.it.cache.infinispan;
22

3+
import java.util.concurrent.atomic.AtomicInteger;
4+
35
import jakarta.enterprise.context.ApplicationScoped;
6+
import jakarta.inject.Inject;
47
import jakarta.ws.rs.DELETE;
58
import jakarta.ws.rs.GET;
69
import jakarta.ws.rs.Path;
@@ -12,30 +15,37 @@
1215
@Path("sunrise")
1316
public class SunriseRestServerResource {
1417

15-
private int sunriseTimeInvocations;
18+
private final AtomicInteger invocations = new AtomicInteger(0);
19+
20+
@Inject
21+
ClientRequestService requestService;
1622

1723
@GET
1824
@Path("time/{city}")
1925
public String getSunriseTime(@RestPath String city, @RestQuery String date) {
20-
sunriseTimeInvocations++;
26+
invocations.incrementAndGet();
27+
requestService.setData(city);
2128
return "2020-12-20T10:15:30";
2229
}
2330

2431
@GET
2532
@Path("invocations")
2633
public Integer getSunriseTimeInvocations() {
27-
return sunriseTimeInvocations;
34+
requestService.setData("invocations");
35+
return invocations.get();
2836
}
2937

3038
@DELETE
3139
@Path("invalidate/{city}")
3240
public void invalidate(@RestPath String city, @RestQuery String notPartOfTheCacheKey, @RestQuery String date) {
3341
// Do nothing. We only need to test the caching annotation on the client side.
42+
requestService.setData("invalidate " + city);
3443
}
3544

3645
@DELETE
3746
@Path("invalidate")
3847
public void invalidateAll() {
3948
// Do nothing. We only need to test the caching annotation on the client side.
49+
requestService.setData("invalidate all");
4050
}
4151
}

integration-tests/infinispan-cache/src/test/java/io/quarkus/it/cache/infinispan/CacheTest.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,22 +12,46 @@ public class CacheTest {
1212

1313
@Test
1414
public void testCache() {
15-
runExpensiveRequest();
16-
runExpensiveRequest();
17-
runExpensiveRequest();
15+
when().get("/expensive-resource/invocations").then().statusCode(200).body(is("0"));
16+
runGetExpensiveRequest();
17+
runGetExpensiveRequest();
18+
runGetExpensiveRequest();
1819
when().get("/expensive-resource/invocations").then().statusCode(200).body(is("1"));
1920

21+
runDeleteExpensiveRequest();
22+
23+
runGetExpensiveRequestAsync();
24+
runGetExpensiveRequestAsync();
25+
runGetExpensiveRequestAsync();
26+
27+
when().get("/expensive-resource/invocations").then().statusCode(200).body(is("2"));
28+
2029
when()
21-
.post("/expensive-resource")
30+
.delete("/expensive-resource")
2231
.then()
2332
.statusCode(204);
2433
}
2534

26-
private void runExpensiveRequest() {
35+
private void runGetExpensiveRequest() {
2736
when()
2837
.get("/expensive-resource/I/love/Quarkus?foo=bar")
2938
.then()
3039
.statusCode(200)
3140
.body("result", is("I love Quarkus too!"));
3241
}
42+
43+
private void runGetExpensiveRequestAsync() {
44+
when()
45+
.get("/expensive-resource/async/I/love/Quarkus?foo=bar")
46+
.then()
47+
.statusCode(200)
48+
.body("result", is("I love Quarkus async too!"));
49+
}
50+
51+
private void runDeleteExpensiveRequest() {
52+
when()
53+
.delete("/expensive-resource/I/love/Quarkus?foo=bar")
54+
.then()
55+
.statusCode(200);
56+
}
3357
}

0 commit comments

Comments
 (0)