diff --git a/gateway-ha/config.yaml b/gateway-ha/config.yaml index cd1ccfbc4..b2c7c3dd3 100644 --- a/gateway-ha/config.yaml +++ b/gateway-ha/config.yaml @@ -19,3 +19,4 @@ clusterStatsConfiguration: # This can be adjusted based on the coordinator state monitor: taskDelaySeconds: 10 + clusterMetricsRegistryRefreshPeriod: 30s diff --git a/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java b/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java index 33302dc9a..13e0b874e 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java +++ b/gateway-ha/src/main/java/io/trino/gateway/baseapp/BaseApp.java @@ -19,6 +19,7 @@ import com.google.inject.Scopes; import io.airlift.log.Logger; import io.trino.gateway.ha.clustermonitor.ActiveClusterMonitor; +import io.trino.gateway.ha.clustermonitor.ClusterMetricsStatsExporter; import io.trino.gateway.ha.clustermonitor.ForMonitor; import io.trino.gateway.ha.config.HaGatewayConfiguration; import io.trino.gateway.ha.handler.ProxyHandlerStats; @@ -146,6 +147,7 @@ public void configure(Binder binder) binder.bind(ProxyHandlerStats.class).in(Scopes.SINGLETON); newExporter(binder).export(ProxyHandlerStats.class).withGeneratedName(); binder.bind(RoutingRulesManager.class); + binder.bind(ClusterMetricsStatsExporter.class).in(Scopes.SINGLETON); } private static void addManagedApps(HaGatewayConfiguration configuration, Binder binder) diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterMetricsStats.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterMetricsStats.java new file mode 100644 index 000000000..aecbe8aaf --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterMetricsStats.java @@ -0,0 +1,44 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.clustermonitor; + +import io.trino.gateway.ha.router.GatewayBackendManager; +import org.weakref.jmx.Managed; + +import static java.util.Objects.requireNonNull; + +public class ClusterMetricsStats +{ + private final String clusterName; + private final GatewayBackendManager gatewayBackendManager; + + public ClusterMetricsStats(String clusterName, GatewayBackendManager gatewayBackendManager) + { + this.clusterName = requireNonNull(clusterName, "clusterName is null"); + this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null"); + } + + public String getClusterName() + { + return clusterName; + } + + @Managed + public int getActivationStatus() + { + return gatewayBackendManager.getBackendByName(clusterName) + .map(cluster -> cluster.isActive() ? 1 : 0) + .orElse(-1); + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterMetricsStatsExporter.java b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterMetricsStatsExporter.java new file mode 100644 index 000000000..06db0d730 --- /dev/null +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/clustermonitor/ClusterMetricsStatsExporter.java @@ -0,0 +1,142 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.clustermonitor; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.airlift.log.Logger; +import io.airlift.units.Duration; +import io.trino.gateway.ha.config.MonitorConfiguration; +import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import io.trino.gateway.ha.router.GatewayBackendManager; +import jakarta.annotation.PostConstruct; +import jakarta.annotation.PreDestroy; +import org.weakref.jmx.MBeanExporter; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.MILLISECONDS; + +@Singleton +public class ClusterMetricsStatsExporter + implements AutoCloseable +{ + private static final Logger log = Logger.get(ClusterMetricsStatsExporter.class); + + private final MBeanExporter exporter; + private final GatewayBackendManager gatewayBackendManager; + private final Duration refreshInterval; + // MBeanExporter uses weak references, so clustersStats Map is needed to maintain strong references to metric objects to prevent garbage collection + private final Map clustersStats = new HashMap<>(); + private final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor(); + + @Inject + public ClusterMetricsStatsExporter(GatewayBackendManager gatewayBackendManager, MBeanExporter exporter, MonitorConfiguration monitorConfiguration) + { + this.gatewayBackendManager = requireNonNull(gatewayBackendManager, "gatewayBackendManager is null"); + this.exporter = requireNonNull(exporter, "exporter is null"); + this.refreshInterval = monitorConfiguration.getClusterMetricsRegistryRefreshPeriod(); + } + + @PostConstruct + public void start() + { + log.debug("Running periodic metric refresh with interval of %s", refreshInterval); + scheduledExecutor.scheduleAtFixedRate(() -> { + try { + updateClustersMetricRegistry(); + } + catch (Exception e) { + log.error(e, "Error refreshing cluster metrics"); + } + }, 0, refreshInterval.toMillis(), MILLISECONDS); + } + + @PreDestroy + public void stop() + { + scheduledExecutor.shutdownNow(); + } + + private synchronized void updateClustersMetricRegistry() + { + // Get current clusters from DB + Set currentClusters = gatewayBackendManager.getAllBackends().stream() + .map(ProxyBackendConfiguration::getName) + .collect(Collectors.toSet()); + + // Create a copy of keys to avoid concurrent modification + Set registeredClusters = new HashSet<>(clustersStats.keySet()); + + // Unregister metrics for removed clusters + for (String registeredCluster : registeredClusters) { + if (!currentClusters.contains(registeredCluster)) { + try { + exporter.unexportWithGeneratedName(ClusterMetricsStats.class, registeredCluster); + log.debug("Unregistered metrics for removed cluster: %s", registeredCluster); + clustersStats.remove(registeredCluster); + } + catch (Exception e) { + log.error(e, "Failed to unregister metrics for cluster: %s", registeredCluster); + } + } + } + + // Register metrics for added clusters + for (String cluster : currentClusters) { + if (!clustersStats.containsKey(cluster)) { + registerClusterMetrics(cluster); + } + } + } + + private synchronized void registerClusterMetrics(String clusterName) + { + ClusterMetricsStats stats = new ClusterMetricsStats(clusterName, gatewayBackendManager); + + if (clustersStats.putIfAbsent(clusterName, stats) == null) { // null means the stats didn't exist previously and was inserted + try { + exporter.exportWithGeneratedName(stats, ClusterMetricsStats.class, clusterName); + log.debug("Registered metrics for cluster: %s", clusterName); + } + catch (Exception e) { + clustersStats.remove(clusterName); + log.error(e, "Failed to register metrics for cluster: %s", clusterName); + } + } + } + + GatewayBackendManager getGatewayBackendManager() + { + return gatewayBackendManager; + } + + MBeanExporter getExporter() + { + return exporter; + } + + @Override + public void close() + { + stop(); + } +} diff --git a/gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java b/gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java index 81750a34e..480be2c57 100644 --- a/gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java +++ b/gateway-ha/src/main/java/io/trino/gateway/ha/config/MonitorConfiguration.java @@ -29,6 +29,8 @@ public class MonitorConfiguration private Duration queryTimeout = new Duration(10, SECONDS); + private Duration clusterMetricsRegistryRefreshPeriod = new Duration(30, SECONDS); + private boolean explicitPrepare; private String metricsEndpoint = "/metrics"; @@ -133,4 +135,14 @@ public void setMetricMaximumValues(Map metricMaximumValues) { this.metricMaximumValues = metricMaximumValues; } + + public Duration getClusterMetricsRegistryRefreshPeriod() + { + return clusterMetricsRegistryRefreshPeriod; + } + + public void setClusterMetricsRegistryRefreshPeriod(Duration clusterMetricsRegistryRefreshPeriod) + { + this.clusterMetricsRegistryRefreshPeriod = clusterMetricsRegistryRefreshPeriod; + } } diff --git a/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterMetricsStatsExporter.java b/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterMetricsStatsExporter.java new file mode 100644 index 000000000..133bda04b --- /dev/null +++ b/gateway-ha/src/test/java/io/trino/gateway/ha/clustermonitor/TestClusterMetricsStatsExporter.java @@ -0,0 +1,110 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.gateway.ha.clustermonitor; + +import io.airlift.units.Duration; +import io.trino.gateway.ha.config.MonitorConfiguration; +import io.trino.gateway.ha.config.ProxyBackendConfiguration; +import io.trino.gateway.ha.router.GatewayBackendManager; +import org.junit.jupiter.api.Test; +import org.weakref.jmx.MBeanExporter; + +import java.util.List; +import java.util.concurrent.TimeUnit; + +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +final class TestClusterMetricsStatsExporter +{ + @Test + void testMetricsRegistrationForNewCluster() + throws InterruptedException + { + try (ClusterMetricsStatsExporter statsExporter = createStatsExporter()) { + String clusterName1 = "test-cluster1"; + ProxyBackendConfiguration cluster1 = createTestCluster(clusterName1); + String clusterName2 = "test-cluster2"; + ProxyBackendConfiguration cluster2 = createTestCluster(clusterName2); + when(statsExporter.getGatewayBackendManager().getAllBackends()) + .thenReturn(List.of(cluster1)) // First return with 1 cluster + .thenReturn(List.of(cluster1, cluster2)); // Then return with 2 clusters to simulate addition + + statsExporter.start(); + Thread.sleep(2000); + + verify(statsExporter.getExporter()).exportWithGeneratedName( + argThat(stats -> stats instanceof ClusterMetricsStats && ((ClusterMetricsStats) stats).getClusterName().equals(clusterName1)), + eq(ClusterMetricsStats.class), eq(clusterName1)); + + // Wait for next update where cluster is added + statsExporter.start(); + Thread.sleep(2000); + + verify(statsExporter.getExporter()).exportWithGeneratedName( + argThat(stats -> stats instanceof ClusterMetricsStats && ((ClusterMetricsStats) stats).getClusterName().equals(clusterName2)), + eq(ClusterMetricsStats.class), eq(clusterName2)); + } + } + + @Test + public void testMetricsUnregistrationForRemovedCluster() + throws InterruptedException + { + try (ClusterMetricsStatsExporter statsExporter = createStatsExporter()) { + String clusterName = "test-cluster"; + ProxyBackendConfiguration cluster = createTestCluster(clusterName); + when(statsExporter.getGatewayBackendManager().getAllBackends()) + .thenReturn(List.of(cluster)) // First return with cluster + .thenReturn(List.of()); // Then return empty list to simulate removal + + statsExporter.start(); + Thread.sleep(2000); + + verify(statsExporter.getExporter()).exportWithGeneratedName( + argThat(stats -> stats instanceof ClusterMetricsStats && ((ClusterMetricsStats) stats).getClusterName().equals(clusterName)), + eq(ClusterMetricsStats.class), eq(clusterName)); + + // Wait for next update where cluster is removed + Thread.sleep(2000); + + verify(statsExporter.getExporter()).unexportWithGeneratedName(eq(ClusterMetricsStats.class), eq(clusterName)); + } + } + + private static ProxyBackendConfiguration createTestCluster(String name) + { + ProxyBackendConfiguration cluster = new ProxyBackendConfiguration(); + cluster.setName(name); + return cluster; + } + + private ClusterMetricsStatsExporter createStatsExporter() + { + GatewayBackendManager gatewayBackendManager = mock(GatewayBackendManager.class); + MBeanExporter exporter = mock(MBeanExporter.class); + MonitorConfiguration monitorConfiguration = mock(MonitorConfiguration.class); + + when(monitorConfiguration.getClusterMetricsRegistryRefreshPeriod()) + .thenReturn(new Duration(1, TimeUnit.SECONDS)); + + return new ClusterMetricsStatsExporter( + gatewayBackendManager, + exporter, + monitorConfiguration); + } +}