diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java index ff95d8d0dd..e1fb4b46b6 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroManagementConf.java @@ -233,6 +233,13 @@ public class AmoroManagementConf { .defaultValue(1260) .withDescription("Port that the table service thrift server is bound to."); + public static final ConfigOption TABLE_SERVICE_IMPL = + ConfigOptions.key("table-service.impl") + .stringType() + .defaultValue("default") + .withDescription( + "TableService implementation provider name or FQCN. Default is 'default'."); + public static final ConfigOption OPTIMIZING_SERVICE_THRIFT_BIND_PORT = ConfigOptions.key("thrift-server.optimizing-service.bind-port") .intType() diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java index ddbf8a1c73..c7ecf3fcdc 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/AmoroServiceContainer.java @@ -47,10 +47,10 @@ import org.apache.amoro.server.resource.OptimizerManager; import org.apache.amoro.server.scheduler.inline.InlineTableExecutors; import org.apache.amoro.server.table.DefaultTableManager; -import org.apache.amoro.server.table.DefaultTableService; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableManager; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.TableServiceLoader; import org.apache.amoro.server.terminal.TerminalManager; import org.apache.amoro.server.utils.ThriftServiceProxy; import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; @@ -167,7 +167,7 @@ public void startRestServices() throws Exception { } public void startOptimizingService() throws Exception { - tableService = new DefaultTableService(serviceConfig, catalogManager); + tableService = TableServiceLoader.load(serviceConfig, catalogManager); optimizingService = new DefaultOptimizingService(serviceConfig, catalogManager, optimizerManager, tableService); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java index 97f73894ff..9ed2adb138 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/DefaultTableService.java @@ -38,7 +38,6 @@ import org.apache.amoro.server.persistence.TableRuntimeState; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper; -import org.apache.amoro.shade.guava32.com.google.common.annotations.VisibleForTesting; import org.apache.amoro.shade.guava32.com.google.common.base.MoreObjects; import org.apache.amoro.shade.guava32.com.google.common.base.Objects; import org.apache.amoro.shade.guava32.com.google.common.collect.Lists; @@ -217,7 +216,7 @@ public TableRuntime getRuntime(Long tableId) { return tableRuntimeMap.get(tableId); } - @VisibleForTesting + @Override public void setRuntime(DefaultTableRuntime tableRuntime) { checkStarted(); tableRuntimeMap.put(tableRuntime.getTableIdentifier().getId(), tableRuntime); @@ -245,8 +244,8 @@ public void dispose() { } } - @VisibleForTesting - void exploreTableRuntimes() { + @Override + public void exploreTableRuntimes() { if (!initialized.isDone()) { throw new IllegalStateException("TableService is not initialized"); } @@ -289,7 +288,7 @@ void exploreTableRuntimes() { LOG.info("Syncing external catalogs took {} ms.", end - start); } - @VisibleForTesting + @Override public void exploreExternalCatalog(ExternalCatalog externalCatalog) { final List>> tableIdentifiersFutures = Lists.newArrayList(); @@ -508,7 +507,7 @@ private void revertTableRuntimeAdded( } } - @VisibleForTesting + @Override public void disposeTable(ServerTableIdentifier tableIdentifier) { TableRuntime existedTableRuntime = tableRuntimeMap.get(tableIdentifier.getId()); try { diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/MasterSlaveTableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/MasterSlaveTableService.java new file mode 100644 index 0000000000..bc27c52fd5 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/MasterSlaveTableService.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import org.apache.amoro.AmoroTable; +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.Configurations; +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.server.catalog.CatalogManager; +import org.apache.amoro.server.catalog.ExternalCatalog; +import org.apache.amoro.server.catalog.InternalCatalog; +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.persistence.PersistentBase; + +public class MasterSlaveTableService extends PersistentBase implements TableService { + private final Configurations serverConfiguration; + private final CatalogManager catalogManager; + + public MasterSlaveTableService(Configurations configuration, CatalogManager catalogManager) { + this.serverConfiguration = configuration; + this.catalogManager = catalogManager; + } + + @Override + public void initialize() {} + + @Override + public void dispose() {} + + @Override + public void onTableCreated(InternalCatalog catalog, ServerTableIdentifier identifier) {} + + @Override + public void onTableDropped(InternalCatalog catalog, ServerTableIdentifier identifier) {} + + @Override + public TableRuntime getRuntime(Long tableId) { + return null; + } + + @Override + public AmoroTable loadTable(ServerTableIdentifier identifier) { + return null; + } + + @Override + public void exploreTableRuntimes() {} + + @Override + public void exploreExternalCatalog(ExternalCatalog externalCatalog) {} + + @Override + public void setRuntime(DefaultTableRuntime tableRuntime) {} + + @Override + public void disposeTable(ServerTableIdentifier tableIdentifier) {} + + @Override + public void addHandlerChain(RuntimeHandlerChain handler) {} + + @Override + public void handleTableChanged( + DefaultTableRuntime tableRuntime, OptimizingStatus originalStatus) {} + + @Override + public void handleTableChanged( + DefaultTableRuntime tableRuntime, TableConfiguration originalConfig) {} +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java index ae711639f5..617791fc73 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableService.java @@ -21,6 +21,7 @@ import org.apache.amoro.AmoroTable; import org.apache.amoro.ServerTableIdentifier; import org.apache.amoro.TableRuntime; +import org.apache.amoro.server.catalog.ExternalCatalog; import org.apache.amoro.server.catalog.InternalCatalog; public interface TableService extends TableRuntimeHandler { @@ -46,4 +47,16 @@ default boolean contains(Long tableId) { * @return managed table. */ AmoroTable loadTable(ServerTableIdentifier identifier); + + /** Explore and synchronize table runtimes from catalogs. Intended for periodic sync and tests. */ + void exploreTableRuntimes(); + + /** Explore and synchronize a specific external catalog. */ + void exploreExternalCatalog(ExternalCatalog externalCatalog); + + /** Set or replace a runtime for testing or recovery scenarios. */ + void setRuntime(DefaultTableRuntime tableRuntime); + + /** Dispose a managed table and its runtime. */ + void disposeTable(ServerTableIdentifier tableIdentifier); } diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableServiceLoader.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableServiceLoader.java new file mode 100644 index 0000000000..2907937d2e --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableServiceLoader.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import org.apache.amoro.config.Configurations; +import org.apache.amoro.server.AmoroManagementConf; +import org.apache.amoro.server.catalog.CatalogManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.lang.reflect.Constructor; +import java.util.ServiceLoader; + +public final class TableServiceLoader { + + private static final Logger LOG = LoggerFactory.getLogger(TableServiceLoader.class); + + private TableServiceLoader() {} + + public static TableService load(Configurations conf, CatalogManager catalogManager) { + String impl = conf.getString(AmoroManagementConf.TABLE_SERVICE_IMPL); + + // 1) Try named providers via ServiceLoader + ServiceLoader loader = ServiceLoader.load(TableServiceProvider.class); + for (TableServiceProvider provider : loader) { + try { + if (provider.name().equalsIgnoreCase(impl)) { + LOG.info("Loading TableService from provider name: {} -> {}", impl, provider.getClass()); + return provider.create(conf, catalogManager); + } + } catch (Throwable t) { + LOG.warn("Failed to create TableService from provider {}", provider.getClass(), t); + } + } + + // 2) Try FQCN + try { + Class clazz = Class.forName(impl); + if (!TableService.class.isAssignableFrom(clazz)) { + LOG.warn("Configured class {} does not implement TableService, fallback to default.", impl); + } else { + try { + Constructor constructor = + clazz.getConstructor(Configurations.class, CatalogManager.class); + LOG.info("Loading TableService from class: {}", impl); + return (TableService) constructor.newInstance(conf, catalogManager); + } catch (NoSuchMethodException nsme) { + LOG.warn( + "No (Configurations, CatalogManager) constructor for {}, fallback to default.", impl); + } + } + } catch (ClassNotFoundException cnfe) { + LOG.info("Configured TableService impl not found as class: {}. Will fallback.", impl); + } catch (Throwable t) { + LOG.warn("Failed to instantiate TableService impl: {}. Will fallback.", impl, t); + } + + // 3) Fallback to 'default' provider + for (TableServiceProvider provider : loader) { + if ("default".equalsIgnoreCase(provider.name())) { + LOG.info("Falling back to default TableService provider: {}", provider.getClass()); + return provider.create(conf, catalogManager); + } + } + + // 4) Last resort: try DefaultTableService directly (avoid circular deps by FQCN) + LOG.info("Falling back to DefaultTableService directly."); + return new org.apache.amoro.server.table.DefaultTableService(conf, catalogManager); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/TableServiceProvider.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableServiceProvider.java new file mode 100644 index 0000000000..b73730964f --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/TableServiceProvider.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table; + +import org.apache.amoro.config.Configurations; +import org.apache.amoro.server.catalog.CatalogManager; + +/** + * SPI provider for {@link TableService}. Implementations should be registered via + * META-INF/services/org.apache.amoro.server.table.TableServiceProvider + */ +public interface TableServiceProvider { + + /** Provider name to select by configuration, e.g., "default". */ + String name(); + + /** Create a {@link TableService} instance. */ + TableService create(Configurations configuration, CatalogManager catalogManager); +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/spi/DefaultTableServiceProvider.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/spi/DefaultTableServiceProvider.java new file mode 100644 index 0000000000..4ecbcf8029 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/spi/DefaultTableServiceProvider.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.spi; + +import org.apache.amoro.config.Configurations; +import org.apache.amoro.server.catalog.CatalogManager; +import org.apache.amoro.server.table.DefaultTableService; +import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.TableServiceProvider; + +public class DefaultTableServiceProvider implements TableServiceProvider { + + @Override + public String name() { + return "default"; + } + + @Override + public TableService create(Configurations configuration, CatalogManager catalogManager) { + return new DefaultTableService(configuration, catalogManager); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/spi/MasterSlaveTableServiceProvider.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/spi/MasterSlaveTableServiceProvider.java new file mode 100644 index 0000000000..de6619397e --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/spi/MasterSlaveTableServiceProvider.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.spi; + +import org.apache.amoro.config.Configurations; +import org.apache.amoro.server.catalog.CatalogManager; +import org.apache.amoro.server.table.MasterSlaveTableService; +import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.TableServiceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** It can be used by configuring the item "table-service.impl=master-slave" */ +public class MasterSlaveTableServiceProvider implements TableServiceProvider { + + private static final Logger LOG = LoggerFactory.getLogger(MasterSlaveTableServiceProvider.class); + + @Override + public String name() { + return "master-slave"; + } + + @Override + public TableService create(Configurations configuration, CatalogManager catalogManager) { + LOG.info("Creating master-slave TableService implementation"); + return new MasterSlaveTableService(configuration, catalogManager); + } +} diff --git a/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.table.TableServiceProvider b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.table.TableServiceProvider new file mode 100644 index 0000000000..0ab8e2498b --- /dev/null +++ b/amoro-ams/src/main/resources/META-INF/services/org.apache.amoro.server.table.TableServiceProvider @@ -0,0 +1,20 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.amoro.server.table.spi.DefaultTableServiceProvider +org.apache.amoro.server.table.spi.MasterSlaveTableServiceProvider \ No newline at end of file diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java index 15f7009b0c..04174ec96e 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/AMSServiceTestBase.java @@ -23,7 +23,8 @@ import org.apache.amoro.server.manager.EventsManager; import org.apache.amoro.server.manager.MetricManager; import org.apache.amoro.server.table.DefaultTableRuntime; -import org.apache.amoro.server.table.DefaultTableService; +import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.TableServiceLoader; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -31,7 +32,7 @@ import java.time.Duration; public abstract class AMSServiceTestBase extends AMSManagerTestBase { - private static DefaultTableService TABLE_SERVICE = null; + private static TableService TABLE_SERVICE = null; private static DefaultOptimizingService OPTIMIZING_SERVICE = null; @BeforeClass @@ -41,7 +42,7 @@ public static void initTableService() { configurations.set(AmoroManagementConf.OPTIMIZER_HB_TIMEOUT, Duration.ofMillis(800L)); configurations.set( AmoroManagementConf.OPTIMIZER_TASK_EXECUTE_TIMEOUT, Duration.ofMillis(30000L)); - TABLE_SERVICE = new DefaultTableService(new Configurations(), CATALOG_MANAGER); + TABLE_SERVICE = TableServiceLoader.load(configurations, CATALOG_MANAGER); OPTIMIZING_SERVICE = new DefaultOptimizingService( configurations, CATALOG_MANAGER, OPTIMIZER_MANAGER, TABLE_SERVICE); @@ -65,7 +66,7 @@ public static void disposeTableService() { EventsManager.dispose(); } - protected DefaultTableService tableService() { + protected TableService tableService() { return TABLE_SERVICE; } diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java index 71b984e0a9..b869cd18f3 100644 --- a/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java +++ b/amoro-ams/src/test/java/org/apache/amoro/server/table/TestDefaultTableRuntimeHandler.java @@ -126,7 +126,7 @@ protected DefaultTableService tableService() { if (tableService != null) { return tableService; } else { - return super.tableService(); + return (DefaultTableService) super.tableService(); } } diff --git a/dist/src/main/amoro-bin/conf/config.yaml b/dist/src/main/amoro-bin/conf/config.yaml index f628433b7e..5736a700e2 100644 --- a/dist/src/main/amoro-bin/conf/config.yaml +++ b/dist/src/main/amoro-bin/conf/config.yaml @@ -20,6 +20,8 @@ ams: admin-password: admin server-bind-host: "0.0.0.0" server-expose-host: "127.0.0.1" + table-service: + impl: default thrift-server: max-message-size: 100MB # 104857600