Skip to content

Commit e52be50

Browse files
committed
add optional db connection pool support
1 parent 4c6bf99 commit e52be50

File tree

3 files changed

+82
-26
lines changed

3 files changed

+82
-26
lines changed

gateway-ha/src/main/java/io/trino/gateway/ha/config/DataStoreConfiguration.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ public class DataStoreConfiguration
2323
private boolean runMigrationsEnabled = true;
2424
private Integer maxPoolSize;
2525

26-
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled, Integer maxPoolSize) {
26+
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled, Integer maxPoolSize)
27+
{
2728
this.jdbcUrl = jdbcUrl;
2829
this.user = user;
2930
this.password = password;
@@ -33,15 +34,18 @@ public DataStoreConfiguration(String jdbcUrl, String user, String password, Stri
3334
this.maxPoolSize = maxPoolSize;
3435
}
3536

36-
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled) {
37+
public DataStoreConfiguration(String jdbcUrl, String user, String password, String driver, Integer queryHistoryHoursRetention, boolean runMigrationsEnabled)
38+
{
3739
this(jdbcUrl, user, password, driver, queryHistoryHoursRetention, runMigrationsEnabled, null);
3840
}
3941

40-
public Integer getMaxPoolSize() {
42+
public Integer getMaxPoolSize()
43+
{
4144
return this.maxPoolSize;
4245
}
4346

44-
public void setMaxPoolSize(Integer maxPoolSize) {
47+
public void setMaxPoolSize(Integer maxPoolSize)
48+
{
4549
this.maxPoolSize = maxPoolSize;
4650
}
4751

gateway-ha/src/main/java/io/trino/gateway/ha/persistence/JdbcConnectionManager.java

Lines changed: 53 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import java.net.URI;
2727
import java.net.URISyntaxException;
2828
import java.nio.file.Path;
29+
import java.util.Map;
30+
import java.util.concurrent.ConcurrentHashMap;
2931
import java.util.concurrent.Executors;
3032
import java.util.concurrent.ScheduledExecutorService;
3133
import java.util.concurrent.TimeUnit;
@@ -41,6 +43,8 @@ public class JdbcConnectionManager
4143
private final ScheduledExecutorService executorService =
4244
Executors.newSingleThreadScheduledExecutor();
4345

46+
private final Map<String, HikariDataSource> pools = new ConcurrentHashMap<>();
47+
4448
public JdbcConnectionManager(Jdbi jdbi, DataStoreConfiguration configuration)
4549
{
4650
this.jdbi = requireNonNull(jdbi, "jdbi is null")
@@ -61,21 +65,18 @@ public Jdbi getJdbi(@Nullable String routingGroupDatabase)
6165
return jdbi;
6266
}
6367

64-
if (configuration.getMaxPoolSize() != null && configuration.getMaxPoolSize() > 0) {
65-
HikariConfig hikariConfig = new HikariConfig();
66-
hikariConfig.setJdbcUrl(buildJdbcUrl(routingGroupDatabase));
67-
hikariConfig.setUsername(configuration.getUser());
68-
hikariConfig.setPassword(configuration.getPassword());
69-
if (configuration.getDriver() != null) {
70-
hikariConfig.setDriverClassName(configuration.getDriver());
71-
}
72-
hikariConfig.setMaximumPoolSize(configuration.getMaxPoolSize());
73-
return Jdbi.create(new HikariDataSource(hikariConfig))
68+
Integer maxPoolSize = configuration.getMaxPoolSize();
69+
if (maxPoolSize != null && maxPoolSize > 0) {
70+
HikariDataSource ds = getOrCreateDataSource(routingGroupDatabase, maxPoolSize);
71+
return Jdbi.create(ds)
7472
.installPlugin(new SqlObjectPlugin())
7573
.registerRowMapper(new RecordAndAnnotatedConstructorMapper());
7674
}
7775

78-
return Jdbi.create(buildJdbcUrl(routingGroupDatabase), configuration.getUser(), configuration.getPassword())
76+
return Jdbi.create(
77+
buildJdbcUrl(routingGroupDatabase),
78+
configuration.getUser(),
79+
configuration.getPassword())
7980
.installPlugin(new SqlObjectPlugin())
8081
.registerRowMapper(new RecordAndAnnotatedConstructorMapper());
8182
}
@@ -123,11 +124,51 @@ private void startCleanUps()
123124
executorService.scheduleWithFixedDelay(
124125
() -> {
125126
log.info("Performing query history cleanup task");
126-
long created = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(this.configuration.getQueryHistoryHoursRetention());
127+
long created = System.currentTimeMillis()
128+
- TimeUnit.HOURS.toMillis(this.configuration.getQueryHistoryHoursRetention());
127129
jdbi.onDemand(QueryHistoryDao.class).deleteOldHistory(created);
128130
},
129131
1,
130132
120,
131133
TimeUnit.MINUTES);
132134
}
135+
136+
private HikariDataSource getOrCreateDataSource(String routingGroupDatabase, int maxPoolSize)
137+
{
138+
return pools.compute(routingGroupDatabase, (key, existing) -> {
139+
if (existing != null && !existing.isClosed()) {
140+
return existing;
141+
}
142+
143+
HikariConfig cfg = new HikariConfig();
144+
cfg.setJdbcUrl(buildJdbcUrl(key));
145+
cfg.setUsername(configuration.getUser());
146+
cfg.setPassword(configuration.getPassword());
147+
if (configuration.getDriver() != null) {
148+
cfg.setDriverClassName(configuration.getDriver());
149+
}
150+
cfg.setMaximumPoolSize(maxPoolSize);
151+
cfg.setPoolName("gateway-ha-" + key);
152+
153+
return new HikariDataSource(cfg);
154+
});
155+
}
156+
157+
public void close()
158+
{
159+
for (Map.Entry<String, HikariDataSource> e : pools.entrySet()) {
160+
HikariDataSource ds = e.getValue();
161+
if (ds != null && !ds.isClosed()) {
162+
try {
163+
ds.close();
164+
}
165+
catch (RuntimeException ex) {
166+
log.warn(ex, "Failed to close datasource for key: %s", e.getKey());
167+
}
168+
}
169+
}
170+
pools.clear();
171+
172+
executorService.shutdownNow();
173+
}
133174
}

gateway-ha/src/test/java/io/trino/gateway/ha/persistence/TestJdbcConnectionManagerPool.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,28 @@
2121
import java.sql.Connection;
2222
import java.util.ArrayList;
2323
import java.util.List;
24-
import java.util.concurrent.*;
24+
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
27+
import java.util.concurrent.Future;
28+
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.TimeoutException;
2530

2631
import static org.assertj.core.api.Assertions.assertThat;
2732

28-
final class TestJdbcConnectionManagerPool {
33+
final class TestJdbcConnectionManagerPool
34+
{
2935
@Test
30-
void blocksWhenExceedingMaxPoolSize() throws Exception {
36+
void blocksWhenExceedingMaxPoolSize()
37+
throws Exception
38+
{
3139
String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-pool-" + System.currentTimeMillis()).toString();
3240
String jdbcUrl = "jdbc:h2:" + dbPath;
3341

3442
DataStoreConfiguration cfg = new DataStoreConfiguration(
3543
jdbcUrl, "sa", "sa", "org.h2.Driver",
3644
4, true,
37-
2
38-
);
45+
2);
3946

4047
JdbcConnectionManager cm = new JdbcConnectionManager(Jdbi.create(jdbcUrl, "sa", "sa"), cfg);
4148
Jdbi jdbi = cm.getJdbi("testdb");
@@ -73,7 +80,8 @@ void blocksWhenExceedingMaxPoolSize() throws Exception {
7380
try {
7481
third.get(200, TimeUnit.MILLISECONDS);
7582
completedIn200ms = true; // if this happens when connection was not blocked, which is wrong
76-
} catch (TimeoutException expected) {
83+
}
84+
catch (TimeoutException expected) {
7785
// expected, means the request was blocked on the pool
7886
}
7987

@@ -92,9 +100,10 @@ void blocksWhenExceedingMaxPoolSize() throws Exception {
92100
}
93101
}
94102

95-
96103
@Test
97-
void doesNotBlockWhenMaxPoolSizeIsNull() throws Exception {
104+
void doesNotBlockWhenMaxPoolSizeIsNull()
105+
throws Exception
106+
{
98107
String dbPath = Path.of(System.getProperty("java.io.tmpdir"), "h2db-nopool-" + System.currentTimeMillis()).toString();
99108
String jdbcUrl = "jdbc:h2:" + dbPath;
100109

@@ -138,7 +147,8 @@ void doesNotBlockWhenMaxPoolSizeIsNull() throws Exception {
138147
try {
139148
third.get(200, TimeUnit.MILLISECONDS);
140149
completedIn200ms = true; // not blocked - expected behavior
141-
} catch (TimeoutException ignore) {
150+
}
151+
catch (TimeoutException ignore) {
142152
completedIn200ms = false; // blocked - incorrect for no-pool case
143153
}
144154

@@ -156,7 +166,8 @@ void doesNotBlockWhenMaxPoolSizeIsNull() throws Exception {
156166
// Release the first two connections
157167
hold.countDown();
158168
assertThat(third.get(3, TimeUnit.SECONDS)).isTrue();
159-
} finally {
169+
}
170+
finally {
160171
es.shutdownNow();
161172
}
162173
}

0 commit comments

Comments
 (0)