@dengzhhu653
Member

@dengzhhu653 dengzhhu653 commented Oct 30, 2025

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

Tested on Postgres 17.2, MariaDB 10.3.39-MariaDB-1, MySQL 9.1.0-1.el9 and 5.7.44, and Oracle 23.

@deniskuzZ
Member

deniskuzZ commented Nov 12, 2025

It is still using pessimistic locking. How about optimistic locking on a version parameter instead:

Transaction A: UPDATE version = version + 1 (starts at v=5)
Transaction B: UPDATE version = version + 1 (starts at v=5)

Database MVCC:
├─ Transaction A gets version 5, increments to 6, commits
├─ Transaction B sees old version 5 (MVCC snapshot)
├─ When B tries to commit:
│ ├─ Detects conflict (row changed by A)
│ ├─ updCount = 0 (WHERE clause fails - version is now 6, not 5)
│ └─ Returns null to signal conflict
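
The outcome in the diagram above can be sketched in memory as a compare-and-set on the version. This is only an illustration under assumptions: `VersionedRow` and `updateIfVersionMatches` are hypothetical names, not Hive code, and the sketch models the result of the conditional UPDATE (1 or 0 affected rows), not whether the second transaction has to wait for the first.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical in-memory stand-in for the version row; not Hive code. */
final class VersionedRow {
  private final AtomicLong version;

  VersionedRow(long initial) {
    this.version = new AtomicLong(initial);
  }

  /**
   * Mirrors "UPDATE ... SET version = newVersion WHERE version = expected".
   * Returns the number of affected rows: 1 on success, 0 on conflict.
   */
  int updateIfVersionMatches(long expected, long newVersion) {
    return version.compareAndSet(expected, newVersion) ? 1 : 0;
  }

  long current() {
    return version.get();
  }

  public static void main(String[] args) {
    VersionedRow row = new VersionedRow(5);

    // Both transactions read the same snapshot (v=5).
    long snapshotA = row.current();
    long snapshotB = row.current();

    // A commits first; B's WHERE clause no longer matches.
    int updCountA = row.updateIfVersionMatches(snapshotA, snapshotA + 1);
    int updCountB = row.updateIfVersionMatches(snapshotB, snapshotB + 1);

    System.out.println("A updCount=" + updCountA
        + ", B updCount=" + updCountB
        + ", version=" + row.current());
  }
}
```

With both snapshots at 5, only the first update matches; the second returns 0 and would have to re-read the version and retry.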

// ✅ OPTIMISTIC LOCKING: Read current version, increment, and prepare for atomic check
String currentVersionStr = table.getParameters().get(versionParamKey);
long currentVersion = (currentVersionStr != null ? Long.parseLong(currentVersionStr) : 0L);
long newVersion = currentVersion + 1;
newParams.put(versionParamKey, String.valueOf(newVersion));

oldt.setParameters(newParams);

// ✅ Atomically increment version with conflict detection
// This UPDATE will fail if another transaction changed the version
int updCount = incrementTableVersionAtomic(mTable.getId(), versionParamKey, currentVersion, newVersion);

if (updCount == 0) {
  // Concurrent modification detected - retry
  LOG.debug("Table {}.{} was modified by another transaction (version {} changed), retrying...", dbname, name, currentVersion);
  throw new RetryingExecutor.RetryException(
      new MetaException("Optimistic lock failure - table version changed"));
}

LOG.debug("Successfully updated table {}.{} version: {} -> {}", dbname, name, currentVersion, newVersion);

.............

  private int incrementTableVersionAtomic(long tblId, String versionParamKey, 
      long expectedVersion, long newVersion) throws MetaException {
    
    try {
      // First, try to UPDATE with optimistic lock check
      String updateSQL = "UPDATE \"TABLE_PARAMS\" " +
          "SET \"PARAM_VALUE\" = '" + newVersion + "' " +
          "WHERE \"TBL_ID\" = " + tblId + 
          " AND \"PARAM_KEY\" = '" + versionParamKey + "' " +
          " AND \"PARAM_VALUE\" = '" + expectedVersion + "'";
      
      int updCount = executePlainSQLUpdate(updateSQL);
      
      if (updCount == 1) {
        // Success - version was incremented
        return 1;
      }
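
As a side note on the string-concatenated SQL above, the same conditional UPDATE could be written with bind parameters. This is a minimal sketch under assumptions: `VersionCas` and `compareAndSetVersion` are hypothetical names, the caller is assumed to supply a JDBC `Connection` that is already inside the metastore transaction, and the session is assumed to accept double-quoted identifiers (the draft patch runs a prepare statement for that purpose).

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

/** Hypothetical helper; not part of ObjectStore. */
final class VersionCas {
  // Same statement shape as in the snippet above, with values bound instead of concatenated.
  static final String UPDATE_SQL =
      "UPDATE \"TABLE_PARAMS\" SET \"PARAM_VALUE\" = ? "
          + "WHERE \"TBL_ID\" = ? AND \"PARAM_KEY\" = ? AND \"PARAM_VALUE\" = ?";

  /** Returns true when exactly one row still carried the expected version. */
  static boolean compareAndSetVersion(Connection conn, long tblId, String versionParamKey,
      long expectedVersion, long newVersion) throws SQLException {
    try (PreparedStatement ps = conn.prepareStatement(UPDATE_SQL)) {
      ps.setString(1, Long.toString(newVersion));      // PARAM_VALUE is stored as text
      ps.setLong(2, tblId);
      ps.setString(3, versionParamKey);
      ps.setString(4, Long.toString(expectedVersion)); // the snapshot we read earlier
      return ps.executeUpdate() == 1;
    }
  }
}
```

Binding the values avoids quoting problems if the key ever comes from configuration; it does not change the locking behavior of the statement.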

@deniskuzZ
Member

deniskuzZ commented Nov 12, 2025

patch example

Subject: [PATCH] DRAFT
---
Index: standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
IDEA additional info:
Subsystem: com.intellij.openapi.diff.impl.patch.CharsetEP
<+>UTF-8
===================================================================
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java	(revision c729ea19807c0c0ca6f1df4870fff49660e95a85)
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/ObjectStore.java	(date 1762970970505)
@@ -27,6 +27,7 @@
 import java.io.IOException;
 import java.net.InetAddress;
 import java.sql.Connection;
+import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.SQLIntegrityConstraintViolationException;
 import java.sql.Statement;
@@ -9171,21 +9172,35 @@
       int maxRetries = MetastoreConf.getIntVar(conf, ConfVars.METASTORE_S4U_NOWAIT_MAX_RETRIES);
       long sleepInterval = MetastoreConf.getTimeVar(conf,
           ConfVars.METASTORE_S4U_NOWAIT_RETRY_SLEEP_INTERVAL, TimeUnit.MILLISECONDS);
+      
+      final String versionParamKey = "hive.metastore.table.version";
+      
       Map<String, String> result = new RetryingExecutor<>(maxRetries, () -> {
-        Ref<Exception> exceptionRef = new Ref<>();
-        String savePoint = "uts_" + ThreadLocalRandom.current().nextInt(10000) + "_" + System.nanoTime();
-        setTransactionSavePoint(savePoint);
-        executePlainSQL(
-            sqlGenerator.addForUpdateNoWait("SELECT \"TBL_ID\" FROM \"TBLS\" WHERE \"TBL_ID\" = " + mTable.getId()),
-            exception -> {
-              rollbackTransactionToSavePoint(savePoint);
-              exceptionRef.t = exception;
-            });
-        if (exceptionRef.t != null) {
-          throw new RetryingExecutor.RetryException(exceptionRef.t);
-        }
         pm.refresh(mTable);
         Table table = convertToTable(mTable);
+        String dbname = table.getDbName();
+        String name = table.getTableName();
+        
+        // ✅ STEP 1: Read current version snapshot from TABLE_PARAMS
+        String currentVersionStr = table.getParameters().get(versionParamKey);
+        long expectedVersion = (currentVersionStr != null ? Long.parseLong(currentVersionStr) : 0L);
+        long newVersion = expectedVersion + 1;
+        
+        // ✅ STEP 2: Atomically claim the version with snapshot check
+        // This UPDATE will ONLY succeed if PARAM_VALUE hasn't changed (optimistic lock)
+        boolean claimed = claimTableVersionWithSnapshot(mTable.getId(), versionParamKey, expectedVersion, newVersion);
+        
+        if (!claimed) {
+          // Version conflict - PARAM_VALUE changed since we read it
+          LOG.debug("Table {}.{} version conflict (expected={}), retrying...", dbname, name, expectedVersion);
+          throw new RetryingExecutor.RetryException(
+              new MetaException("Optimistic lock failure - table version changed from " + expectedVersion));
+        }
+        
+        // ✅ STEP 3: Successfully claimed version - now do the work
+        LOG.debug("Claimed table {}.{} version {} -> {}, proceeding with stats update", 
+            dbname, name, expectedVersion, newVersion);
+        
         List<String> colNames = new ArrayList<>();
         for (ColumnStatisticsObj statsObj : statsObjs) {
           colNames.add(statsObj.getColName());
@@ -9201,17 +9216,14 @@
           MTableColumnStatistics mStatsObj = StatObjectConverter.convertToMTableColumnStatistics(mTable, statsDesc,
               statsObj, colStats.getEngine());
           writeMTableColumnStatistics(table, mStatsObj, oldStats.get(statsObj.getColName()));
-          // There is no need to add colname again, otherwise we will get duplicate colNames.
         }
 
         // TODO: (HIVE-20109) ideally the col stats stats should be in colstats, not in the table!
         // Set the table properties
-        // No need to check again if it exists.
-        String dbname = table.getDbName();
-        String name = table.getTableName();
         MTable oldt = mTable;
         Map<String, String> newParams = new HashMap<>(table.getParameters());
         StatsSetupConst.setColumnStatsState(newParams, colNames);
+        
         boolean isTxn = TxnUtils.isTransactionalTable(oldt.getParameters());
         if (isTxn) {
           if (!areTxnStatsSupported) {
@@ -9223,14 +9235,19 @@
               throw new MetaException(errorMsg);
             }
             if (!isCurrentStatsValidForTheQuery(oldt, validWriteIds, true)) {
-              // Make sure we set the flag to invalid regardless of the current value.
               StatsSetupConst.setBasicStatsState(newParams, StatsSetupConst.FALSE);
               LOG.info("Removed COLUMN_STATS_ACCURATE from the parameters of the table " + dbname + "." + name);
             }
             oldt.setWriteId(writeId);
           }
         }
+        
+        // ✅ STEP 4: Add the new version to params (already updated in DB, keep in-memory consistent)
+        newParams.put(versionParamKey, String.valueOf(newVersion));
         oldt.setParameters(newParams);
+        
+        LOG.debug("Successfully updated table {}.{} stats with version {}", dbname, name, newVersion);
+        
         return newParams;
       }).onRetry(e -> e instanceof RetryingExecutor.RetryException)
         .commandName("updateTableColumnStatistics").sleepInterval(sleepInterval, interval ->
@@ -14183,4 +14200,113 @@
       }
     }
   }
+
+  /**
+   * Atomically claim the table version with optimistic locking.
+   * Uses UPDATE with WHERE clause to verify the snapshot (expected version) hasn't changed.
+   * 
+   * Strategy (Optimistic Locking with MVCC):
+   * 1. UPDATE version to newVersion WHERE current value = expectedVersion
+   * 2. If updCount = 1: Success (snapshot unchanged, version claimed)
+   * 3. If updCount = 0: Either conflict (version changed) OR first time (no row)
+   * 4. For first time, try INSERT with version 1
+   * 
+   * @param tblId the table ID
+   * @param versionParamKey the parameter key for version
+   * @param expectedVersion the version snapshot we read (must match for UPDATE to succeed)
+   * @param newVersion the new version to set
+   * @return true if claimed successfully, false if conflict detected
+   */
+  private boolean claimTableVersionWithSnapshot(long tblId, String versionParamKey, 
+      long expectedVersion, long newVersion) {
+    try {
+      // ✅ MVCC: Atomically increment version using single UPDATE statement
+      // Database MVCC ensures proper serialization of concurrent updates
+      
+      DatabaseProduct dbProduct = sqlGenerator.getDbProduct();
+      String s = dbType.getPrepareTxnStmt();
+      assert pm.currentTransaction().isActive();
+      JDOConnection jdoConn = pm.getDataStoreConnection();
+      Connection conn = (Connection) jdoConn.getNativeConnection();
+      
+      try {
+          String updateSQL = "UPDATE \"TABLE_PARAMS\" " +
+              "SET \"PARAM_VALUE\" = '" + newVersion + "' " +
+              "WHERE \"TBL_ID\" = " + tblId + 
+              " AND \"PARAM_KEY\" = '" + versionParamKey + "' " +
+              " AND \"PARAM_VALUE\" = '" + expectedVersion + "'";  // ✅ CHECK SNAPSHOT!
+          
+          try (Statement statement = conn.createStatement()) {
+            if (s != null) {
+              statement.execute(s);
+            }
+            int updCount = statement.executeUpdate(updateSQL);
+            
+            if (updCount == 1) {
+              // Successfully claimed! Snapshot matched
+              return true;
+            }
+            // updCount == 0: fall through to INSERT (first time)
+          }
+        
+        // First time (no row exists) - try INSERT with version 1
+        // This only happens when expectedVersion = 0
+        if (expectedVersion == 0) {
+          String insertSQL = "INSERT INTO \"TABLE_PARAMS\" (\"TBL_ID\", \"PARAM_KEY\", \"PARAM_VALUE\") " +
+              "VALUES (" + tblId + ", '" + versionParamKey + "', '" + newVersion + "')";
+          
+          try (Statement statement = conn.createStatement()) {
+            if (s != null) {
+              statement.execute(s);
+            }
+            int insertCount = statement.executeUpdate(insertSQL);
+            if (insertCount == 1) {
+              return true; // Successfully inserted
+            }
+          } catch (SQLException e) {
+            // Duplicate key - another transaction inserted concurrently (conflict)
+            LOG.debug("Concurrent insert detected for tblId={}, conflict", tblId);
+            return false;
+          }
+        }
+        
+        // Snapshot mismatch (conflict)
+        return false;
+        
+      } finally {
+        jdoConn.close();
+      }
+      
+    } catch (Exception e) {
+      LOG.error("Failed to claim table version for tblId={}", tblId, e);
+      // On error, treat as conflict to trigger retry
+      return false;
+    }
+  }
 }
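
For readers without the Hive sources at hand, the claim-then-retry flow in the draft patch can be approximated by a plain loop. This is a rough sketch, not the actual `RetryingExecutor`: `OptimisticRetry` and `runWithRetry` are hypothetical names, and the `BooleanSupplier` stands in for one attempt that returns false on a version conflict.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical stand-in for the retry wrapper used in the patch. */
final class OptimisticRetry {
  /**
   * Runs the attempt until it reports success or maxRetries attempts are used.
   * Returns the number of attempts used, or -1 if every attempt conflicted
   * or the thread was interrupted while sleeping.
   */
  static int runWithRetry(int maxRetries, long sleepMillis, BooleanSupplier attempt) {
    for (int i = 1; i <= maxRetries; i++) {
      if (attempt.getAsBoolean()) {
        return i; // version claimed on attempt i
      }
      if (i < maxRetries) {
        try {
          Thread.sleep(sleepMillis); // back off before re-reading the version
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return -1;
        }
      }
    }
    return -1;
  }

  public static void main(String[] args) {
    int[] calls = {0};
    // Pretend the first two attempts hit a version conflict.
    int used = runWithRetry(5, 10L, () -> ++calls[0] == 3);
    System.out.println("attempts used: " + used);
  }
}
```

Each retry has to re-read the current version before trying again, which is why the patch calls `pm.refresh(mTable)` at the top of the retried block.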


@dengzhhu653
Member Author

Thank you @deniskuzZ for the comment.

String updateSQL = "UPDATE \"TABLE_PARAMS\" " + "SET \"PARAM_VALUE\" = '" + newVersion + "' " +
    "WHERE \"TBL_ID\" = " + tblId + " AND \"PARAM_KEY\" = '" + versionParamKey + "' " +
    " AND \"PARAM_VALUE\" = '" + expectedVersion + "'";  // ✅ CHECK SNAPSHOT!

The result of this query is key to the example. Suppose TABLE_PARAMS contains the row (TBL_ID, PARAM_KEY, PARAM_VALUE) = (1, hive.metastore.table.version, 1), and transactions A and B both execute the update (setting hive.metastore.table.version to 2) at the same time. If A takes the row lock first, B has to wait until A commits or rolls back and releases the row before it is allowed to update it. Once A has committed, B re-evaluates the WHERE condition, finds no matching row, and returns 0.

If more transactions try to update this row, they pile up waiting for a chance to take the row lock. In my opinion, this is similar to the s4u (SELECT ... FOR UPDATE) approach I proposed in the old PR.
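
The waiting behavior described above can be imitated with a small in-memory model. This is a simplified sketch, not database code: `RowLockDemo` is a hypothetical class, a `Semaphore` stands in for the row lock, and the lock-wait timeout is a plain parameter rather than a real database setting.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/** Hypothetical model of one row whose lock is held from UPDATE until commit. */
final class RowLockDemo {
  private final Semaphore rowLock = new Semaphore(1);
  private volatile long version;

  RowLockDemo(long initial) {
    this.version = initial;
  }

  /**
   * Models "UPDATE ... WHERE version = expected". The caller first waits for
   * the row lock, then evaluates the condition. On a match the lock stays
   * held until commit(); on a mismatch it is released and 0 is returned.
   */
  int update(long expected, long next, long lockWaitMillis) {
    boolean acquired;
    try {
      acquired = rowLock.tryAcquire(lockWaitMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("Interrupted while waiting for row lock", e);
    }
    if (!acquired) {
      throw new IllegalStateException("Lock wait timeout exceeded");
    }
    if (version != expected) {
      rowLock.release();
      return 0; // condition re-evaluated after the wait: no row matched
    }
    version = next;
    return 1; // lock is kept until commit()
  }

  void commit() {
    rowLock.release();
  }

  public static void main(String[] args) throws InterruptedException {
    RowLockDemo row = new RowLockDemo(1);
    int updCountA = row.update(1, 2, 1000); // A takes the row lock

    // B issues the same update and blocks until A commits.
    Thread b = new Thread(() ->
        System.out.println("B updCount=" + row.update(1, 2, 5000)));
    b.start();

    Thread.sleep(200); // A is still inside its transaction
    row.commit();
    b.join();
    System.out.println("A updCount=" + updCountA);
  }
}
```

In this model B only learns about the conflict after A releases the lock, and a B whose wait budget is shorter than A's transaction fails with a timeout instead of returning 0.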

@dengzhhu653
Member Author

I tried a similar update on MySQL; the black transaction keeps waiting until it fails with "Lock wait timeout exceeded":
Screenshot 2025-11-13 at 09 06 43
