diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java index 49eb9f5db5..537f02e36d 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java @@ -31,6 +31,7 @@ import org.apache.amoro.server.persistence.mapper.PlatformFileMapper; import org.apache.amoro.server.persistence.mapper.ResourceMapper; import org.apache.amoro.server.persistence.mapper.TableBlockerMapper; +import org.apache.amoro.server.persistence.mapper.TableCleanupMapper; import org.apache.amoro.server.persistence.mapper.TableMetaMapper; import org.apache.amoro.server.persistence.mapper.TableProcessMapper; import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper; @@ -74,6 +75,7 @@ public void init(DataSource dataSource) throws SQLException { configuration.addMapper(TableBlockerMapper.class); configuration.addMapper(TableProcessMapper.class); configuration.addMapper(TableRuntimeMapper.class); + configuration.addMapper(TableCleanupMapper.class); PageInterceptor interceptor = new PageInterceptor(); Properties interceptorProperties = new Properties(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/CleanupOperationConverter.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/CleanupOperationConverter.java new file mode 100644 index 0000000000..6156266d88 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/converter/CleanupOperationConverter.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.converter; + +import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.apache.ibatis.type.BaseTypeHandler; +import org.apache.ibatis.type.JdbcType; +import org.apache.ibatis.type.MappedJdbcTypes; +import org.apache.ibatis.type.MappedTypes; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.CallableStatement; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +/** + * MyBatis type handler for converting between CleanupOperation enum and database integer values. + * This handler maps CleanupOperation enum values to their corresponding integer codes for + * persistence, and converts integer codes back to enum values when reading from the database. + */ +@MappedJdbcTypes(JdbcType.INTEGER) +@MappedTypes(CleanupOperation.class) +public class CleanupOperationConverter extends BaseTypeHandler { + private static final Logger LOG = LoggerFactory.getLogger(CleanupOperationConverter.class); + + @Override + public void setNonNullParameter( + PreparedStatement ps, int i, CleanupOperation parameter, JdbcType jdbcType) + throws SQLException { + ps.setInt(i, parameter.getCode()); + } + + @Override + public CleanupOperation getNullableResult(ResultSet rs, String columnName) throws SQLException { + return convertFromString(rs.getString(columnName)); + } + + @Override + public CleanupOperation getNullableResult(ResultSet rs, int columnIndex) throws SQLException { + return convertFromString(rs.getString(columnIndex)); + } + + @Override + public CleanupOperation getNullableResult(CallableStatement cs, int columnIndex) + throws SQLException { + return convertFromString(cs.getString(columnIndex)); + } + + /** + * Converts a string value to a CleanupOperation enum value. Handles null values and parsing + * errors gracefully. + * + * @param s The string value from the database + * @return The corresponding CleanupOperation enum value, or null if conversion fails + */ + private CleanupOperation convertFromString(String s) { + if (s == null) { + return null; + } + try { + return CleanupOperation.fromCode(Integer.parseInt(s)); + } catch (NumberFormatException e) { + LOG.warn("Failed to parse CleanupOperation from string: {}", s, e); + return null; + } + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableCleanupMapper.java b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableCleanupMapper.java new file mode 100644 index 0000000000..24a0945aea --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/persistence/mapper/TableCleanupMapper.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.persistence.mapper; + +import org.apache.amoro.server.persistence.converter.Long2TsConverter; +import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.apache.amoro.server.table.cleanup.TableCleanupProcessMeta; +import org.apache.ibatis.annotations.Delete; +import org.apache.ibatis.annotations.Insert; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Result; +import org.apache.ibatis.annotations.Results; +import org.apache.ibatis.annotations.Select; +import org.apache.ibatis.annotations.Update; + +import java.util.List; + +public interface TableCleanupMapper { + String TABLE_NAME = "table_cleanup_process"; + + /** Insert a new table cleanup process record */ + @Insert( + "INSERT INTO " + + TABLE_NAME + + " (cleanup_process_id, table_id, catalog_name, db_name, table_name, cleanup_operation_code, last_cleanup_end_time) " + + "VALUES (#{cleanupProcessMeta.cleanupProcessId}," + + " #{cleanupProcessMeta.tableId}, #{cleanupProcessMeta.catalogName}, " + + "#{cleanupProcessMeta.dbName}, #{cleanupProcessMeta.tableName}," + + "#{cleanupProcessMeta.cleanupOperation," + + " typeHandler=org.apache.amoro.server.persistence.converter.CleanupOperationConverter}," + + "#{cleanupProcessMeta.lastCleanupEndTime," + + " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter})") + void insertTableCleanupProcess( + @Param("cleanupProcessMeta") TableCleanupProcessMeta cleanupProcessMeta); + + /** Update lastCleanupEndTime field by table_id and cleanup_operation_code */ + @Update( + "UPDATE " + + TABLE_NAME + + " SET last_cleanup_end_time = #{lastCleanupEndTime, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} " + + "WHERE table_id = #{tableId} AND cleanup_operation_code = #{cleanupOperation," + + " typeHandler=org.apache.amoro.server.persistence.converter.CleanupOperationConverter}") + int updateLastCleanupEndTimeByTableIdAndCleanupOperation( + @Param("tableId") long tableId, + @Param("cleanupOperation") CleanupOperation cleanupOperation, + @Param("lastCleanupEndTime") long lastCleanupEndTime); + + /** + * Select table_id and last_cleanup_end_time by cleanup_operation_code and table_ids If + * last_cleanup_end_time is NULL in database, it will be converted to 0L as default value + */ + @Select( + "") + @Results({ + @Result(property = "tableId", column = "table_id"), + @Result( + property = "lastCleanupEndTime", + column = "last_cleanup_end_time", + typeHandler = Long2TsConverter.class) + }) + List selectTableIdAndLastCleanupEndTime( + @Param("tableIds") List tableIds, + @Param("cleanupOperation") CleanupOperation cleanupOperation); + + /** Delete all cleanup process records for a specific table */ + @Delete("DELETE FROM " + TABLE_NAME + " WHERE table_id = #{tableId}") + void deleteTableCleanupProcesses(@Param("tableId") long tableId); +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java index f08b5110ce..d1f00cef3f 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/PeriodicTableScheduler.java @@ -27,6 +27,10 @@ import org.apache.amoro.server.optimizing.OptimizingStatus; import org.apache.amoro.server.table.RuntimeHandlerChain; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.apache.amoro.server.table.cleanup.CleanupProcessPersistence; +import org.apache.amoro.server.table.cleanup.TableCleanupProcessMeta; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; import org.apache.amoro.shade.guava32.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -36,16 +40,19 @@ import java.util.HashSet; import java.util.List; import java.util.Locale; +import java.util.Map; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; public abstract class PeriodicTableScheduler extends RuntimeHandlerChain { protected final Logger logger = LoggerFactory.getLogger(getClass()); private static final long START_DELAY = 10 * 1000L; + private static final int BATCH_SIZE = 1000; protected final Set scheduledTables = Collections.synchronizedSet(new HashSet<>()); @@ -53,6 +60,10 @@ public abstract class PeriodicTableScheduler extends RuntimeHandlerChain { private final ScheduledExecutorService executor; private final TableService tableService; + private final Map tableIdWithLastCleanTimeCache = + Collections.synchronizedMap(Maps.newHashMap()); + private final CleanupProcessPersistence persistence; + protected PeriodicTableScheduler(Action action, TableService tableService, int poolSize) { this.action = action; this.tableService = tableService; @@ -63,6 +74,7 @@ protected PeriodicTableScheduler(Action action, TableService tableService, int p .setDaemon(false) .setNameFormat("async-" + getThreadName() + "-%d") .build()); + this.persistence = new CleanupProcessPersistence(); } protected PeriodicTableScheduler(TableService tableService, int poolSize) { @@ -75,27 +87,79 @@ protected PeriodicTableScheduler(TableService tableService, int poolSize) { .setDaemon(false) .setNameFormat("async-" + getThreadName() + "-%d") .build()); + this.persistence = new CleanupProcessPersistence(); } @Override protected void initHandler(List tableRuntimeList) { - tableRuntimeList.stream() - .filter(this::enabled) - .forEach( - tableRuntime -> { - if (scheduledTables.add(tableRuntime.getTableIdentifier())) { - executor.schedule( - () -> executeTask(tableRuntime), getStartDelay(), TimeUnit.MILLISECONDS); - } - }); + // 1、Filter enabled tables + List enabledTables = + tableRuntimeList.stream().filter(this::enabled).collect(Collectors.toList()); + if (enabledTables.isEmpty()) { + logger.info( + "No enabled tables found for executor {}, skipping initialization", + getClass().getSimpleName()); + return; + } + + // 2、For specific cleanup executors, query existing cleanup records at startup + CleanupOperation cleanupOperation = getCleanupOperation(); + if (cleanupOperation != CleanupOperation.NONE) { + try { + batchQueryExistingCleanupInfo(enabledTables, cleanupOperation); + } catch (Exception e) { + logger.warn("Failed to query table cleanup info for operation: {}", cleanupOperation, e); + } + } + + enabledTables.forEach( + tableRuntime -> { + if (scheduledTables.add(tableRuntime.getTableIdentifier())) { + scheduleTableExecution( + tableRuntime, calculateExecutionDelay(tableRuntime, getCleanupOperation())); + } + }); logger.info("Table executor {} initialized", getClass().getSimpleName()); } + private long calculateExecutionDelay( + TableRuntime tableRuntime, CleanupOperation cleanupOperation) { + // If the table needs to be executed immediately, schedule it to run after a short delay. + if (shouldExecuteTask(tableRuntime, cleanupOperation)) { + return getStartDelay(); + } + + // If the table does not need to be executed immediately, schedule it for the next execution + // time. + // Adding getStartDelay() helps distribute the execution time of multiple tables, + // reducing the probability of simultaneous execution and system load spikes. + return getNextExecutingTime(tableRuntime) + getStartDelay(); + } + + /** + * Schedule a table for execution with the specified delay. + * + * @param tableRuntime The table runtime to schedule + * @param delay The delay in milliseconds before execution + */ + private void scheduleTableExecution(TableRuntime tableRuntime, long delay) { + executor.schedule(() -> executeTask(tableRuntime), delay, TimeUnit.MILLISECONDS); + logger.debug( + "Scheduled execution for table {} with delay {} ms", + tableRuntime.getTableIdentifier(), + delay); + } + private void executeTask(TableRuntime tableRuntime) { try { if (isExecutable(tableRuntime)) { execute(tableRuntime); + + // Different tables take different amounts of time to execute the end of execute(), + // so you need to perform the save or update lastCleanEndTime operation separately for each + // table. + persistUpdatingCleanupTime(tableRuntime); } } finally { scheduledTables.remove(tableRuntime.getTableIdentifier()); @@ -117,12 +181,192 @@ protected final void scheduleIfNecessary(TableRuntime tableRuntime, long millise protected abstract void execute(TableRuntime tableRuntime); + protected boolean shouldExecute(Long lastCleanupEndTime) { + return true; + } + + private void persistUpdatingCleanupTime(TableRuntime tableRuntime) { + CleanupOperation cleanupOperation = getCleanupOperation(); + if (cleanupOperation == CleanupOperation.NONE) { + logger.debug( + "No cleanup operation found for table {}, skipping cleanup time update", + tableRuntime.getTableIdentifier().getTableName()); + return; + } + + try { + long currentTime = System.currentTimeMillis(); + TableCleanupProcessMeta meta = + new TableCleanupProcessMeta( + currentTime, + tableRuntime.getTableIdentifier().getId(), + tableRuntime.getTableIdentifier().getCatalog(), + tableRuntime.getTableIdentifier().getDatabase(), + tableRuntime.getTableIdentifier().getTableName(), + cleanupOperation, + currentTime); + persistence.upsertLastCleanupEndTimeByTableIdAndCleanupOperation(meta); + + logger.debug( + "Upserted lastCleanupEndTime for table {} with cleanupOperation {}", + tableRuntime.getTableIdentifier().getTableName(), + cleanupOperation); + } catch (Exception e) { + logger.error( + "Failed to upsert cleanup end time for table {}", + tableRuntime.getTableIdentifier().getTableName(), + e); + } + } + + private void persistDeletingCleanupRecords(TableRuntime tableRuntime) { + try { + persistence.deleteTableCleanupProcesses(tableRuntime.getTableIdentifier().getId()); + + logger.debug( + "Deleted cleanup records for table {}", tableRuntime.getTableIdentifier().getTableName()); + } catch (Exception e) { + logger.error( + "Failed to delete cleanup records for table {}", + tableRuntime.getTableIdentifier().getTableName(), + e); + } + } + + /** + * Get cleanup operation. Default is NONE, subclasses should override this method to provide + * specific operation. + * + * @return cleanup operation + */ + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.NONE; + } + + protected boolean shouldExecuteTask( + TableRuntime tableRuntime, CleanupOperation cleanupOperation) { + if (cleanupOperation == CleanupOperation.NONE) { + logger.debug( + "No cleanup operation specified, executing task for table {}", + tableRuntime.getTableIdentifier().getTableName()); + return true; + } + + // If there's no previous cleanup time, or it's zero, execute the task + Long lastCleanupEndTime = + tableIdWithLastCleanTimeCache.remove(tableRuntime.getTableIdentifier().getId()); + if (lastCleanupEndTime == null || lastCleanupEndTime == 0L) { + logger.debug( + "No previous cleanup time found for table {}, executing task", + tableRuntime.getTableIdentifier().getTableName()); + return true; + } + + // After ams restarts, certain cleanup operations can only be re-executed + // if sufficient time has elapsed since the last cleanup. + boolean result = shouldExecute(lastCleanupEndTime); + logger.debug( + result + ? "Should execute task for table {}" + : "Not enough time has passed since last cleanup for table {}, delaying execution", + tableRuntime.getTableIdentifier().getTableName()); + + return result; + } + + /** Batch query cleanup records from table table_cleanup_process */ + private void batchQueryExistingCleanupInfo( + List tableRuntimes, CleanupOperation cleanupOperation) { + List tableIds = + tableRuntimes.stream() + .map(tableRuntime -> tableRuntime.getTableIdentifier().getId()) + .collect(Collectors.toList()); + if (tableIds.isEmpty()) { + return; + } + + // 1、Find existing cleanup records for the given table IDs + Map existingTableIdWithCleanTime = + findExistingCleanupRecords(tableIds, cleanupOperation); + + // 2、Cache the tableId with the last cleanup time to determine later + // whether the table requires cleanup operations. + tableIdWithLastCleanTimeCache.putAll(existingTableIdWithCleanTime); + } + + /** Find existing cleanup records for the given table IDs */ + private Map findExistingCleanupRecords( + List tableIds, CleanupOperation cleanupOperation) { + Map existingTableIdWithCleanTime = Maps.newHashMap(); + + try { + // Batch query + processInBatches( + tableIds, + BATCH_SIZE, + (batchTableIds, batchIndex, totalBatches) -> { + Map batchResult = + persistence.selectTableCleanupProcess(batchTableIds, cleanupOperation); + existingTableIdWithCleanTime.putAll(batchResult); + + logger.debug( + "Queried batch {}/{} found {} existing records", + batchIndex + 1, + totalBatches, + batchResult.size()); + }); + + logger.debug( + "Found {} existing cleanup process records out of {} tables", + existingTableIdWithCleanTime.size(), + tableIds.size()); + } catch (Exception e) { + logger.warn( + "Failed to batch check existing cleanup process records for {} tables", + tableIds.size(), + e); + } + + return existingTableIdWithCleanTime; + } + + interface BatchProcessor { + void process(List batch, int batchIndex, int totalBatches) throws Exception; + } + + /** + * Process items in batches to avoid memory overflow and improve performance + * + * @param the type of items to process + * @param items the list of items to process + * @param batchSize the maximum number of items to process in each batch + * @param processor the processor to handle each batch + */ + private void processInBatches(List items, int batchSize, BatchProcessor processor) + throws Exception { + final int size = items.size(); + if (size <= batchSize) { + // If the number of items does not exceed the batch size, process directly + processor.process(items, 0, 1); + } else { + // If the number of items exceeds the batch size, process in batches + int totalBatches = (size + batchSize - 1) / batchSize; + logger.debug("Splitting {} items into batches of size {}", size, batchSize); + + for (int i = 0; i < size; i += batchSize) { + int endIndex = Math.min(i + batchSize, size); + List batch = items.subList(i, endIndex); + processor.process(batch, i / batchSize, totalBatches); + } + } + } + protected String getThreadName() { return String.join("-", StringUtils.splitByCharacterTypeCamelCase(getClass().getSimpleName())) .toLowerCase(Locale.ROOT); } - private boolean isExecutable(TableRuntime tableRuntime) { + protected boolean isExecutable(TableRuntime tableRuntime) { return tableService.contains(tableRuntime.getTableIdentifier().getId()) && enabled(tableRuntime); } @@ -135,6 +379,9 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or @Override public void handleTableRemoved(TableRuntime tableRuntime) { // DO nothing, handling would be canceled when calling executeTable + // when the table is removed, we need to delete the corresponding cleanup records + // in table table_cleanup_process + persistDeletingCleanupRecords(tableRuntime); } @Override diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java index 16f80c9c0f..d7d8801f1a 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DanglingDeleteFilesCleaningExecutor.java @@ -26,6 +26,7 @@ import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.DefaultTableRuntime; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,16 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { return INTERVAL; } + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + return System.currentTimeMillis() - lastCleanupEndTime >= INTERVAL; + } + + @Override + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.DANGLING_DELETE_FILES_CLEANING; + } + @Override protected boolean enabled(TableRuntime tableRuntime) { return tableRuntime instanceof DefaultTableRuntime diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java index 4990b74093..61f45860b9 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/DataExpiringExecutor.java @@ -25,6 +25,7 @@ import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +48,16 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { return interval.toMillis(); } + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + return System.currentTimeMillis() - lastCleanupEndTime >= interval.toMillis(); + } + + @Override + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.DATA_EXPIRING; + } + @Override protected boolean enabled(TableRuntime tableRuntime) { return tableRuntime.getTableConfiguration().getExpiringDataConfig().isEnabled(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java index 332c511411..21d60cd105 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/OrphanFilesCleaningExecutor.java @@ -25,6 +25,7 @@ import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,6 +46,16 @@ protected long getNextExecutingTime(TableRuntime tableRuntime) { return interval.toMillis(); } + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + return System.currentTimeMillis() - lastCleanupEndTime >= interval.toMillis(); + } + + @Override + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.ORPHAN_FILES_CLEANING; + } + @Override protected boolean enabled(TableRuntime tableRuntime) { return tableRuntime.getTableConfiguration().isCleanOrphanEnabled(); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java index f7d0cb927f..15f2d49d9e 100644 --- a/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java +++ b/amoro-ams/src/main/java/org/apache/amoro/server/scheduler/inline/SnapshotsExpiringExecutor.java @@ -25,6 +25,7 @@ import org.apache.amoro.server.optimizing.maintainer.TableMaintainers; import org.apache.amoro.server.scheduler.PeriodicTableScheduler; import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +56,16 @@ public void handleConfigChanged(TableRuntime tableRuntime, TableConfiguration or scheduleIfNecessary(tableRuntime, getStartDelay()); } + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + return System.currentTimeMillis() - lastCleanupEndTime >= INTERVAL; + } + + @Override + protected CleanupOperation getCleanupOperation() { + return CleanupOperation.SNAPSHOTS_EXPIRING; + } + @Override protected long getExecutorDelay() { return ThreadLocalRandom.current().nextLong(INTERVAL); diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java new file mode 100644 index 0000000000..e3892ec8b5 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupOperation.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.cleanup; + +/** + * Table cleanup operation enum. Defines different operation types for table cleanup tasks and their + * corresponding codes. Each operation type has a unique code, which is used to identify the + * operation type when persisting the cleanup process record in the table_cleanup_process table. + */ +public enum CleanupOperation { + DANGLING_DELETE_FILES_CLEANING(11), + ORPHAN_FILES_CLEANING(22), + DATA_EXPIRING(33), + SNAPSHOTS_EXPIRING(44), + // NONE(-1) indicates operation types where no cleanup process records are + // saved in the table_cleanup_process table. + NONE(-1); + + private final int code; + + CleanupOperation(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + public static CleanupOperation fromCode(int code) { + for (CleanupOperation op : values()) { + if (op.code == code) { + return op; + } + } + return NONE; + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupProcessPersistence.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupProcessPersistence.java new file mode 100644 index 0000000000..1dff718643 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/CleanupProcessPersistence.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.cleanup; + +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.mapper.TableCleanupMapper; +import org.apache.amoro.shade.guava32.com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class CleanupProcessPersistence extends PersistentBase { + + private static final Logger LOG = LoggerFactory.getLogger(CleanupProcessPersistence.class); + + /** + * Upsert lastCleanupEndTime field by table_id and cleanup_operation If record exists, update it, + * otherwise insert a new record + */ + public void upsertLastCleanupEndTimeByTableIdAndCleanupOperation( + TableCleanupProcessMeta cleanupProcessMeta) { + doAsTransaction( + () -> + doAs( + TableCleanupMapper.class, + mapper -> { + // First try to update + int updatedRows = + mapper.updateLastCleanupEndTimeByTableIdAndCleanupOperation( + cleanupProcessMeta.getTableId(), + cleanupProcessMeta.getCleanupOperation(), + cleanupProcessMeta.getLastCleanupEndTime()); + + // If no rows were updated, insert a new record + if (updatedRows == 0) { + mapper.insertTableCleanupProcess(cleanupProcessMeta); + } + })); + } + + /** + * Check if cleanup process records exist for multiple table ids and one cleanupOperation, return + * the list of tableId and lastCleanupEndTime that have record + * + * @param tableIds List of table IDs to check + * @param cleanupOperation Operation to filter by + */ + public Map selectTableCleanupProcess( + List tableIds, CleanupOperation cleanupOperation) { + + List metas = + getAs( + TableCleanupMapper.class, + mapper -> mapper.selectTableIdAndLastCleanupEndTime(tableIds, cleanupOperation)); + if (metas.isEmpty()) { + LOG.debug( + "No cleanup process records found for {} tables with cleanupOperation: {}", + tableIds.size(), + cleanupOperation); + return Maps.newHashMap(); + } else { + LOG.debug( + "Found cleanup process records for {} out of {} tables with cleanupOperation: {}", + metas.size(), + tableIds.size(), + cleanupOperation); + return metas.stream() + .collect( + Collectors.toMap( + TableCleanupProcessMeta::getTableId, + TableCleanupProcessMeta::getLastCleanupEndTime, + (existing, replacement) -> replacement)); + } + } + + public void deleteTableCleanupProcesses(Long tableId) { + doAsTransaction( + () -> + doAs( + TableCleanupMapper.class, + mapper -> { + mapper.deleteTableCleanupProcesses(tableId); + })); + } +} diff --git a/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableCleanupProcessMeta.java b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableCleanupProcessMeta.java new file mode 100644 index 0000000000..f691053118 --- /dev/null +++ b/amoro-ams/src/main/java/org/apache/amoro/server/table/cleanup/TableCleanupProcessMeta.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.table.cleanup; + +/** + * The class for table used when transfer data from/to database. Maps to table_cleanup_process table + * in database. + */ +public class TableCleanupProcessMeta { + // Maps to cleanup_process_id column in database + private long cleanupProcessId; + // Maps to table_id column in database + private Long tableId; + // Maps to catalog_name column in database + private String catalogName; + // Maps to db_name column in database + private String dbName; + // Maps to table_name column in database + private String tableName; + // Maps to cleanup_operation_code column in database + private CleanupOperation cleanupOperation; + // Maps to last_cleanup_end_time column in database + private long lastCleanupEndTime; + + public TableCleanupProcessMeta() {} + + public TableCleanupProcessMeta( + long cleanupProcessId, + Long tableId, + String catalogName, + String dbName, + String tableName, + CleanupOperation cleanupOperation, + long lastCleanupEndTime) { + this.cleanupProcessId = cleanupProcessId; + this.tableId = tableId; + this.catalogName = catalogName; + this.dbName = dbName; + this.tableName = tableName; + this.cleanupOperation = cleanupOperation; + this.lastCleanupEndTime = lastCleanupEndTime; + } + + public TableCleanupProcessMeta( + long cleanupProcessId, + Long tableId, + String catalogName, + String dbName, + String tableName, + CleanupOperation cleanupOperation) { + this.cleanupProcessId = cleanupProcessId; + this.tableId = tableId; + this.catalogName = catalogName; + this.dbName = dbName; + this.tableName = tableName; + this.cleanupOperation = cleanupOperation; + } + + public TableCleanupProcessMeta(Long tableId, long lastCleanupEndTime) { + + this.tableId = tableId; + this.lastCleanupEndTime = lastCleanupEndTime; + } + + public long getCleanupProcessId() { + return cleanupProcessId; + } + + public void setCleanupProcessId(long cleanupProcessId) { + this.cleanupProcessId = cleanupProcessId; + } + + public Long getTableId() { + return tableId; + } + + public void setTableId(Long tableId) { + this.tableId = tableId; + } + + public String getCatalogName() { + return catalogName; + } + + public void setCatalogName(String catalogName) { + this.catalogName = catalogName; + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } + + public String getTableName() { + return tableName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public CleanupOperation getCleanupOperation() { + return cleanupOperation; + } + + public void setCleanupOperation(CleanupOperation cleanupOperation) { + this.cleanupOperation = cleanupOperation; + } + + public long getLastCleanupEndTime() { + return lastCleanupEndTime; + } + + public void setLastCleanupEndTime(long lastCleanupEndTime) { + this.lastCleanupEndTime = lastCleanupEndTime; + } +} diff --git a/amoro-ams/src/main/resources/derby/ams-derby-init.sql b/amoro-ams/src/main/resources/derby/ams-derby-init.sql index 9cbd133e4c..60531495fe 100644 --- a/amoro-ams/src/main/resources/derby/ams-derby-init.sql +++ b/amoro-ams/src/main/resources/derby/ams-derby-init.sql @@ -127,6 +127,17 @@ CREATE TABLE table_runtime_state ( CREATE UNIQUE INDEX uniq_table_state_key ON table_runtime_state (table_id, state_key); +CREATE TABLE table_cleanup_process ( + cleanup_process_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + catalog_name VARCHAR(64) NOT NULL, + db_name VARCHAR(128) NOT NULL, + table_name VARCHAR(256) NOT NULL, + cleanup_operation_code INT NOT NULL, + last_cleanup_end_time TIMESTAMP, + PRIMARY KEY (table_id, cleanup_operation_code) +); + CREATE TABLE table_process ( process_id BIGINT NOT NULL PRIMARY KEY, table_id BIGINT NOT NULL, diff --git a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql index 3f4c9031e9..d45e6327e5 100644 --- a/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql +++ b/amoro-ams/src/main/resources/mysql/ams-mysql-init.sql @@ -135,6 +135,19 @@ CREATE TABLE `table_runtime_state` ( UNIQUE KEY `uniq_table_state_key` (`table_id`,`state_key`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='State of Table Runtimes'; +CREATE TABLE `table_cleanup_process` +( + `cleanup_process_id` bigint(20) NOT NULL COMMENT 'cleanup_process UUID', + `table_id` bigint(20) NOT NULL COMMENT 'Table identifier id', + `catalog_name` varchar(64) NOT NULL COMMENT 'Catalog name', + `db_name` varchar(128) NOT NULL COMMENT 'Database name', + `table_name` varchar(256) NOT NULL COMMENT 'Table name', + `cleanup_operation_code` int NOT NULL COMMENT 'Cleanup operation code:OrphanFilesCleaning(11),DanglingDeleteFilesCleaning(22),DataExpiring(33),SnapshotsExpiring(44)', + `last_cleanup_end_time` timestamp(3) NULL DEFAULT NULL COMMENT 'Last cleanup operation end time', + PRIMARY KEY (`table_id`, `cleanup_operation_code`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of table cleanup tasks'; + + CREATE TABLE `table_process` ( `process_id` bigint(20) NOT NULL COMMENT 'table process id', diff --git a/amoro-ams/src/main/resources/mysql/upgrade.sql b/amoro-ams/src/main/resources/mysql/upgrade.sql index 266f43045e..0c6938cdf7 100644 --- a/amoro-ams/src/main/resources/mysql/upgrade.sql +++ b/amoro-ams/src/main/resources/mysql/upgrade.sql @@ -154,3 +154,15 @@ SELECT FROM table_runtime_old; DROP TABLE IF EXISTS table_runtime_old; + +-- ADD Table table_cleanup_process +CREATE TABLE `table_cleanup_process` ( + `cleanup_process_id` bigint(20) NOT NULL COMMENT 'cleanup_process UUID', + `table_id` bigint(20) NOT NULL, + `catalog_name` varchar(64) NOT NULL COMMENT 'Catalog name', + `db_name` varchar(128) NOT NULL COMMENT 'Database name', + `table_name` varchar(256) NOT NULL COMMENT 'Table name', + `cleanup_operation_code` int NOT NULL COMMENT 'Cleanup operation code:OrphanFilesCleaning(11),DanglingDeleteFilesCleaning(22),DataExpiring(33),SnapshotsExpiring(44)', + `last_cleanup_end_time` timestamp(3) NULL DEFAULT NULL COMMENT 'Last cleanup operation end time', + PRIMARY KEY (`table_id`, `cleanup_operation_code`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT 'History of table cleanup tasks'; diff --git a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql index bc4375cd82..49462bec9e 100644 --- a/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql +++ b/amoro-ams/src/main/resources/postgres/ams-postgres-init.sql @@ -212,6 +212,24 @@ comment on column table_runtime_state.state_version is 'Table runtime state vers comment on column table_runtime_state.create_time is 'create time'; comment on column table_runtime_state.update_time is 'update time'; +CREATE TABLE table_cleanup_process ( + cleanup_process_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + catalog_name VARCHAR(64) NOT NULL, + db_name VARCHAR(128) NOT NULL, + table_name VARCHAR(256) NOT NULL, + cleanup_operation_code INTEGER NOT NULL, + last_cleanup_end_time TIMESTAMP(3) NULL DEFAULT NULL, + PRIMARY KEY (table_id, cleanup_operation_code) +); + +COMMENT ON TABLE table_cleanup_process IS 'History of table cleanup tasks'; +COMMENT ON COLUMN table_cleanup_process.cleanup_process_id IS 'cleanup_process UUID'; +COMMENT ON COLUMN table_cleanup_process.catalog_name IS 'Catalog name'; +COMMENT ON COLUMN table_cleanup_process.db_name IS 'Database name'; +COMMENT ON COLUMN table_cleanup_process.table_name IS 'Table name'; +COMMENT ON COLUMN table_cleanup_process.cleanup_operation_code IS 'Cleanup operation code:OrphanFilesCleaning(11),DanglingDeleteFilesCleaning(22),DataExpiring(33),SnapshotsExpiring(44)'; +COMMENT ON COLUMN table_cleanup_process.last_cleanup_end_time IS 'Last cleanup operation end time'; CREATE TABLE table_process ( process_id bigserial PRIMARY KEY, diff --git a/amoro-ams/src/main/resources/postgres/upgrade.sql b/amoro-ams/src/main/resources/postgres/upgrade.sql index 401b62b2f9..49c252c3e3 100644 --- a/amoro-ams/src/main/resources/postgres/upgrade.sql +++ b/amoro-ams/src/main/resources/postgres/upgrade.sql @@ -213,5 +213,25 @@ FROM table_runtime_old; DROP TABLE IF EXISTS table_runtime_old; +-- ADD table table_cleanup_process +CREATE TABLE table_cleanup_process ( + cleanup_process_id BIGINT NOT NULL, + table_id BIGINT NOT NULL, + catalog_name VARCHAR(64) NOT NULL, + db_name VARCHAR(128) NOT NULL, + table_name VARCHAR(256) NOT NULL, + cleanup_operation_code INTEGER NOT NULL, + last_cleanup_end_time TIMESTAMP(3) NULL DEFAULT NULL, + PRIMARY KEY (table_id, cleanup_operation_code) +); + +COMMENT ON TABLE table_cleanup_process IS 'History of table cleanup tasks'; +COMMENT ON COLUMN table_cleanup_process.cleanup_process_id IS 'cleanup_process UUID'; +COMMENT ON COLUMN table_cleanup_process.catalog_name IS 'Catalog name'; +COMMENT ON COLUMN table_cleanup_process.db_name IS 'Database name'; +COMMENT ON COLUMN table_cleanup_process.table_name IS 'Table name'; +COMMENT ON COLUMN table_cleanup_process.cleanup_operation_code IS 'Cleanup operation code:OrphanFilesCleaning(11),DanglingDeleteFilesCleaning(22),DataExpiring(33),SnapshotsExpiring(44)'; +COMMENT ON COLUMN table_cleanup_process.last_cleanup_end_time IS 'Last cleanup operation end time'; + -- ADD bucket_id to table_runtime ALTER TABLE table_runtime ADD COLUMN bucket_id varchar(4); \ No newline at end of file diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/cleanup/TestCleanupOperation.java b/amoro-ams/src/test/java/org/apache/amoro/server/cleanup/TestCleanupOperation.java new file mode 100644 index 0000000000..6e747e6887 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/cleanup/TestCleanupOperation.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.cleanup; + +import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.junit.Assert; +import org.junit.Test; + +public class TestCleanupOperation { + @Test + public void testCleanupOperationFromCode() { + // Test that all cleanup operations can be retrieved by their codes + Assert.assertEquals( + CleanupOperation.DANGLING_DELETE_FILES_CLEANING, CleanupOperation.fromCode(11)); + Assert.assertEquals(CleanupOperation.ORPHAN_FILES_CLEANING, CleanupOperation.fromCode(22)); + Assert.assertEquals(CleanupOperation.DATA_EXPIRING, CleanupOperation.fromCode(33)); + Assert.assertEquals(CleanupOperation.SNAPSHOTS_EXPIRING, CleanupOperation.fromCode(44)); + Assert.assertEquals(CleanupOperation.NONE, CleanupOperation.fromCode(-1)); + + // Test that unknown codes return NONE + Assert.assertEquals(CleanupOperation.NONE, CleanupOperation.fromCode(999)); + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java new file mode 100644 index 0000000000..5e31ec3d66 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/PeriodicTableSchedulerTestBase.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.scheduler.inline; + +import org.apache.amoro.TableRuntime; +import org.apache.amoro.server.scheduler.PeriodicTableScheduler; +import org.apache.amoro.server.table.TableService; +import org.apache.amoro.server.table.cleanup.CleanupOperation; + +import java.lang.reflect.Method; +import java.util.List; +import java.util.Map; + +/** + * Test table executor implementation for testing PeriodicTableScheduler functionality. This class + * allows configuration of cleanup operations and enabled state for testing purposes. It provides + * public methods to access protected/private functionality of PeriodicTableScheduler for + * comprehensive test coverage. + */ +class PeriodicTableSchedulerTestBase extends PeriodicTableScheduler { + private final CleanupOperation cleanupOperation; + private final boolean enabled; + private final TableService tableService; + private static final long SNAPSHOTS_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour + private static final long ORPHAN_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; // 1 day + private static final long DANGLING_DELETE_FILES_CLEANING_INTERVAL = 24 * 60 * 60 * 1000L; + private static final long DATA_EXPIRING_INTERVAL = 60 * 60 * 1000L; // 1 hour + + public PeriodicTableSchedulerTestBase( + TableService tableService, CleanupOperation cleanupOperation, boolean enabled) { + super(tableService, 1); + this.tableService = tableService; + this.cleanupOperation = cleanupOperation; + this.enabled = enabled; + } + + @Override + protected CleanupOperation getCleanupOperation() { + return cleanupOperation; + } + + @Override + protected long getNextExecutingTime(TableRuntime tableRuntime) { + return 1000; + } + + @Override + protected boolean enabled(TableRuntime tableRuntime) { + return enabled; + } + + @Override + protected void execute(TableRuntime tableRuntime) { + // Do nothing in test + } + + @Override + protected long getExecutorDelay() { + return 0; + } + + @Override + protected boolean shouldExecute(Long lastCleanupEndTime) { + long currentTime = System.currentTimeMillis(); + switch (cleanupOperation) { + case SNAPSHOTS_EXPIRING: + return currentTime - lastCleanupEndTime >= SNAPSHOTS_EXPIRING_INTERVAL; + case ORPHAN_FILES_CLEANING: + return currentTime - lastCleanupEndTime >= ORPHAN_FILES_CLEANING_INTERVAL; + case DANGLING_DELETE_FILES_CLEANING: + return currentTime - lastCleanupEndTime >= DANGLING_DELETE_FILES_CLEANING_INTERVAL; + case DATA_EXPIRING: + return currentTime - lastCleanupEndTime >= DATA_EXPIRING_INTERVAL; + default: + return true; + } + } + + // Public methods for testing private methods + public void initHandlerForTest(List tableRuntimeList) { + initHandler(tableRuntimeList); + } + + public boolean shouldExecuteTaskForTest( + TableRuntime tableRuntime, CleanupOperation cleanupOperation) { + return shouldExecuteTask(tableRuntime, cleanupOperation); + } + + public void executeTaskForTest(TableRuntime tableRuntime) { + // Access private method via reflection + invokePrivateMethod("executeTask", TableRuntime.class, tableRuntime); + } + + public void addToCacheForTest(long tableId, long cleanupTime) { + try { + java.lang.reflect.Field field = + PeriodicTableScheduler.class.getDeclaredField("tableIdWithLastCleanTimeCache"); + field.setAccessible(true); + Map cache = (Map) field.get(this); + cache.put(tableId, cleanupTime); + } catch (Exception e) { + throw new RuntimeException("Failed to add to cache for test", e); + } + } + + // Override isExecutable to avoid NPE when tableService is null + @Override + protected boolean isExecutable(TableRuntime tableRuntime) { + if (tableService == null) { + // When tableService is null (in tests), just check if the table is enabled + return enabled(tableRuntime); + } + return super.isExecutable(tableRuntime); + } + + /** + * Helper method to invoke private methods in PeriodicTableScheduler using reflection + * + * @param methodName The name of the method to invoke + * @param parameterType The parameter type of the method + * @param parameterValue The parameter value to pass to the method + */ + private void invokePrivateMethod( + String methodName, Class parameterType, Object parameterValue) { + try { + Method method = PeriodicTableScheduler.class.getDeclaredMethod(methodName, parameterType); + method.setAccessible(true); + method.invoke(this, parameterValue); + } catch (Exception e) { + throw new RuntimeException("Failed to invoke private method " + methodName + " for test", e); + } + } + + public Map getCacheForTest() { + try { + java.lang.reflect.Field field = + PeriodicTableScheduler.class.getDeclaredField("tableIdWithLastCleanTimeCache"); + field.setAccessible(true); + return (Map) field.get(this); + } catch (Exception e) { + throw new RuntimeException("Failed to get cache for test", e); + } + } +} diff --git a/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java new file mode 100644 index 0000000000..3c67423ea9 --- /dev/null +++ b/amoro-ams/src/test/java/org/apache/amoro/server/scheduler/inline/TestPeriodicTableSchedulerCleanup.java @@ -0,0 +1,524 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.amoro.server.scheduler.inline; + +import org.apache.amoro.ServerTableIdentifier; +import org.apache.amoro.TableRuntime; +import org.apache.amoro.config.TableConfiguration; +import org.apache.amoro.server.optimizing.OptimizingStatus; +import org.apache.amoro.server.persistence.PersistentBase; +import org.apache.amoro.server.persistence.mapper.TableCleanupMapper; +import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper; +import org.apache.amoro.server.scheduler.PeriodicTableScheduler; +import org.apache.amoro.server.table.DerbyPersistence; +import org.apache.amoro.server.table.TableRuntimeHandler; +import org.apache.amoro.server.table.cleanup.CleanupOperation; +import org.apache.amoro.server.table.cleanup.TableCleanupProcessMeta; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +/** + * This class tests all aspects of cleanup operation handling in {@link + * org.apache.amoro.server.scheduler.PeriodicTableScheduler}, including initialization, record + * persistence and task execution. + */ +public class TestPeriodicTableSchedulerCleanup extends PersistentBase { + + private static DerbyPersistence persistence; + private static final String TEST_CATALOG = "test_catalog"; + private static final String TEST_DB = "test_db"; + private static final String TEST_TABLE = "test_table"; + + private static final TableRuntimeHandler TEST_HANDLER = + new TableRuntimeHandler() { + @Override + public void handleTableChanged( + TableRuntime tableRuntime, OptimizingStatus originalStatus) {} + + @Override + public void handleTableChanged( + TableRuntime tableRuntime, TableConfiguration originalConfig) {} + }; + + @BeforeClass + public static void setUp() { + persistence = new DerbyPersistence(); + } + + @AfterClass + public static void tearDown() { + persistence = null; + } + + /** + * Create a test server table identifier with the given ID + * + * @param tableId the table ID + * @return a ServerTableIdentifier instance + */ + private ServerTableIdentifier createTableIdentifier(long tableId) { + return ServerTableIdentifier.of( + tableId, TEST_CATALOG, TEST_DB, TEST_TABLE + "_" + tableId, null); + } + + /** + * Create a test table runtime with the given identifier + * + * @param identifier the table identifier + * @return a TableRuntime instance + */ + private TableRuntime createTableRuntime(ServerTableIdentifier identifier) { + return new TableRuntime() { + @Override + public List getProcessStates() { + return Collections.emptyList(); + } + + @Override + public List getProcessStates( + org.apache.amoro.Action action) { + return Collections.emptyList(); + } + + @Override + public String getGroupName() { + return "test_group"; + } + + @Override + public ServerTableIdentifier getTableIdentifier() { + return identifier; + } + + @Override + public TableConfiguration getTableConfiguration() { + return new TableConfiguration(); + } + + @Override + public void registerMetric(org.apache.amoro.metrics.MetricRegistry metricRegistry) {} + + @Override + public void unregisterMetric() {} + }; + } + + private void cleanUpTestData(List tableIds) { + doAs( + TableCleanupMapper.class, + mapper -> { + for (Long tableId : tableIds) { + try { + mapper.deleteTableCleanupProcesses(tableId); + } catch (Exception e) { + // Ignore if tables don't exist + } + } + }); + } + + private void cleanUpTableRuntimeData(List tableIds) { + doAs( + TableRuntimeMapper.class, + mapper -> { + for (Long tableId : tableIds) { + try { + mapper.deleteRuntime(tableId); + } catch (Exception e) { + // Ignore if tables don't exist + } + } + }); + } + + /** + * Prepare test environment by cleaning up test data and table runtime data + * + * @param testTableIds list of table IDs to clean up + */ + private void prepareTestEnvironment(List testTableIds) { + cleanUpTestData(testTableIds); + cleanUpTableRuntimeData(testTableIds); + } + + /** + * Create a test table executor + * + * @param cleanupOperation the cleanup operation to use + * @param enabled whether the executor should be enabled + * @return a new PeriodicTableSchedulerTestBase instance + */ + private PeriodicTableSchedulerTestBase createTestExecutor( + CleanupOperation cleanupOperation, boolean enabled) { + return new PeriodicTableSchedulerTestBase(null, cleanupOperation, enabled); + } + + /** + * Create a test table executor with default enabled state (true) + * + * @param cleanupOperation the cleanup operation to use + * @return a new PeriodicTableSchedulerTestBase instance + */ + private PeriodicTableSchedulerTestBase createTestExecutor(CleanupOperation cleanupOperation) { + return createTestExecutor(cleanupOperation, true); + } + + /** + * Create test data with specified table IDs + * + * @param tableIds table IDs to create data for + * @return list of TableRuntime instances + */ + private List createTestTableRuntimes(List tableIds) { + List runtimes = new ArrayList<>(); + for (Long tableId : tableIds) { + ServerTableIdentifier identifier = createTableIdentifier(tableId); + runtimes.add(createTableRuntime(identifier)); + } + return runtimes; + } + + /** + * Helper method to set up a basic test environment with a single table + * + * @param cleanupOperation the cleanup operation to use + * @return PeriodicTableSchedulerTestBase and TableRuntime for testing + */ + private TestSetup setupSingleTableTest(CleanupOperation cleanupOperation) { + List testTableIds = Collections.singletonList(1L); + prepareTestEnvironment(testTableIds); + + PeriodicTableSchedulerTestBase executor = createTestExecutor(cleanupOperation); + ServerTableIdentifier identifier = createTableIdentifier(1L); + TableRuntime tableRuntime = createTableRuntime(identifier); + + return new TestSetup(executor, identifier, tableRuntime, testTableIds); + } + + /** Helper class to hold test setup data */ + private static class TestSetup { + final PeriodicTableSchedulerTestBase executor; + final ServerTableIdentifier identifier; + final TableRuntime tableRuntime; + final List tableIds; + + TestSetup( + PeriodicTableSchedulerTestBase executor, + ServerTableIdentifier identifier, + TableRuntime tableRuntime, + List tableIds) { + this.executor = executor; + this.identifier = identifier; + this.tableRuntime = tableRuntime; + this.tableIds = tableIds; + } + } + + private void insertCleanupRecord(long tableId, CleanupOperation operation, long cleanupTime) { + long currentTime = System.currentTimeMillis(); + doAs( + TableCleanupMapper.class, + mapper -> { + TableCleanupProcessMeta meta = + new TableCleanupProcessMeta( + currentTime, + tableId, + TEST_CATALOG, + TEST_DB, + TEST_TABLE + "_" + tableId, + operation, + cleanupTime); + mapper.insertTableCleanupProcess(meta); + }); + } + + @Test + public void testBatchQueryExistingCleanupInfoWithNoExistingRecords() { + // Setup test environment + List testTableIds = Arrays.asList(1L, 2L, 3L); + prepareTestEnvironment(testTableIds); + + // Create test table runtimes + List tableRuntimeList = createTestTableRuntimes(testTableIds); + + // Create test executor with DANGLING_DELETE_FILES_CLEANING cleanup operation + PeriodicTableSchedulerTestBase executor = + createTestExecutor(CleanupOperation.DANGLING_DELETE_FILES_CLEANING); + + // Directly invoke batchQueryExistingCleanupInfo method using reflection + invokeBatchQueryExistingCleanupInfo( + executor, tableRuntimeList, CleanupOperation.DANGLING_DELETE_FILES_CLEANING); + + // Cache should be empty as no existing records + assertCacheEmpty(executor, "Cache should be empty when no existing cleanup records found"); + } + + @Test + public void testBatchQueryExistingCleanupInfoWithAllExistingRecords() { + // Setup test environment + List testTableIds = Arrays.asList(1L, 2L, 3L); + prepareTestEnvironment(testTableIds); + + // Create test table runtimes first to get actual table IDs + List tableRuntimeList = createTestTableRuntimes(testTableIds); + + // Get actual table IDs from the runtime list + long tableId1 = tableRuntimeList.get(0).getTableIdentifier().getId(); + long tableId2 = tableRuntimeList.get(1).getTableIdentifier().getId(); + long tableId3 = tableRuntimeList.get(2).getTableIdentifier().getId(); + + // Insert existing cleanup records for all tables using actual table IDs + long currentTime = System.currentTimeMillis(); + insertCleanupRecord( + tableId1, CleanupOperation.DATA_EXPIRING, currentTime - 3600000L); // 1 hour ago + insertCleanupRecord( + tableId2, CleanupOperation.DATA_EXPIRING, currentTime - 1800000L); // 30 minutes ago + insertCleanupRecord( + tableId3, CleanupOperation.DATA_EXPIRING, currentTime - 900000L); // 15 minutes ago + + // Create test executor with DATA_EXPIRING cleanup operation + PeriodicTableSchedulerTestBase executor = createTestExecutor(CleanupOperation.DATA_EXPIRING); + + // Directly invoke batchQueryExistingCleanupInfo method using reflection + invokeBatchQueryExistingCleanupInfo(executor, tableRuntimeList, CleanupOperation.DATA_EXPIRING); + + // Verify that all existing cleanup records are loaded into cache + Map cache = executor.getCacheForTest(); + + Assert.assertNotNull("Cache should not be null", cache); + Assert.assertEquals("Cache should contain 3 entries", 3, cache.size()); + Assert.assertTrue("Cache should contain first table", cache.containsKey(tableId1)); + Assert.assertTrue("Cache should contain second table", cache.containsKey(tableId2)); + Assert.assertTrue("Cache should contain third table", cache.containsKey(tableId3)); + + // Verify the cached times are correct + Assert.assertEquals( + "Table 1 cleanup time should match", currentTime - 3600000L, (long) cache.get(tableId1)); + Assert.assertEquals( + "Table 2 cleanup time should match", currentTime - 1800000L, (long) cache.get(tableId2)); + Assert.assertEquals( + "Table 3 cleanup time should match", currentTime - 900000L, (long) cache.get(tableId3)); + } + + @Test + public void testBatchQueryExistingCleanupInfoWithDifferentCleanupOperations() { + // Setup test environment + List testTableIds = Arrays.asList(1L, 2L); + prepareTestEnvironment(testTableIds); + + // Create test table runtimes first to get actual table IDs + List tableRuntimeList = createTestTableRuntimes(testTableIds); + + // Get actual table IDs from the runtime list + long tableId1 = tableRuntimeList.get(0).getTableIdentifier().getId(); + long tableId2 = tableRuntimeList.get(1).getTableIdentifier().getId(); + + // Insert records with different cleanup operations using actual table IDs + long currentTime = System.currentTimeMillis(); + insertCleanupRecord( + tableId1, + CleanupOperation.DANGLING_DELETE_FILES_CLEANING, + currentTime - 3600000L); // 1 hour ago + insertCleanupRecord( + tableId1, CleanupOperation.DATA_EXPIRING, currentTime - 1800000L); // 30 minutes ago + insertCleanupRecord( + tableId2, CleanupOperation.DATA_EXPIRING, currentTime - 900000L); // 15 minutes ago + + // Create test executor with DATA_EXPIRING cleanup operation + PeriodicTableSchedulerTestBase executor = createTestExecutor(CleanupOperation.DATA_EXPIRING); + + // Directly invoke batchQueryExistingCleanupInfo method using reflection + invokeBatchQueryExistingCleanupInfo(executor, tableRuntimeList, CleanupOperation.DATA_EXPIRING); + + // Verify only DATA_EXPIRING records are loaded + Map cache = executor.getCacheForTest(); + + Assert.assertNotNull("Cache should not be null", cache); + Assert.assertEquals("Cache should contain 2 entries", 2, cache.size()); + Assert.assertTrue("Cache should contain first table", cache.containsKey(tableId1)); + Assert.assertTrue("Cache should contain second table", cache.containsKey(tableId2)); + + // Verify the cached time matches DATA_EXPIRING operation + Assert.assertEquals( + "Table 1 cleanup time should match DATA_EXPIRING record", + currentTime - 1800000L, + (long) cache.get(tableId1)); + Assert.assertEquals( + "Table 2 cleanup time should match DATA_EXPIRING record", + currentTime - 900000L, + (long) cache.get(tableId2)); + } + + private void invokeBatchQueryExistingCleanupInfo( + PeriodicTableSchedulerTestBase executor, + List tableRuntimes, + CleanupOperation cleanupOperation) { + try { + Method method = + PeriodicTableScheduler.class.getDeclaredMethod( + "batchQueryExistingCleanupInfo", List.class, CleanupOperation.class); + method.setAccessible(true); + method.invoke(executor, tableRuntimes, cleanupOperation); + } catch (Exception e) { + throw new RuntimeException("Failed to invoke batchQueryExistingCleanupInfo method", e); + } + } + + private void assertCacheEmpty(PeriodicTableSchedulerTestBase executor, String message) { + Map cache = executor.getCacheForTest(); + Assert.assertTrue(message, cache != null && cache.isEmpty()); + } + + @Test + public void testUpsertWithInsertOperation() { + TestSetup setup = setupSingleTableTest(CleanupOperation.DATA_EXPIRING); + + // Execute a task which should insert the cleanup time record + setup.executor.executeTaskForTest(setup.tableRuntime); + + // Check that the cleanup record was inserted + List records = + getAs( + TableCleanupMapper.class, + mapper -> + mapper.selectTableIdAndLastCleanupEndTime( + setup.tableIds, CleanupOperation.DATA_EXPIRING)); + + Assert.assertEquals(1, records.size()); + TableCleanupProcessMeta record = records.get(0); + Assert.assertNotNull( + "Last cleanup end time should not be null", record.getLastCleanupEndTime()); + } + + @Test + public void testUpsertWithUpdateOperation() { + TestSetup setup = setupSingleTableTest(CleanupOperation.DATA_EXPIRING); + + // Insert initial cleanup record + long initialTime = System.currentTimeMillis() - 10000; // 10 seconds ago + doAs( + TableCleanupMapper.class, + mapper -> { + TableCleanupProcessMeta meta = + new TableCleanupProcessMeta( + System.currentTimeMillis(), + 1L, + TEST_CATALOG, + TEST_DB, + TEST_TABLE + "_1", + CleanupOperation.DATA_EXPIRING, + initialTime); + mapper.insertTableCleanupProcess(meta); + }); + + // Execute task which should update the existing record + setup.executor.executeTaskForTest(setup.tableRuntime); + + // Check that the cleanup record was updated (time should be more recent) + List records = + getAs( + TableCleanupMapper.class, + mapper -> + mapper.selectTableIdAndLastCleanupEndTime( + setup.tableIds, CleanupOperation.DATA_EXPIRING)); + + Assert.assertEquals(1, records.size()); + TableCleanupProcessMeta record = records.get(0); + Assert.assertTrue( + "Last cleanup end time should be updated", record.getLastCleanupEndTime() > initialTime); + } + + @Test + public void testShouldExecuteTaskWithNoPreviousCleanup() { + TestSetup setup = setupSingleTableTest(CleanupOperation.SNAPSHOTS_EXPIRING); + + // Should execute when there's no previous cleanup time + boolean shouldExecute = + setup.executor.shouldExecuteTaskForTest( + setup.tableRuntime, CleanupOperation.SNAPSHOTS_EXPIRING); + Assert.assertTrue("Should execute when there's no previous cleanup time", shouldExecute); + } + + @Test + public void testShouldExecuteTaskWithPreviousCleanup() { + // Setup test environment - only clean up table runtime data for this test + List testTableIds = Collections.singletonList(1L); + cleanUpTableRuntimeData(testTableIds); + + // Create test executor with SNAPSHOTS_EXPIRING cleanup operation + PeriodicTableSchedulerTestBase executor = + createTestExecutor(CleanupOperation.SNAPSHOTS_EXPIRING); + + // Add a previous cleanup time to the cache (more than 1 hour ago) + long oneHourAgo = System.currentTimeMillis() - 60 * 60 * 1000L; + executor.addToCacheForTest(1L, oneHourAgo); + + ServerTableIdentifier identifier = createTableIdentifier(1L); + TableRuntime tableRuntime = createTableRuntime(identifier); + + // Should execute because enough time has passed since last cleanup + boolean shouldExecute = + executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.SNAPSHOTS_EXPIRING); + Assert.assertTrue( + "Should execute when enough time has passed since last cleanup", shouldExecute); + } + + @Test + public void testShouldNotExecuteTaskWithRecentCleanup() { + // Setup test environment - only clean up table runtime data for this test + List testTableIds = Collections.singletonList(1L); + cleanUpTableRuntimeData(testTableIds); + + // Create test executor with SNAPSHOTS_EXPIRING cleanup operation + PeriodicTableSchedulerTestBase executor = + createTestExecutor(CleanupOperation.SNAPSHOTS_EXPIRING); + + // Add a recent cleanup time to the cache (1 second ago) + long recentCleanupTime = System.currentTimeMillis() - 1000L; + executor.addToCacheForTest(1L, recentCleanupTime); + + ServerTableIdentifier identifier = createTableIdentifier(1L); + TableRuntime tableRuntime = createTableRuntime(identifier); + + // Should not execute because not enough time has passed since last cleanup + boolean shouldExecute = + executor.shouldExecuteTaskForTest(tableRuntime, CleanupOperation.SNAPSHOTS_EXPIRING); + Assert.assertFalse( + "Should not execute when not enough time has passed since last cleanup", shouldExecute); + } + + @Test + public void testShouldExecuteTaskWithNoneOperation() { + TestSetup setup = setupSingleTableTest(CleanupOperation.NONE); + + // Should always execute with NONE operation + boolean shouldExecute = + setup.executor.shouldExecuteTaskForTest(setup.tableRuntime, CleanupOperation.NONE); + Assert.assertTrue("Should always execute with NONE operation", shouldExecute); + } +}