-
Notifications
You must be signed in to change notification settings - Fork 83
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add a cache for GatewayBackend to HaGatewayManager #501
base: main
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -13,7 +13,8 @@ | |||||||||||||||||||||||||||||||||||||||||||||||
*/ | ||||||||||||||||||||||||||||||||||||||||||||||||
package io.trino.gateway.ha.router; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.common.collect.ImmutableList; | ||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.common.base.Supplier; | ||||||||||||||||||||||||||||||||||||||||||||||||
import com.google.common.base.Suppliers; | ||||||||||||||||||||||||||||||||||||||||||||||||
import io.trino.gateway.ha.config.ProxyBackendConfiguration; | ||||||||||||||||||||||||||||||||||||||||||||||||
import io.trino.gateway.ha.persistence.dao.GatewayBackend; | ||||||||||||||||||||||||||||||||||||||||||||||||
import io.trino.gateway.ha.persistence.dao.GatewayBackendDao; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -26,31 +27,32 @@ | |||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
import static com.google.common.collect.ImmutableList.toImmutableList; | ||||||||||||||||||||||||||||||||||||||||||||||||
import static java.util.Objects.requireNonNull; | ||||||||||||||||||||||||||||||||||||||||||||||||
import static java.util.concurrent.TimeUnit.MILLISECONDS; | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
public class HaGatewayManager | ||||||||||||||||||||||||||||||||||||||||||||||||
implements GatewayBackendManager | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
private final GatewayBackendDao dao; | ||||||||||||||||||||||||||||||||||||||||||||||||
private final Supplier<List<GatewayBackend>> gatewayBackendSupplier; | ||||||||||||||||||||||||||||||||||||||||||||||||
private final AtomicReference<List<GatewayBackend>> cache = new AtomicReference<>(); | ||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should use something like a Guava/Caffeine cache here that has more proper cache management including cache expiration times. Remember that the same database will be shared by multiple GW instances. This code seems to assume that it can keep the cache in sync by simply invalidating the cache when update operations are made, but updates may be made out-of-band by other GW instances. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @xkrogen |
||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
public HaGatewayManager(Jdbi jdbi) | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
dao = requireNonNull(jdbi, "jdbi is null").onDemand(GatewayBackendDao.class); | ||||||||||||||||||||||||||||||||||||||||||||||||
cache.set(ImmutableList.of()); | ||||||||||||||||||||||||||||||||||||||||||||||||
fetchAllBackendsToCache(); | ||||||||||||||||||||||||||||||||||||||||||||||||
gatewayBackendSupplier = Suppliers.memoizeWithExpiration(dao::findAll, 500, MILLISECONDS); | ||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should probably let the cache TTL be configurable since there is a perf vs staleness tradeoff here |
||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||
public List<ProxyBackendConfiguration> getAllBackends() | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
List<GatewayBackend> proxyBackendList = cache.get(); | ||||||||||||||||||||||||||||||||||||||||||||||||
List<GatewayBackend> proxyBackendList = fetchAllBackends(); | ||||||||||||||||||||||||||||||||||||||||||||||||
return upcast(proxyBackendList); | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||
public List<ProxyBackendConfiguration> getAllActiveBackends() | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
List<GatewayBackend> proxyBackendList = cache.get().stream() | ||||||||||||||||||||||||||||||||||||||||||||||||
List<GatewayBackend> proxyBackendList = fetchAllBackends().stream() | ||||||||||||||||||||||||||||||||||||||||||||||||
.filter(GatewayBackend::active) | ||||||||||||||||||||||||||||||||||||||||||||||||
.collect(toImmutableList()); | ||||||||||||||||||||||||||||||||||||||||||||||||
return upcast(proxyBackendList); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -65,7 +67,7 @@ public List<ProxyBackendConfiguration> getActiveAdhocBackends() | |||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||
public List<ProxyBackendConfiguration> getActiveBackends(String routingGroup) | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
List<GatewayBackend> proxyBackendList = cache.get().stream() | ||||||||||||||||||||||||||||||||||||||||||||||||
List<GatewayBackend> proxyBackendList = fetchAllBackends().stream() | ||||||||||||||||||||||||||||||||||||||||||||||||
.filter(GatewayBackend::active) | ||||||||||||||||||||||||||||||||||||||||||||||||
.filter(backend -> backend.routingGroup().equals(routingGroup)) | ||||||||||||||||||||||||||||||||||||||||||||||||
.collect(toImmutableList()); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -75,7 +77,7 @@ public List<ProxyBackendConfiguration> getActiveBackends(String routingGroup) | |||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||
public Optional<ProxyBackendConfiguration> getBackendByName(String name) | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
List<GatewayBackend> proxyBackendList = cache.get().stream() | ||||||||||||||||||||||||||||||||||||||||||||||||
List<GatewayBackend> proxyBackendList = fetchAllBackends().stream() | ||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So with this change, instead of letting the SQL backend do the filtering (by name / active status / routing group), we always fetch the full set of backends, and do the filtering in-process. I think this is fine. We expect the number of backends to be small, no more than in the tens, so it shouldn't be a substantial perf hit. But let's call it out clearly in the PR description. This also means we can remove all the (newly) unused methods in trino-gateway/gateway-ha/src/main/java/io/trino/gateway/ha/persistence/dao/GatewayBackendDao.java Lines 26 to 48 in d9998b1
|
||||||||||||||||||||||||||||||||||||||||||||||||
.filter(backend -> backend.name().equals(name)) | ||||||||||||||||||||||||||||||||||||||||||||||||
.collect(toImmutableList()); | ||||||||||||||||||||||||||||||||||||||||||||||||
return upcast(proxyBackendList).stream().findAny(); | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -85,21 +87,18 @@ public Optional<ProxyBackendConfiguration> getBackendByName(String name) | |||||||||||||||||||||||||||||||||||||||||||||||
public void deactivateBackend(String backendName) | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
dao.deactivate(backendName); | ||||||||||||||||||||||||||||||||||||||||||||||||
fetchAllBackendsToCache(); | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||
public void activateBackend(String backendName) | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
dao.activate(backendName); | ||||||||||||||||||||||||||||||||||||||||||||||||
fetchAllBackendsToCache(); | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
@Override | ||||||||||||||||||||||||||||||||||||||||||||||||
public ProxyBackendConfiguration addBackend(ProxyBackendConfiguration backend) | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
dao.create(backend.getName(), backend.getRoutingGroup(), backend.getProxyTo(), backend.getExternalUrl(), backend.isActive()); | ||||||||||||||||||||||||||||||||||||||||||||||||
fetchAllBackendsToCache(); | ||||||||||||||||||||||||||||||||||||||||||||||||
return backend; | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -113,14 +112,12 @@ public ProxyBackendConfiguration updateBackend(ProxyBackendConfiguration backend | |||||||||||||||||||||||||||||||||||||||||||||||
else { | ||||||||||||||||||||||||||||||||||||||||||||||||
dao.update(backend.getName(), backend.getRoutingGroup(), backend.getProxyTo(), backend.getExternalUrl(), backend.isActive()); | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
fetchAllBackendsToCache(); | ||||||||||||||||||||||||||||||||||||||||||||||||
return backend; | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
public void deleteBackend(String name) | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
dao.deleteByName(name); | ||||||||||||||||||||||||||||||||||||||||||||||||
fetchAllBackendsToCache(); | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
private static List<ProxyBackendConfiguration> upcast(List<GatewayBackend> gatewayBackendList) | ||||||||||||||||||||||||||||||||||||||||||||||||
|
@@ -138,8 +135,15 @@ private static List<ProxyBackendConfiguration> upcast(List<GatewayBackend> gatew | |||||||||||||||||||||||||||||||||||||||||||||||
return proxyBackendConfigurations; | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||
private void fetchAllBackendsToCache() | ||||||||||||||||||||||||||||||||||||||||||||||||
private List<GatewayBackend> fetchAllBackends() | ||||||||||||||||||||||||||||||||||||||||||||||||
{ | ||||||||||||||||||||||||||||||||||||||||||||||||
cache.set(dao.findAll()); | ||||||||||||||||||||||||||||||||||||||||||||||||
try { | ||||||||||||||||||||||||||||||||||||||||||||||||
List<GatewayBackend> backends = gatewayBackendSupplier.get(); | ||||||||||||||||||||||||||||||||||||||||||||||||
cache.set(backends); | ||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Trying to understand the logic here since we are kind of layering two caches on top of each other -- the memoized supplier and the
This makes sense to me, but it's a bit confusing to track how these two interact. Why not just use a Guava Cache? It gives you much more configurability/flexibility so we can have all of this logic in a single place (using private static final Logger LOG = Logger.get(HaGatewayManager.class);
private static final Object ALL_BACKEND_CACHE_KEY = new Object();
private final GatewayBackendDao dao;
private final LoadingCache<Object, List<GatewayBackend>> backendCache;
private final CounterStat backendLookupSuccesses = new CounterStat();
private final CounterStat backendLookupFailures = new CounterStat();
public HaGatewayManager(Jdbi jdbi)
{
dao = requireNonNull(jdbi, "jdbi is null").onDemand(GatewayBackendDao.class);
backendCache = CacheBuilder
.newBuilder()
.initialCapacity(1)
.refreshAfterWrite(Duration.ofSeconds(1))
.build(CacheLoader.asyncReloading(
CacheLoader.from(this::loadAllBackends),
MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor())));
}
private List<GatewayBackend> loadAllBackends()
{
try {
var backends = dao.findAll();
backendLookupSuccesses.update(1);
return backends;
} catch (Exception e) {
backendLookupFailures.update(1);
LOG.warn(e, "Failed to load backends");
throw e;
}
}
// probably should be named "getAllBackends()" since it may or may not actually do any fetch
private List<GatewayBackend> fetchAllBackends()
{
return backendCache.getUnchecked(ALL_BACKEND_CACHE_KEY);
} |
||||||||||||||||||||||||||||||||||||||||||||||||
return backends; | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
catch (Exception e) { | ||||||||||||||||||||||||||||||||||||||||||||||||
return cache.get(); | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -14,6 +14,7 @@ | |
package io.trino.gateway.ha.router; | ||
|
||
import com.google.common.base.Strings; | ||
import io.airlift.log.Logger; | ||
import io.trino.gateway.ha.domain.TableData; | ||
import io.trino.gateway.ha.domain.request.QueryHistoryRequest; | ||
import io.trino.gateway.ha.domain.response.DistributionResponse; | ||
|
@@ -36,6 +37,7 @@ | |
public class HaQueryHistoryManager | ||
implements QueryHistoryManager | ||
{ | ||
private final Logger logger = Logger.get(HaQueryHistoryManager.class); | ||
private final QueryHistoryDao dao; | ||
|
||
public HaQueryHistoryManager(Jdbi jdbi) | ||
|
@@ -51,13 +53,18 @@ public void submitQueryDetail(QueryDetail queryDetail) | |
return; | ||
} | ||
|
||
dao.insertHistory( | ||
queryDetail.getQueryId(), | ||
queryDetail.getQueryText(), | ||
queryDetail.getBackendUrl(), | ||
queryDetail.getUser(), | ||
queryDetail.getSource(), | ||
queryDetail.getCaptureTime()); | ||
try { | ||
dao.insertHistory( | ||
queryDetail.getQueryId(), | ||
queryDetail.getQueryText(), | ||
queryDetail.getBackendUrl(), | ||
queryDetail.getUser(), | ||
queryDetail.getSource(), | ||
queryDetail.getCaptureTime()); | ||
} | ||
catch (Exception e) { | ||
logger.error(e); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let's provide some additional context in the error message. We also need to emit some metrics based on this. We shouldn't just silently fail -- this can mask errors. When this happens, we should have metics that alerting can be set up on. |
||
} | ||
} | ||
|
||
@Override | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let's add some testing for this as well. We want to ensure that, even if the DB goes down, we can still submit new queries.
We can add a new
TestGatewayHaDataStoreFailure
where we set up a gateway, populate some backends, submit a query to populate the cache, then simulate a database failure. After this, we should try to submit new queries, and ensure everything works as expected.