Skip to content

Commit 388e806

Browse files
authored
HIVE-29272: Query-based MINOR compaction should not consider minOpenWriteId (#6143)
+ Fix the output directory generation in MergeCompactor
1 parent 7c15c5d commit 388e806

14 files changed

+125
-49
lines changed

itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3369,7 +3369,7 @@ public void testMinorCompactionAfterMajorWithMerge() throws Exception {
33693369
testCompactionWithMerge(CompactionType.MINOR, false, false, null,
33703370
Collections.singletonList("bucket_00000"),
33713371
Arrays.asList("delta_0000004_0000004_0000", "delta_0000005_0000005_0000", "delta_0000006_0000006_0000"),
3372-
Collections.singletonList("delta_0000001_0000006_v0000013"), false, true, false);
3372+
Collections.singletonList("delta_0000004_0000006_v0000013"), false, true, false);
33733373
}
33743374

33753375
@Test
@@ -3709,4 +3709,97 @@ public void testMajorCompactionUpdateMissingColumnStatsOfPartition() throws Exce
37093709

37103710
Assert.assertEquals(3, StatsSetupConst.getColumnsHavingStats(partition.getParameters()).size());
37113711
}
3712+
3713+
@Test
3714+
public void testMinorWithAbortedAndOpenTnx() throws Exception {
3715+
String dbName = "default";
3716+
String tableName = "testAbortedAndOpenTnxTbl";
3717+
// Create test table
3718+
TestDataProvider testDataProvider = new TestDataProvider();
3719+
testDataProvider.createFullAcidTable(tableName, false, false);
3720+
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
3721+
Table table = metaStoreClient.getTable(dbName, tableName);
3722+
FileSystem fs = FileSystem.get(conf);
3723+
3724+
// Abort the first insert transaction
3725+
driver.getConf().setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, true);
3726+
testDataProvider.insertOnlyTestData(tableName, 1);
3727+
driver.getConf().setBoolVar(HiveConf.ConfVars.HIVE_TEST_MODE_ROLLBACK_TXN, false);
3728+
// Do three successful inserts to create 3 deltas
3729+
testDataProvider.insertOnlyTestData(tableName, 3);
3730+
3731+
// Start an insert and leave it open when the compaction is running
3732+
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
3733+
StreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tableName)
3734+
.withAgentInfo("UT_" + Thread.currentThread().getName()).withHiveConf(conf).withRecordWriter(writer)
3735+
.withTransactionBatchSize(1).connect();
3736+
connection.beginTransaction();
3737+
connection.write("4,4".getBytes());
3738+
// Run query-based MINOR compaction
3739+
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
3740+
// Finish the open transaction
3741+
connection.commitTransaction();
3742+
connection.close();
3743+
List<String> expectedData = testDataProvider.getAllData(tableName, false);
3744+
// Run cleaner. It is expected to delete all deltas except the one created by the compaction and the one belonging to the open transaction.
3745+
CompactorTestUtil.runCleaner(conf);
3746+
3747+
verifySuccessfulCompaction(1);
3748+
List<String> resultData = testDataProvider.getAllData(tableName);
3749+
Assert.assertEquals(expectedData, resultData);
3750+
List<String> deltas = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
3751+
Assert.assertEquals(2, deltas.size());
3752+
Assert.assertEquals("Delta directory names are not matching after compaction",
3753+
Arrays.asList("delta_0000002_0000004_v0000007", "delta_0000005_0000005"), deltas);
3754+
for (String delta: deltas) {
3755+
// Check if none of the delta directories are empty
3756+
List<String> files = CompactorTestUtil.getBucketFileNames(fs, table, null, delta);
3757+
Assert.assertFalse(files.isEmpty());
3758+
}
3759+
}
3760+
3761+
@Test
3762+
public void testMinorWithOpenTnx() throws Exception {
3763+
String dbName = "default";
3764+
String tableName = "testAbortedAndOpenTnxTbl";
3765+
// Create test table
3766+
TestDataProvider testDataProvider = new TestDataProvider();
3767+
testDataProvider.createFullAcidTable(tableName, false, false);
3768+
IMetaStoreClient metaStoreClient = new HiveMetaStoreClient(conf);
3769+
Table table = metaStoreClient.getTable(dbName, tableName);
3770+
FileSystem fs = FileSystem.get(conf);
3771+
3772+
// Do three successful inserts to create 3 deltas
3773+
testDataProvider.insertOnlyTestData(tableName, 3);
3774+
3775+
// Start an insert and leave it open when the compaction is running
3776+
StrictDelimitedInputWriter writer = StrictDelimitedInputWriter.newBuilder().withFieldDelimiter(',').build();
3777+
StreamingConnection connection = HiveStreamingConnection.newBuilder().withDatabase(dbName).withTable(tableName)
3778+
.withAgentInfo("UT_" + Thread.currentThread().getName()).withHiveConf(conf).withRecordWriter(writer)
3779+
.withTransactionBatchSize(1).connect();
3780+
connection.beginTransaction();
3781+
connection.write("4,4".getBytes());
3782+
// Run query-based MINOR compaction
3783+
CompactorTestUtil.runCompaction(conf, dbName, tableName, CompactionType.MINOR, true);
3784+
// Finish the open transaction
3785+
connection.commitTransaction();
3786+
connection.close();
3787+
List<String> expectedData = testDataProvider.getAllData(tableName, false);
3788+
// Run cleaner. It is expected to delete all deltas except the one created by the compaction and the one belonging to the open transaction.
3789+
CompactorTestUtil.runCleaner(conf);
3790+
3791+
verifySuccessfulCompaction(1);
3792+
List<String> resultData = testDataProvider.getAllData(tableName);
3793+
Assert.assertEquals(expectedData, resultData);
3794+
List<String> deltas = CompactorTestUtil.getBaseOrDeltaNames(fs, AcidUtils.deltaFileFilter, table, null);
3795+
Assert.assertEquals(2, deltas.size());
3796+
Assert.assertEquals("Delta directory names are not matching after compaction",
3797+
Arrays.asList("delta_0000001_0000003_v0000006", "delta_0000004_0000004"), deltas);
3798+
for (String delta: deltas) {
3799+
// Check if none of the delta directories are empty
3800+
List<String> files = CompactorTestUtil.getBucketFileNames(fs, table, null, delta);
3801+
Assert.assertFalse(files.isEmpty());
3802+
}
3803+
}
3804+
37123805
}

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
import org.apache.hadoop.hive.ql.io.AcidUtils;
3131
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
3232
import org.apache.hive.common.util.HiveStringUtils;
33+
import org.slf4j.Logger;
34+
import org.slf4j.LoggerFactory;
3335

3436
import java.util.Arrays;
3537
import java.util.ArrayList;
@@ -39,6 +41,9 @@
3941
import java.util.stream.Collectors;
4042

4143
abstract class CompactionQueryBuilder {
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilder.class.getName());
46+
4247
// required fields, set in constructor
4348
protected Operation operation;
4449
protected String resultTableName;
@@ -317,15 +322,20 @@ protected void addTblProperties(StringBuilder query, Map<String, String> tblProp
317322

318323
private void buildAddClauseForAlter(StringBuilder query) {
319324
if (validWriteIdList == null || dir == null) {
325+
LOG.warn("There is no delta to be added as partition to the temp external table used by the minor compaction. " +
326+
"This may result an empty compaction directory.");
320327
query.setLength(0);
321328
return; // avoid NPEs, don't throw an exception but return an empty query
322329
}
323-
long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
324330
long highWatermark = validWriteIdList.getHighWatermark();
325331
List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
326-
delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID)
332+
delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark)
327333
.collect(Collectors.toList());
328334
if (deltas.isEmpty()) {
335+
String warnMsg = String.format("No %s delta is found below the highWaterMark %s to be added as partition " +
336+
"to the temp external table, used by the minor compaction. This may result an empty compaction directory.",
337+
isDeleteDelta ? "delete" : "", highWatermark);
338+
LOG.warn(warnMsg);
329339
query.setLength(0); // no alter query needed; clear StringBuilder
330340
return;
331341
}

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public boolean run(CompactorContext context) throws IOException {
4949

5050
String tmpTableName = getTempTableName(table);
5151
Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
52-
conf, true, false, false, null);
52+
conf, true, false, null);
5353

5454
List<String> createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString());
5555
List<String> compactionQueries = getCompactionQueries(table, context.getPartition(), tmpTableName);

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MergeCompactor.java

Lines changed: 2 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,8 @@ public boolean run(CompactorContext context) throws IOException, HiveException,
5656

5757
if (isMergeCompaction(hiveConf, dir, storageDescriptor)) {
5858
// Only inserts happened, it is much more performant to merge the files than running a query
59-
Path outputDirPath = getOutputDirPath(hiveConf, writeIds,
60-
compactionInfo.isMajorCompaction(), storageDescriptor);
59+
Path outputDirPath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
60+
hiveConf, compactionInfo.isMajorCompaction(), false, dir);
6161
try {
6262
return mergeFiles(hiveConf, compactionInfo.isMajorCompaction(),
6363
dir, outputDirPath, AcidUtils.isInsertOnlyTable(table.getParameters()));
@@ -161,27 +161,6 @@ private Map<Integer, List<Reader>> getBucketFiles(HiveConf conf, Path dirPath, b
161161
return bucketIdToBucketFilePath;
162162
}
163163

164-
/**
165-
* Generate output path for compaction. This can be used to generate delta or base directories.
166-
* @param conf hive configuration, must be non-null
167-
* @param writeIds list of valid write IDs
168-
* @param isBaseDir if base directory path should be generated
169-
* @param sd the resolved storadge descriptor
170-
* @return output path, always non-null
171-
*/
172-
private Path getOutputDirPath(HiveConf conf, ValidWriteIdList writeIds, boolean isBaseDir,
173-
StorageDescriptor sd) {
174-
long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
175-
long highWatermark = writeIds.getHighWatermark();
176-
long compactorTxnId = Compactor.getCompactorTxnId(conf);
177-
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
178-
.writingBase(isBaseDir).writingDeleteDelta(false)
179-
.isCompressed(false)
180-
.minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark)
181-
.statementId(-1).visibilityTxnId(compactorTxnId);
182-
return AcidUtils.baseOrDeltaSubdirPath(new Path(sd.getLocation()), options);
183-
}
184-
185164
/**
186165
* Merge files from base/delta directories. If the directories contains multiple buckets, the result will also
187166
* contain the same amount.

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@ public boolean run(CompactorContext context) throws IOException {
5959
table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis();
6060

6161
Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
62-
writeIds, conf, false, false, false, dir);
62+
writeIds, conf, false, false, dir);
6363
Path resultDeleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor,
64-
writeIds, conf, false, true, false, dir);
64+
writeIds, conf, false, true, dir);
6565

6666
List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds,
6767
resultDeltaDir, resultDeleteDeltaDir);

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public boolean run(CompactorContext context) throws IOException {
5656
// "insert overwrite directory" command if there were no bucketing or list bucketing.
5757
String tmpTableName = getTempTableName(table);
5858
Path resultBaseDir = QueryCompactor.Util.getCompactionResultDir(
59-
storageDescriptor, writeIds, driverConf, true, true, false, null);
59+
storageDescriptor, writeIds, driverConf, true, true, null);
6060

6161
List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor,
6262
resultBaseDir.toString());

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public boolean run(CompactorContext context) throws IOException {
5757
String tmpTableName = getTempTableName(table);
5858
String resultTmpTableName = tmpTableName + "_result";
5959
Path resultDeltaDir = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds, driverConf,
60-
false, false, false, dir);
60+
false, false, dir);
6161

6262
List<String> createTableQueries = getCreateQueries(tmpTableName, table, storageDescriptor, dir,
6363
writeIds, resultDeltaDir);
@@ -79,8 +79,9 @@ protected HiveConf setUpDriverSession(HiveConf hiveConf) {
7979
/**
8080
* Clean up the empty table dir of 'tmpTableName'.
8181
*/
82-
@Override protected void commitCompaction(String tmpTableName, HiveConf conf) throws IOException, HiveException {
83-
Util.cleanupEmptyTableDir(conf, tmpTableName);
82+
@Override
83+
protected void commitCompaction(String tmpTableName, HiveConf conf) throws IOException, HiveException {
84+
Util.cleanupEmptyTableDir(conf, tmpTableName + "_result");
8485
}
8586

8687
/**

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
2828
import org.apache.hadoop.hive.metastore.api.Table;
2929
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
30-
import org.apache.hadoop.hive.metastore.utils.StringableMap;
3130
import org.apache.hadoop.hive.ql.DriverUtils;
3231
import org.apache.hadoop.hive.ql.io.AcidDirectory;
3332
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
@@ -43,8 +42,6 @@
4342
import java.io.IOException;
4443
import java.util.List;
4544
import java.util.Map;
46-
import java.util.Objects;
47-
import java.util.stream.Stream;
4845

4946
import static org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.overrideConfProps;
5047

@@ -188,23 +185,19 @@ public static class Util {
188185
* @param conf HiveConf
189186
* @param writingBase if true, we are creating a base directory, otherwise a delta
190187
* @param createDeleteDelta if true, the delta dir we are creating is a delete delta
191-
* @param bucket0 whether to specify 0 as the bucketid
192188
* @param directory AcidUtils.Directory - only required for minor compaction result (delta) dirs
193189
*
194190
* @return Path of new base/delta/delete delta directory
195191
*/
196192
public static Path getCompactionResultDir(StorageDescriptor sd, ValidWriteIdList writeIds, HiveConf conf,
197-
boolean writingBase, boolean createDeleteDelta, boolean bucket0, AcidDirectory directory) {
193+
boolean writingBase, boolean createDeleteDelta, AcidDirectory directory) {
198194
long minWriteID = writingBase ? 1 : getMinWriteID(directory);
199195
long highWatermark = writeIds.getHighWatermark();
200196
long compactorTxnId = Compactor.getCompactorTxnId(conf);
201197
AcidOutputFormat.Options options =
202198
new AcidOutputFormat.Options(conf).isCompressed(false).minimumWriteId(minWriteID)
203199
.maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId)
204200
.writingBase(writingBase).writingDeleteDelta(createDeleteDelta);
205-
if (bucket0) {
206-
options = options.bucket(0);
207-
}
208201
Path location = new Path(sd.getLocation());
209202
return AcidUtils.baseOrDeltaSubdirPath(location, options);
210203
}

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public boolean run(CompactorContext context)
5252

5353
String tmpTableName = getTempTableName(table);
5454
Path tmpTablePath = QueryCompactor.Util.getCompactionResultDir(storageDescriptor, writeIds,
55-
conf, true, false, false, null);
55+
conf, true, false, null);
5656
int numBuckets = context.getCompactionInfo().numberOfBuckets;
5757
if (numBuckets <= 0) {
5858
//TODO: This is quite expensive, a better way should be found to get the number of buckets for an implicitly bucketed table

ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private AcidDirectory getAcidStateForWorker(CompactionInfo ci, StorageDescriptor
105105
public void cleanupResultDirs(CompactionInfo ci) {
106106
// result directory for compactor to write new files
107107
Path resultDir = QueryCompactor.Util.getCompactionResultDir(sd, tblValidWriteIds, conf,
108-
ci.type == CompactionType.MAJOR, false, false, dir);
108+
ci.type == CompactionType.MAJOR, false, dir);
109109
LOG.info("Deleting result directories created by the compactor:\n");
110110
try {
111111
FileSystem fs = resultDir.getFileSystem(conf);
@@ -114,7 +114,7 @@ public void cleanupResultDirs(CompactionInfo ci) {
114114

115115
if (ci.type == CompactionType.MINOR) {
116116
Path deleteDeltaDir = QueryCompactor.Util.getCompactionResultDir(sd, tblValidWriteIds, conf,
117-
false, true, false, dir);
117+
false, true, dir);
118118

119119
LOG.info(deleteDeltaDir.toString());
120120
fs.delete(deleteDeltaDir, true);

0 commit comments

Comments
 (0)