Skip to content

Commit bce7e0d

Browse files
author
张文领
committed
Save the last completion time for each cleanup operation performed on each optimization table.
1 parent 0d7a006 commit bce7e0d

19 files changed

+1572
-10
lines changed

amoro-ams/src/main/java/org/apache/amoro/server/persistence/SqlSessionFactoryProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.amoro.server.persistence.mapper.PlatformFileMapper;
3232
import org.apache.amoro.server.persistence.mapper.ResourceMapper;
3333
import org.apache.amoro.server.persistence.mapper.TableBlockerMapper;
34+
import org.apache.amoro.server.persistence.mapper.TableCleanupMapper;
3435
import org.apache.amoro.server.persistence.mapper.TableMetaMapper;
3536
import org.apache.amoro.server.persistence.mapper.TableProcessMapper;
3637
import org.apache.amoro.server.persistence.mapper.TableRuntimeMapper;
@@ -74,6 +75,7 @@ public void init(DataSource dataSource) throws SQLException {
7475
configuration.addMapper(TableBlockerMapper.class);
7576
configuration.addMapper(TableProcessMapper.class);
7677
configuration.addMapper(TableRuntimeMapper.class);
78+
configuration.addMapper(TableCleanupMapper.class);
7779

7880
PageInterceptor interceptor = new PageInterceptor();
7981
Properties interceptorProperties = new Properties();
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.server.persistence.converter;
20+
21+
import org.apache.amoro.server.table.cleanup.CleanupOperation;
22+
import org.apache.ibatis.type.BaseTypeHandler;
23+
import org.apache.ibatis.type.JdbcType;
24+
import org.apache.ibatis.type.MappedJdbcTypes;
25+
import org.apache.ibatis.type.MappedTypes;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.sql.CallableStatement;
30+
import java.sql.PreparedStatement;
31+
import java.sql.ResultSet;
32+
import java.sql.SQLException;
33+
34+
/**
35+
* MyBatis type handler for converting between CleanupOperation enum and database integer values.
36+
* This handler maps CleanupOperation enum values to their corresponding integer codes for
37+
* persistence, and converts integer codes back to enum values when reading from the database.
38+
*/
39+
@MappedJdbcTypes(JdbcType.INTEGER)
40+
@MappedTypes(CleanupOperation.class)
41+
public class CleanupOperationConverter extends BaseTypeHandler<CleanupOperation> {
42+
private static final Logger LOG = LoggerFactory.getLogger(CleanupOperationConverter.class);
43+
44+
@Override
45+
public void setNonNullParameter(
46+
PreparedStatement ps, int i, CleanupOperation parameter, JdbcType jdbcType)
47+
throws SQLException {
48+
ps.setInt(i, parameter.getCode());
49+
}
50+
51+
@Override
52+
public CleanupOperation getNullableResult(ResultSet rs, String columnName) throws SQLException {
53+
return convertFromString(rs.getString(columnName));
54+
}
55+
56+
@Override
57+
public CleanupOperation getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
58+
return convertFromString(rs.getString(columnIndex));
59+
}
60+
61+
@Override
62+
public CleanupOperation getNullableResult(CallableStatement cs, int columnIndex)
63+
throws SQLException {
64+
return convertFromString(cs.getString(columnIndex));
65+
}
66+
67+
/**
68+
* Converts a string value to a CleanupOperation enum value. Handles null values and parsing
69+
* errors gracefully.
70+
*
71+
* @param s The string value from the database
72+
* @return The corresponding CleanupOperation enum value, or null if conversion fails
73+
*/
74+
private CleanupOperation convertFromString(String s) {
75+
if (s == null) {
76+
return null;
77+
}
78+
try {
79+
return CleanupOperation.fromCode(Integer.parseInt(s));
80+
} catch (NumberFormatException e) {
81+
LOG.warn("Failed to parse CleanupOperation from string: {}", s, e);
82+
return null;
83+
}
84+
}
85+
}
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.amoro.server.persistence.mapper;
20+
21+
import org.apache.amoro.server.persistence.converter.Long2TsConverter;
22+
import org.apache.amoro.server.table.cleanup.CleanupOperation;
23+
import org.apache.amoro.server.table.cleanup.TableCleanupProcessMeta;
24+
import org.apache.ibatis.annotations.Delete;
25+
import org.apache.ibatis.annotations.Insert;
26+
import org.apache.ibatis.annotations.Param;
27+
import org.apache.ibatis.annotations.Result;
28+
import org.apache.ibatis.annotations.Results;
29+
import org.apache.ibatis.annotations.Select;
30+
import org.apache.ibatis.annotations.Update;
31+
32+
import java.util.List;
33+
34+
public interface TableCleanupMapper {
35+
String TABLE_NAME = "table_cleanup_process";
36+
37+
/** Insert a new table cleanup process record */
38+
@Insert(
39+
"INSERT INTO "
40+
+ TABLE_NAME
41+
+ " (cleanup_process_id, table_id, catalog_name, db_name, table_name, cleanup_operation_code, last_cleanup_end_time) "
42+
+ "VALUES (#{cleanupProcessMeta.cleanupProcessId},"
43+
+ " #{cleanupProcessMeta.tableId}, #{cleanupProcessMeta.catalogName}, "
44+
+ "#{cleanupProcessMeta.dbName}, #{cleanupProcessMeta.tableName},"
45+
+ "#{cleanupProcessMeta.cleanupOperation,"
46+
+ " typeHandler=org.apache.amoro.server.persistence.converter.CleanupOperationConverter},"
47+
+ "#{cleanupProcessMeta.lastCleanupEndTime,"
48+
+ " typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter})")
49+
void insertTableCleanupProcess(
50+
@Param("cleanupProcessMeta") TableCleanupProcessMeta cleanupProcessMeta);
51+
52+
/** Update lastCleanupEndTime field by table_id and cleanup_operation_code */
53+
@Update(
54+
"UPDATE "
55+
+ TABLE_NAME
56+
+ " SET last_cleanup_end_time = #{lastCleanupEndTime, typeHandler=org.apache.amoro.server.persistence.converter.Long2TsConverter} "
57+
+ "WHERE table_id = #{tableId} AND cleanup_operation_code = #{cleanupOperation,"
58+
+ " typeHandler=org.apache.amoro.server.persistence.converter.CleanupOperationConverter}")
59+
int updateLastCleanupEndTimeByTableIdAndCleanupOperation(
60+
@Param("tableId") long tableId,
61+
@Param("cleanupOperation") CleanupOperation cleanupOperation,
62+
@Param("lastCleanupEndTime") long lastCleanupEndTime);
63+
64+
/**
65+
* Select table_id and last_cleanup_end_time by cleanup_operation_code and table_ids If
66+
* last_cleanup_end_time is NULL in database, it will be converted to 0L as default value
67+
*/
68+
@Select(
69+
"<script>"
70+
+ "SELECT table_id,last_cleanup_end_time "
71+
+ "FROM "
72+
+ TABLE_NAME
73+
+ " WHERE cleanup_operation_code = #{cleanupOperation,"
74+
+ " typeHandler=org.apache.amoro.server.persistence.converter.CleanupOperationConverter}"
75+
+ " AND table_id IN "
76+
+ "<foreach item='tableId' collection='tableIds' open='(' separator=',' close=')'>"
77+
+ "#{tableId}"
78+
+ "</foreach>"
79+
+ "</script>")
80+
@Results({
81+
@Result(property = "tableId", column = "table_id"),
82+
@Result(
83+
property = "lastCleanupEndTime",
84+
column = "last_cleanup_end_time",
85+
typeHandler = Long2TsConverter.class)
86+
})
87+
List<TableCleanupProcessMeta> selectTableIdAndLastCleanupEndTime(
88+
@Param("tableIds") List<Long> tableIds,
89+
@Param("cleanupOperation") CleanupOperation cleanupOperation);
90+
91+
/** Delete all cleanup process records for a specific table */
92+
@Delete("DELETE FROM " + TABLE_NAME + " WHERE table_id = #{tableId}")
93+
void deleteTableCleanupProcesses(@Param("tableId") long tableId);
94+
}

0 commit comments

Comments
 (0)