Skip to content

Add cluster activation status metric and emit to v1/jmx #673

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions gateway-ha/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,4 @@ clusterStatsConfiguration:
# This can be adjusted based on the coordinator state
monitor:
taskDelaySeconds: 10
clusterMetricsRegistryRefreshPeriod: 30s
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.inject.Scopes;
import io.airlift.log.Logger;
import io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor;
import io.trino.gateway.ha.clustermonitor.ClusterMetricsStatsExporter;
import io.trino.gateway.ha.clustermonitor.ForMonitor;
import io.trino.gateway.ha.config.HaGatewayConfiguration;
import io.trino.gateway.ha.handler.ProxyHandlerStats;
Expand Down Expand Up @@ -146,6 +147,7 @@ public void configure(Binder binder)
binder.bind(ProxyHandlerStats.class).in(Scopes.SINGLETON);
newExporter(binder).export(ProxyHandlerStats.class).withGeneratedName();
binder.bind(RoutingRulesManager.class);
binder.bind(ClusterMetricsStatsExporter.class).in(Scopes.SINGLETON);
}

private static void addManagedApps(HaGatewayConfiguration configuration, Binder binder)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

import io.trino.gateway.ha.router.GatewayBackendManager;
import org.weakref.jmx.Managed;

import static java.util.Objects.requireNonNull;

/**
 * JMX-managed view of a single cluster's activation state.
 *
 * <p>The status is not cached: every read of the managed attribute asks the
 * {@link GatewayBackendManager} for the backend registered under this
 * instance's cluster name.
 */
public class ClusterMetricsStats
{
    private final String clusterName;
    private final GatewayBackendManager gatewayBackendManager;

    /**
     * @param clusterName name used to look the backend up; must not be null
     * @param gatewayBackendManager source of backend state; must not be null
     * @throws NullPointerException if either argument is null
     */
    public ClusterMetricsStats(String clusterName, GatewayBackendManager gatewayBackendManager)
    {
        this.clusterName = requireNonNull(clusterName, "clusterName is null");
        this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null");
    }

    /**
     * @return the cluster name this instance reports on
     */
    public String getClusterName()
    {
        return clusterName;
    }

    /**
     * Reports the activation status of the cluster as a numeric gauge.
     *
     * @return {@code 1} when the backend is active, {@code 0} when it is
     * inactive, and {@code -1} when no backend with this name is found
     */
    @Managed
    public int getActivationStatus()
    {
        return gatewayBackendManager.getBackendByName(clusterName)
                .map(backend -> {
                    if (backend.isActive()) {
                        return 1;
                    }
                    return 0;
                })
                .orElse(-1);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.airlift.log.Logger;
import io.airlift.units.Duration;
import io.trino.gateway.ha.config.MonitorConfiguration;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.router.GatewayBackendManager;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import org.weakref.jmx.MBeanExporter;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
 * Keeps the set of exported {@link ClusterMetricsStats} MBeans in sync with the
 * backends returned by {@link GatewayBackendManager#getAllBackends()}.
 *
 * <p>After {@link #start()} a single background thread re-reads the backend list
 * at a fixed rate, exporting a stats object for every newly seen cluster name and
 * unexporting the stats of every cluster name that is no longer returned.
 */
@Singleton
public class ClusterMetricsStatsExporter
        implements AutoCloseable
{
    private static final Logger log = Logger.get(ClusterMetricsStatsExporter.class);

    private final MBeanExporter exporter;
    private final GatewayBackendManager gatewayBackendManager;
    private final Duration refreshInterval;
    // MBeanExporter uses weak references, so clustersStats Map is needed to maintain strong references to metric objects to prevent garbage collection
    private final Map<String, ClusterMetricsStats> clustersStats = new HashMap<>();
    private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

    /**
     * @param gatewayBackendManager source of the current backend list; must not be null
     * @param exporter JMX exporter used to (un)register the per-cluster stats; must not be null
     * @param monitorConfiguration supplies the registry refresh period; must not be null
     * and must return a non-null period
     * @throws NullPointerException if any argument, or the configured refresh period, is null
     */
    @Inject
    public ClusterMetricsStatsExporter(GatewayBackendManager gatewayBackendManager, MBeanExporter exporter, MonitorConfiguration monitorConfiguration)
    {
        this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null");
        this.exporter = requireNonNull(exporter, "exporter is null");
        requireNonNull(monitorConfiguration, "monitorConfiguration is null");
        // Fail at construction time rather than later inside start() when the period is first used
        this.refreshInterval = requireNonNull(
                monitorConfiguration.getClusterMetricsRegistryRefreshPeriod(),
                "clusterMetricsRegistryRefreshPeriod is null");
    }

    /**
     * Schedules the periodic registry refresh. The first refresh runs immediately
     * (initial delay 0) and then repeats every {@code refreshInterval}.
     */
    @PostConstruct
    public void start()
    {
        log.debug("Running periodic metric refresh with interval of %s", refreshInterval);
        scheduledExecutor.scheduleAtFixedRate(() -> {
            try {
                updateClustersMetricRegistry();
            }
            catch (Exception e) {
                // An exception escaping a fixed-rate task suppresses all subsequent executions,
                // so log and swallow to keep the refresh loop alive
                log.error(e, "Error refreshing cluster metrics");
            }
        }, 0, refreshInterval.toMillis(), MILLISECONDS);
    }

    /**
     * Stops the background refresh by shutting the executor down immediately.
     */
    @PreDestroy
    public void stop()
    {
        scheduledExecutor.shutdownNow();
    }

    /**
     * Reconciles the exported stats with the current backend list: unexports stats
     * for cluster names that disappeared, then exports stats for new cluster names.
     */
    private synchronized void updateClustersMetricRegistry()
    {
        // Get current clusters from DB
        Set<String> currentClusters = gatewayBackendManager.getAllBackends().stream()
                .map(ProxyBackendConfiguration::getName)
                .collect(Collectors.toSet());

        // Create a copy of keys to avoid concurrent modification
        Set<String> registeredClusters = new HashSet<>(clustersStats.keySet());

        // Unregister metrics for removed clusters
        for (String registeredCluster : registeredClusters) {
            if (!currentClusters.contains(registeredCluster)) {
                try {
                    exporter.unexportWithGeneratedName(ClusterMetricsStats.class, registeredCluster);
                    log.debug("Unregistered metrics for removed cluster: %s", registeredCluster);
                    // Removed only after a successful unexport, so a failed unexport is retried on the next refresh
                    clustersStats.remove(registeredCluster);
                }
                catch (Exception e) {
                    log.error(e, "Failed to unregister metrics for cluster: %s", registeredCluster);
                }
            }
        }

        // Register metrics for added clusters
        for (String cluster : currentClusters) {
            if (!clustersStats.containsKey(cluster)) {
                registerClusterMetrics(cluster);
            }
        }
    }

    /**
     * Creates and exports a stats object for {@code clusterName} unless one is
     * already tracked. If the export fails the entry is dropped again, so a later
     * refresh will attempt the registration once more.
     */
    private synchronized void registerClusterMetrics(String clusterName)
    {
        ClusterMetricsStats stats = new ClusterMetricsStats(clusterName, gatewayBackendManager);

        if (clustersStats.putIfAbsent(clusterName, stats) == null) { // null means the stats didn't exist previously and was inserted
            try {
                exporter.exportWithGeneratedName(stats, ClusterMetricsStats.class, clusterName);
                log.debug("Registered metrics for cluster: %s", clusterName);
            }
            catch (Exception e) {
                clustersStats.remove(clusterName);
                log.error(e, "Failed to register metrics for cluster: %s", clusterName);
            }
        }
    }

    /** Package-private accessor for the backend manager passed to the constructor. */
    GatewayBackendManager getGatewayBackendManager()
    {
        return gatewayBackendManager;
    }

    /** Package-private accessor for the MBean exporter passed to the constructor. */
    MBeanExporter getExporter()
    {
        return exporter;
    }

    /**
     * Equivalent to {@link #stop()}; allows use in try-with-resources.
     */
    @Override
    public void close()
    {
        stop();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class MonitorConfiguration

private Duration queryTimeout = new Duration(10, SECONDS);

private Duration clusterMetricsRegistryRefreshPeriod = new Duration(30, SECONDS);

private boolean explicitPrepare;

private String metricsEndpoint = "/metrics";
Expand Down Expand Up @@ -133,4 +135,14 @@ public void setMetricMaximumValues(Map<String, Float> metricMaximumValues)
{
this.metricMaximumValues = metricMaximumValues;
}

/**
 * Returns the refresh period for the cluster metrics registry.
 *
 * @return the currently configured period; the field is initialized to 30 seconds
 */
public Duration getClusterMetricsRegistryRefreshPeriod()
{
return clusterMetricsRegistryRefreshPeriod;
}

/**
 * Overrides the refresh period for the cluster metrics registry.
 *
 * @param clusterMetricsRegistryRefreshPeriod the new period; stored as given, without validation
 */
public void setClusterMetricsRegistryRefreshPeriod(Duration clusterMetricsRegistryRefreshPeriod)
{
this.clusterMetricsRegistryRefreshPeriod = clusterMetricsRegistryRefreshPeriod;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.gateway.ha.clustermonitor;

import io.airlift.units.Duration;
import io.trino.gateway.ha.config.MonitorConfiguration;
import io.trino.gateway.ha.config.ProxyBackendConfiguration;
import io.trino.gateway.ha.router.GatewayBackendManager;
import org.junit.jupiter.api.Test;
import org.weakref.jmx.MBeanExporter;

import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.timeout;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Tests for {@link ClusterMetricsStatsExporter} against mocked collaborators.
 *
 * <p>The exporter refreshes on a background thread, so each verification waits
 * (bounded by {@link #VERIFICATION_TIMEOUT_MILLIS}) for the expected interaction
 * instead of sleeping for a fixed time.
 */
final class TestClusterMetricsStatsExporter
{
    // Upper bound for a single verification; the mocked refresh period is 1 second,
    // so this leaves room for several refresh cycles on a slow machine
    private static final long VERIFICATION_TIMEOUT_MILLIS = 5_000;

    @Test
    void testMetricsRegistrationForNewCluster()
    {
        try (ClusterMetricsStatsExporter statsExporter = createStatsExporter()) {
            String clusterName1 = "test-cluster1";
            ProxyBackendConfiguration cluster1 = createTestCluster(clusterName1);
            String clusterName2 = "test-cluster2";
            ProxyBackendConfiguration cluster2 = createTestCluster(clusterName2);
            when(statsExporter.getGatewayBackendManager().getAllBackends())
                    .thenReturn(List.of(cluster1)) // First return with 1 cluster
                    .thenReturn(List.of(cluster1, cluster2)); // Then return with 2 clusters to simulate addition

            // Start once; the scheduled task keeps refreshing on its own
            statsExporter.start();

            verify(statsExporter.getExporter(), timeout(VERIFICATION_TIMEOUT_MILLIS)).exportWithGeneratedName(
                    argThat(stats -> stats instanceof ClusterMetricsStats && ((ClusterMetricsStats) stats).getClusterName().equals(clusterName1)),
                    eq(ClusterMetricsStats.class), eq(clusterName1));

            // Wait for a later refresh where the second cluster is added
            verify(statsExporter.getExporter(), timeout(VERIFICATION_TIMEOUT_MILLIS)).exportWithGeneratedName(
                    argThat(stats -> stats instanceof ClusterMetricsStats && ((ClusterMetricsStats) stats).getClusterName().equals(clusterName2)),
                    eq(ClusterMetricsStats.class), eq(clusterName2));
        }
    }

    @Test
    void testMetricsUnregistrationForRemovedCluster()
    {
        try (ClusterMetricsStatsExporter statsExporter = createStatsExporter()) {
            String clusterName = "test-cluster";
            ProxyBackendConfiguration cluster = createTestCluster(clusterName);
            when(statsExporter.getGatewayBackendManager().getAllBackends())
                    .thenReturn(List.of(cluster)) // First return with cluster
                    .thenReturn(List.of()); // Then return empty list to simulate removal

            statsExporter.start();

            verify(statsExporter.getExporter(), timeout(VERIFICATION_TIMEOUT_MILLIS)).exportWithGeneratedName(
                    argThat(stats -> stats instanceof ClusterMetricsStats && ((ClusterMetricsStats) stats).getClusterName().equals(clusterName)),
                    eq(ClusterMetricsStats.class), eq(clusterName));

            // Wait for a later refresh where the cluster is removed
            verify(statsExporter.getExporter(), timeout(VERIFICATION_TIMEOUT_MILLIS))
                    .unexportWithGeneratedName(eq(ClusterMetricsStats.class), eq(clusterName));
        }
    }

    /** Builds a backend configuration carrying only the given name. */
    private static ProxyBackendConfiguration createTestCluster(String name)
    {
        ProxyBackendConfiguration cluster = new ProxyBackendConfiguration();
        cluster.setName(name);
        return cluster;
    }

    /** Creates an exporter wired to mocks, with a 1-second refresh period. */
    private ClusterMetricsStatsExporter createStatsExporter()
    {
        GatewayBackendManager gatewayBackendManager = mock(GatewayBackendManager.class);
        MBeanExporter exporter = mock(MBeanExporter.class);
        MonitorConfiguration monitorConfiguration = mock(MonitorConfiguration.class);

        when(monitorConfiguration.getClusterMetricsRegistryRefreshPeriod())
                .thenReturn(new Duration(1, TimeUnit.SECONDS));

        return new ClusterMetricsStatsExporter(
                gatewayBackendManager,
                exporter,
                monitorConfiguration);
    }
}