Skip to content
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
b8194f5
Initial plan
Copilot Mar 17, 2026
bc3c983
feat: add next-page prefetch cache for paginated SELECT queries
Copilot Mar 17, 2026
eb9ac07
refactor: improve thread naming and reduce duplication in executeQuer…
Copilot Mar 17, 2026
563e3f8
feat: materialise CLOB/NCLOB return columns in prefetch cache as String
Copilot Mar 17, 2026
77195b9
fix: datasource-isolated cache keys + background cleanup job for next…
Copilot Mar 17, 2026
268cdaa
fix: single shared static cleanup executor guarantees one background …
Copilot Mar 17, 2026
e953c30
feat(test): Postgres pagination cache integration test with BYTEA LOB…
Copilot Mar 17, 2026
73a4d18
fix(paging): use virtual thread for CLEANUP_EXECUTOR thread factory
Copilot Mar 17, 2026
a2a9726
feat(paging): per-datasource prefetchWaitTimeoutMs configuration
Copilot Mar 17, 2026
1d3d878
fix(jdbc): skip remote close() when ResultSet has no server-side UUID…
Copilot Mar 17, 2026
59ca642
fix(paging): fix Sonar issues — resource leaks, dead code, unused imp…
Copilot Mar 17, 2026
8c71a39
fix(paging): fix remaining Sonar issues — duplicate conditions, regex…
Copilot Mar 17, 2026
0293f8f
test: parameterize detection tests, add assertion, fix duplicate @Aft…
Copilot Mar 17, 2026
fab19cd
test: address review comments on test classes (round 2)
Copilot Mar 17, 2026
76f8de7
docs: add prefetch cache feature docs and refactor buildNextPageSql t…
Copilot Mar 17, 2026
27a02cd
feat: add per-datasource cache enabled flag with tests and docs update
Copilot Mar 18, 2026
37a84bf
refactor: per-datasource cache enabled is client-side property (ojp.n…
Copilot Mar 18, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ public class ServerConfiguration {
private static final String TELEMETRY_GRPC_METRICS_ENABLED_KEY = "ojp.telemetry.grpc.metrics.enabled";
private static final String TELEMETRY_POOL_METRICS_ENABLED_KEY = "ojp.telemetry.pool.metrics.enabled";

// Next-page prefetch cache configuration keys
private static final String NEXT_PAGE_CACHE_ENABLED_KEY = "ojp.server.nextPageCache.enabled";
private static final String NEXT_PAGE_CACHE_TTL_SECONDS_KEY = "ojp.server.nextPageCache.ttlSeconds";
private static final String NEXT_PAGE_CACHE_MAX_ENTRIES_KEY = "ojp.server.nextPageCache.maxEntries";
private static final String NEXT_PAGE_CACHE_PREFETCH_WAIT_TIMEOUT_MS_KEY = "ojp.server.nextPageCache.prefetchWaitTimeoutMs";
private static final String NEXT_PAGE_CACHE_CLEANUP_INTERVAL_SECONDS_KEY = "ojp.server.nextPageCache.cleanupIntervalSeconds";

// TLS configuration keys
private static final String TLS_ENABLED_KEY = "ojp.server.tls.enabled";
private static final String TLS_KEYSTORE_PATH_KEY = "ojp.server.tls.keystore.path";
Expand Down Expand Up @@ -135,6 +142,13 @@ public class ServerConfiguration {
public static final boolean DEFAULT_TELEMETRY_GRPC_METRICS_ENABLED = true; // Enabled by default when OpenTelemetry is enabled
public static final boolean DEFAULT_TELEMETRY_POOL_METRICS_ENABLED = true; // Enabled by default when OpenTelemetry is enabled

// Next-page prefetch cache default values
public static final boolean DEFAULT_NEXT_PAGE_CACHE_ENABLED = false; // Disabled by default, opt-in
public static final long DEFAULT_NEXT_PAGE_CACHE_TTL_SECONDS = 60; // 1 minute
public static final int DEFAULT_NEXT_PAGE_CACHE_MAX_ENTRIES = 100;
public static final long DEFAULT_NEXT_PAGE_CACHE_PREFETCH_WAIT_TIMEOUT_MS = 5000; // 5 seconds
public static final long DEFAULT_NEXT_PAGE_CACHE_CLEANUP_INTERVAL_SECONDS = 60; // 1 minute

// TLS default values
public static final boolean DEFAULT_TLS_ENABLED = false; // Disabled by default for backwards compatibility
public static final boolean DEFAULT_TLS_CLIENT_AUTH_REQUIRED = false; // mTLS disabled by default
Expand Down Expand Up @@ -211,6 +225,13 @@ public class ServerConfiguration {
private final boolean tlsClientAuthRequired;


// Next-page prefetch cache configuration
private final boolean nextPageCacheEnabled;
private final long nextPageCacheTtlSeconds;
private final int nextPageCacheMaxEntries;
private final long nextPageCachePrefetchWaitTimeoutMs;
private final long nextPageCacheCleanupIntervalSeconds;

public ServerConfiguration() {
this.serverPort = getIntProperty(SERVER_PORT_KEY, DEFAULT_SERVER_PORT);
this.prometheusPort = getIntProperty(PROMETHEUS_PORT_KEY, DEFAULT_PROMETHEUS_PORT);
Expand Down Expand Up @@ -274,6 +295,13 @@ public ServerConfiguration() {
this.telemetryGrpcMetricsEnabled = getBooleanProperty(TELEMETRY_GRPC_METRICS_ENABLED_KEY, DEFAULT_TELEMETRY_GRPC_METRICS_ENABLED);
this.telemetryPoolMetricsEnabled = getBooleanProperty(TELEMETRY_POOL_METRICS_ENABLED_KEY, DEFAULT_TELEMETRY_POOL_METRICS_ENABLED);

// Next-page prefetch cache configuration
this.nextPageCacheEnabled = getBooleanProperty(NEXT_PAGE_CACHE_ENABLED_KEY, DEFAULT_NEXT_PAGE_CACHE_ENABLED);
this.nextPageCacheTtlSeconds = getLongProperty(NEXT_PAGE_CACHE_TTL_SECONDS_KEY, DEFAULT_NEXT_PAGE_CACHE_TTL_SECONDS);
this.nextPageCacheMaxEntries = getIntProperty(NEXT_PAGE_CACHE_MAX_ENTRIES_KEY, DEFAULT_NEXT_PAGE_CACHE_MAX_ENTRIES);
this.nextPageCachePrefetchWaitTimeoutMs = getLongProperty(NEXT_PAGE_CACHE_PREFETCH_WAIT_TIMEOUT_MS_KEY, DEFAULT_NEXT_PAGE_CACHE_PREFETCH_WAIT_TIMEOUT_MS);
this.nextPageCacheCleanupIntervalSeconds = getLongProperty(NEXT_PAGE_CACHE_CLEANUP_INTERVAL_SECONDS_KEY, DEFAULT_NEXT_PAGE_CACHE_CLEANUP_INTERVAL_SECONDS);

logConfigurationSummary();
}

Expand Down Expand Up @@ -416,6 +444,14 @@ private void logConfigurationSummary() {
logger.info(" Tracing Service Name: {}", tracingServiceName);
logger.info(" Tracing Sample Rate: {}", tracingSampleRate);
}
logger.info("Next-Page Prefetch Cache Configuration:");
logger.info(" Next-Page Cache Enabled: {}", nextPageCacheEnabled);
if (nextPageCacheEnabled) {
logger.info(" Next-Page Cache TTL: {} seconds", nextPageCacheTtlSeconds);
logger.info(" Next-Page Cache Max Entries: {}", nextPageCacheMaxEntries);
logger.info(" Next-Page Cache Prefetch Wait Timeout: {} ms", nextPageCachePrefetchWaitTimeoutMs);
logger.info(" Next-Page Cache Cleanup Interval: {} seconds", nextPageCacheCleanupIntervalSeconds);
}
}

/**
Expand Down Expand Up @@ -641,5 +677,25 @@ public boolean isTelemetryGrpcMetricsEnabled() {
public boolean isTelemetryPoolMetricsEnabled() {
return telemetryPoolMetricsEnabled;
}


public boolean isNextPageCacheEnabled() {
return nextPageCacheEnabled;
}

public long getNextPageCacheTtlSeconds() {
return nextPageCacheTtlSeconds;
}

public int getNextPageCacheMaxEntries() {
return nextPageCacheMaxEntries;
}

public long getNextPageCachePrefetchWaitTimeoutMs() {
return nextPageCachePrefetchWaitTimeoutMs;
}

public long getNextPageCacheCleanupIntervalSeconds() {
return nextPageCacheCleanupIntervalSeconds;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,12 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.openjproxy.constants.CommonConstants;
import org.openjproxy.grpc.ProtoConverter;
import org.openjproxy.grpc.dto.OpQueryResult;
import org.openjproxy.grpc.dto.Parameter;
import org.openjproxy.grpc.server.action.resource.CallResourceAction;
import org.openjproxy.grpc.server.action.session.ResultSetHelper;
import org.openjproxy.grpc.server.action.session.TerminateSessionAction;
import org.openjproxy.grpc.server.action.transaction.CommitTransactionAction;
import org.openjproxy.grpc.server.action.transaction.RollbackTransactionAction;
Expand All @@ -35,6 +38,11 @@
import org.openjproxy.grpc.server.action.xa.XaRecoverAction;
import org.openjproxy.grpc.server.action.xa.XaRollbackAction;
import org.openjproxy.grpc.server.action.xa.XaStartAction;
import org.openjproxy.grpc.server.paging.CachedPage;
import org.openjproxy.grpc.server.paging.NextPagePrefetchCache;
import org.openjproxy.grpc.server.paging.PageInfo;
import org.openjproxy.grpc.server.paging.PaginationDetector;
import org.openjproxy.grpc.server.resultset.ResultSetWrapper;
import org.openjproxy.grpc.server.statement.StatementFactory;
import org.openjproxy.xa.pool.XATransactionRegistry;
import org.openjproxy.xa.pool.spi.XAConnectionPoolProvider;
Expand All @@ -47,6 +55,7 @@
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
Expand All @@ -72,6 +81,9 @@ public class StatementServiceImpl extends StatementServiceGrpc.StatementServiceI
// SQL Enhancer Engine for query optimization
private final org.openjproxy.grpc.server.sql.SqlEnhancerEngine sqlEnhancerEngine;

// Next-page prefetch cache for paginated queries (disabled by default)
private final NextPagePrefetchCache nextPagePrefetchCache;

// Multinode XA coordinator for distributing transaction limits
private static final MultinodeXaCoordinator xaCoordinator = new MultinodeXaCoordinator();

Expand All @@ -90,6 +102,13 @@ public StatementServiceImpl(SessionManager sessionManager, CircuitBreakerRegistr
// Server configuration for creating segregation managers
this.sqlEnhancerEngine = new org.openjproxy.grpc.server.sql.SqlEnhancerEngine(
serverConfiguration.isSqlEnhancerEnabled());
// Next-page prefetch cache (disabled by default)
this.nextPagePrefetchCache = new NextPagePrefetchCache(
serverConfiguration.isNextPageCacheEnabled(),
serverConfiguration.getNextPageCacheMaxEntries(),
serverConfiguration.getNextPageCacheTtlSeconds(),
serverConfiguration.getNextPageCachePrefetchWaitTimeoutMs(),
serverConfiguration.getNextPageCacheCleanupIntervalSeconds());
initializeXAPoolProvider();

// Create SQL statement metrics from the registered OpenTelemetry instance (if available)
Expand Down Expand Up @@ -287,16 +306,97 @@ private void executeQueryInternal(StatementRequest request, StreamObserver<OpRes
}

List<Parameter> params = ProtoConverter.fromProtoList(request.getParametersList());

// ---- Next-page prefetch cache ----
if (nextPagePrefetchCache.isEnabled()) {
String connHash = dto.getSession().getConnHash();
Optional<CachedPage> cached = nextPagePrefetchCache.getIfReady(connHash, sql);
if (cached.isPresent()) {
CachedPage page = cached.get();
// Start prefetch for the page after this one before returning the cached result
startNextPagePrefetch(sql, params, connHash);
streamCachedPage(page, dto.getSession(), responseObserver);
return;
}
}
// ---- End next-page prefetch cache check ----

String resultSetUUID;
if (CollectionUtils.isNotEmpty(params)) {
PreparedStatement ps = StatementFactory.createPreparedStatement(sessionManager, dto, sql, params, request);
String resultSetUUID = this.sessionManager.registerResultSet(dto.getSession(), ps.executeQuery());
handleResultSet(actionContext, dto.getSession(), resultSetUUID, responseObserver);
resultSetUUID = this.sessionManager.registerResultSet(dto.getSession(), ps.executeQuery());
} else {
Statement stmt = StatementFactory.createStatement(sessionManager, dto.getConnection(), request);
String resultSetUUID = this.sessionManager.registerResultSet(dto.getSession(),
stmt.executeQuery(sql));
handleResultSet(actionContext, dto.getSession(), resultSetUUID, responseObserver);
resultSetUUID = this.sessionManager.registerResultSet(dto.getSession(), stmt.executeQuery(sql));
}
// Start prefetch for the next page while the current page is being streamed
startNextPagePrefetch(sql, params, dto.getSession().getConnHash());
handleResultSet(actionContext, dto.getSession(), resultSetUUID, responseObserver);
}

/**
* Starts an asynchronous prefetch of the next page for the given SQL, if the feature
* is enabled and the SQL contains a recognised pagination clause.
*
* @param sql the current paginated SQL
* @param params the query parameters (used as-is for the next-page query)
* @param connHash the connection hash used to look up the DataSource
*/
private void startNextPagePrefetch(String sql, List<Parameter> params, String connHash) {
if (!nextPagePrefetchCache.isEnabled()) {
return;
}
Optional<PageInfo> pageInfo = PaginationDetector.detect(sql);
if (pageInfo.isEmpty()) {
return;
}
String nextPageSql = PaginationDetector.buildNextPageSql(sql, pageInfo.get());
if (nextPageSql == null) {
return;
}
DataSource dataSource = datasourceMap.get(connHash);
if (dataSource == null) {
log.debug("No DataSource found for prefetch, connHash={}", connHash);
return;
}
nextPagePrefetchCache.prefetchAsync(dataSource, connHash, nextPageSql, params);
}

/**
* Streams the rows held in a {@link CachedPage} directly to the gRPC response observer,
* using the same chunking strategy as {@link ResultSetHelper#handleResultSet}.
*
* @param page the cached page to stream
* @param session the current session info (embedded in each response message)
* @param responseObserver the gRPC observer to stream results into
*/
private static void streamCachedPage(CachedPage page, SessionInfo session,
StreamObserver<OpResult> responseObserver) {
OpQueryResult.OpQueryResultBuilder queryResultBuilder = OpQueryResult.builder();
queryResultBuilder.labels(page.getColumnLabels());

List<Object[]> batch = new ArrayList<>();
int row = 0;
boolean justSent = false;

for (Object[] rowValues : page.getRows()) {
justSent = false;
row++;
batch.add(rowValues);
if (row % CommonConstants.ROWS_PER_RESULT_SET_DATA_BLOCK == 0) {
justSent = true;
responseObserver.onNext(ResultSetWrapper.wrapResults(session, batch,
queryResultBuilder, null, ""));
queryResultBuilder = OpQueryResult.builder();
batch = new ArrayList<>();
}
}

if (!justSent) {
responseObserver.onNext(ResultSetWrapper.wrapResults(session, batch,
queryResultBuilder, null, ""));
}
responseObserver.onCompleted();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.openjproxy.grpc.server.paging;

import java.util.Collections;
import java.util.List;

/**
* Holds a single cached page of query results.
*
* <p>Instances are immutable once created. The {@link #isExpired(long)} method
* can be used to check whether the entry has exceeded its time-to-live.</p>
*/
public class CachedPage {

private final List<String> columnLabels;
private final List<Object[]> rows;
private final long createdAtMs;

/**
* @param columnLabels ordered list of column names from the result set metadata
* @param rows result rows; each element is an array of column values
*/
public CachedPage(List<String> columnLabels, List<Object[]> rows) {
this.columnLabels = Collections.unmodifiableList(columnLabels);
this.rows = Collections.unmodifiableList(rows);
this.createdAtMs = System.currentTimeMillis();
}

/**
* Returns the ordered list of column labels for this result set.
*/
public List<String> getColumnLabels() {
return columnLabels;
}

/**
* Returns the cached rows. Each element is an array of column values
* in the same order as {@link #getColumnLabels()}.
*/
public List<Object[]> getRows() {
return rows;
}

/**
* Returns the epoch milliseconds at which this entry was created.
*/
public long getCreatedAtMs() {
return createdAtMs;
}

/**
* Returns {@code true} if the entry is older than {@code ttlMs} milliseconds.
*
* @param ttlMs time-to-live in milliseconds
*/
public boolean isExpired(long ttlMs) {
return System.currentTimeMillis() - createdAtMs > ttlMs;
}
}
Loading