Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -1856,7 +1856,9 @@ public void testDropTableWithSuffix() throws Exception {
}
MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
houseKeeperService.setConf(hiveConf);


Thread.sleep(MetastoreConf.getTimeVar(hiveConf,
MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
houseKeeperService.run();
count = TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName + "'");
Expand Down Expand Up @@ -1945,6 +1947,8 @@ public void testDropMaterializedViewWithSuffix() throws Exception {
MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
houseKeeperService.setConf(hiveConf);

Thread.sleep(MetastoreConf.getTimeVar(hiveConf,
MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
houseKeeperService.run();
count = TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName + "'");
Expand Down
4 changes: 4 additions & 0 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
Original file line number Diff line number Diff line change
Expand Up @@ -2373,6 +2373,7 @@ public void testCleanerForTxnToWriteId() throws Exception {
txnHandler.compact(new CompactionRequest("default", Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
runWorker(hiveConf);
runCleaner(hiveConf);
txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();

// After compaction/cleanup, all entries from TXN_TO_WRITE_ID should be cleaned up as all txns are committed.
Expand Down Expand Up @@ -2416,6 +2417,7 @@ public void testCleanerForTxnToWriteId() throws Exception {
// aborted txn would be removed from TXNS only after the compaction. Also, committed txn > open txn is retained.
// As open txn doesn't allocate writeid, the 2 entries for aborted and committed should be retained.
txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID" + acidTblWhereClause),
3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID" + acidTblWhereClause));
Expand All @@ -2428,6 +2430,7 @@ public void testCleanerForTxnToWriteId() throws Exception {
runWorker(hiveConf);
runCleaner(hiveConf);
txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from TXN_TO_WRITE_ID"));
Expand All @@ -2439,6 +2442,7 @@ public void testCleanerForTxnToWriteId() throws Exception {
// The txn opened after the compaction commit should not effect the Cleaner
runCleaner(hiveConf);
txnHandler.cleanEmptyAbortedAndCommittedTxns();
txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();

Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from TXN_TO_WRITE_ID"),
Expand Down
4 changes: 2 additions & 2 deletions ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public void testRenameTable() throws Exception {
"select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='s'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
Assert.assertEquals(2, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from WRITE_SET where WS_TABLE='s'"));
Assert.assertEquals(3, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'"));
Expand All @@ -125,7 +125,7 @@ public void testRenameTable() throws Exception {
"select count(*) from COMPLETED_TXN_COMPONENTS where CTC_TABLE='bar'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
Assert.assertEquals(2, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from WRITE_SET where WS_TABLE='bar'"));
Assert.assertEquals(4, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfForTest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
Expand Down Expand Up @@ -78,7 +79,9 @@ public void setUp() throws Exception {
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD, -1);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, false);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);

MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, true);
TxnHandler.ConfVars.setUseMinHistoryWriteId(true);

driver = new Driver(new QueryState.Builder().withHiveConf(conf).nonIsolated().build());
driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
import org.apache.hadoop.hive.ql.Context;
Expand Down Expand Up @@ -1235,6 +1236,9 @@ public void testWriteSetTracking4() throws Exception {
*/
@Test
public void testWriteSetTracking5() throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);

dropTable(new String[] {"TAB_PART"});
Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "select count(*) from \"WRITE_SET\""));
driver.run("create table if not exists TAB_PART (a int, b int) " +
Expand Down Expand Up @@ -2109,6 +2113,9 @@ public void testMergeUnpartitionedConflictSharedWrite() throws Exception {
* @param causeConflict true to make 2 operations such that they update the same entity
*/
private void testMergeUnpartitioned(boolean causeConflict, boolean sharedWrite) throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);

dropTable(new String[] {"target","source"});
conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);

Expand Down Expand Up @@ -2873,6 +2880,9 @@ public void testMergePartitionedConflictSharedWrite() throws Exception {
* @param causeConflict - true to make the operations cause a Write conflict
*/
private void testMergePartitioned(boolean causeConflict, boolean sharedWrite) throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);

dropTable(new String[] {"target","source"});
conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);

Expand Down Expand Up @@ -3537,7 +3547,8 @@ public void testSkipAcquireLocksForExplain() throws Exception {

@Test
public void testInsertSnapshotIsolationMinHistoryDisabled() throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
testInsertSnapshotIsolation();
}

Expand Down Expand Up @@ -3566,7 +3577,8 @@ public void testInsertSnapshotIsolation() throws Exception {

@Test
public void testUpdateSnapshotIsolationMinHistoryDisabled() throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false);
TxnHandler.ConfVars.setUseMinHistoryWriteId(false);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
testUpdateSnapshotIsolation();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ public class TestCompactionMetrics extends CompactorTest {
public void setUp() throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED, true);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
TxnHandler.ConfVars.setUseMinHistoryLevel(true);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
// re-initialize metrics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1675,10 +1675,11 @@ public enum ConfVars {
"time after which transactions are declared aborted if the client has not sent a heartbeat."),
TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout", "hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS,
"Time before an open transaction operation should persist, otherwise it is considered invalid and rolled back"),
@Deprecated
TXN_USE_MIN_HISTORY_LEVEL("metastore.txn.use.minhistorylevel", "hive.txn.use.minhistorylevel", true,
"Set this to false, for the TxnHandler and Cleaner to not use MIN_HISTORY_LEVEL table and take advantage of openTxn optimisation.\n"
+ "If the table is dropped HMS will switch this flag to false, any other value changes need a restart to take effect."),
TXN_USE_MIN_HISTORY_WRITE_ID("metastore.txn.use.minhistorywriteid", "hive.txn.use.minhistorywriteid", false,
TXN_USE_MIN_HISTORY_WRITE_ID("metastore.txn.use.minhistorywriteid", "hive.txn.use.minhistorywriteid", true,
"Set this to true, to avoid global minOpenTxn check in Cleaner.\n"
+ "If the table is dropped HMS will switch this flag to false."),
LOCK_NUMRETRIES("metastore.lock.numretries", "hive.lock.numretries", 100,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ private void updateStatus(CompactionInfo ci) throws MetaException {
String strState = CompactionState.fromSqlConst(ci.state).toString();

LOG.debug("Marking as {}: CompactionInfo: {}", strState, ci);
CompactionInfo ciActual = jdbcResource.execute(new GetCompactionInfoHandler(ci.id, false));
CompactionInfo ciActual = jdbcResource.execute(new GetCompactionInfoHandler(ci.id, false));

long endTime = getDbTime().getTime();
if (ciActual != null) {
Expand Down Expand Up @@ -505,7 +505,7 @@ public long findMinOpenTxnIdForCleaner() throws MetaException {
@RetrySemantics.Idempotent
@Deprecated
public long findMinTxnIdSeenOpen() {
if (!ConfVars.useMinHistoryLevel() || ConfVars.useMinHistoryWriteId()) {
if (!ConfVars.useMinHistoryLevel()) {
return Long.MAX_VALUE;
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ private ConfVars() {}
private boolean useMinHistoryWriteId;

public boolean useMinHistoryLevel() {
return useMinHistoryLevel;
return useMinHistoryLevel && !useMinHistoryWriteId;
}

public void setUseMinHistoryLevel(boolean useMinHistoryLevel) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,36 +18,18 @@
package org.apache.hadoop.hive.metastore.txn.jdbc.functions;

import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction;
import org.apache.hadoop.hive.metastore.txn.jdbc.queries.MinUncommittedTxnIdHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;

import java.sql.Types;

public class CleanTxnToWriteIdTableFunction implements TransactionalFunction<Void> {

private static final Logger LOG = LoggerFactory.getLogger(CleanTxnToWriteIdTableFunction.class);

//language=SQL
private static String minHistoryLevelSql = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
" SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" +
" UNION" +
" SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = :abortedState) \"RES\"";
//language=SQL
private static String noMinHistoryLevelSql = "SELECT MIN(\"RES\".\"ID\") AS \"ID\" FROM (" +
" SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" +
" UNION" +
" SELECT MIN(\"WS_TXNID\") AS \"ID\" FROM \"WRITE_SET\"" +
" UNION" +
" SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = " + TxnStatus.ABORTED +
" OR \"TXN_STATE\" = " + TxnStatus.OPEN +
" ) \"RES\"";

private final long minTxnIdSeenOpen;

public CleanTxnToWriteIdTableFunction(long minTxnIdSeenOpen) {
Expand All @@ -56,32 +38,17 @@ public CleanTxnToWriteIdTableFunction(long minTxnIdSeenOpen) {

@Override
public Void execute(MultiDataSourceJdbcResource jdbcResource) throws MetaException {
NamedParameterJdbcTemplate jdbcTemplate = jdbcResource.getJdbcTemplate();
String sql = TxnHandler.ConfVars.useMinHistoryLevel() ? minHistoryLevelSql : noMinHistoryLevelSql;
MapSqlParameterSource params = new MapSqlParameterSource()
.addValue("abortedState", TxnStatus.ABORTED.getSqlConst(), Types.CHAR);
if (!TxnHandler.ConfVars.useMinHistoryLevel()) {
params.addValue("openState", TxnStatus.OPEN.getSqlConst(), Types.CHAR);
}

// First need to find the min_uncommitted_txnid which is currently seen by any open transactions.
// If there are no txns which are currently open or aborted in the system, then current value of
// max(TXNS.txn_id) could be min_uncommitted_txnid.
Long minTxnId = jdbcTemplate.query(sql, params, rs -> {
if (rs.next()) {
return rs.getLong(1);
} else {
return null;
}
});

Long minTxnId = jdbcResource.execute(new MinUncommittedTxnIdHandler());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

existing MinUncommittedTxnIdHandler wasn't used for unknown reason

if (minTxnId == null) {
throw new MetaException("Transaction tables not properly initialized, no record found in TXNS");
}
long minUncommitedTxnid = Math.min(minTxnId, minTxnIdSeenOpen);

// As all txns below min_uncommitted_txnid are either committed or empty_aborted, we are allowed
// to clean up the entries less than min_uncommitted_txnid from the TXN_TO_WRITE_ID table.
NamedParameterJdbcTemplate jdbcTemplate = jdbcResource.getJdbcTemplate();
int rc = jdbcTemplate.update("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE \"T2W_TXNID\" < :txnId",
new MapSqlParameterSource("txnId", minUncommitedTxnid));
LOG.info("Removed {} rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark: {}", rc, minUncommitedTxnid);
Expand Down
Loading