Skip to content

Commit 51934cf

Browse files
ankitsolasolomonC and co-authors authored
HBASE-29656 Scan WALs to identify bulkload operations for incremental backup (#7400)
* Scan WALs to identify bulkload operations for incremental backup
* Update unit test
* Info log
* Minor test fix
* Address review comments
* Spotless apply
* Addressed review comment
* spotless
* Remove log
* Retrigger CI

---------

Co-authored-by: Ankit Solomon <[email protected]>
1 parent 6e9561e commit 51934cf

File tree

7 files changed

+182
-201
lines changed

7 files changed

+182
-201
lines changed

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/BackupObserver.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,12 +96,18 @@ private void registerBulkLoad(ObserverContext<? extends RegionCoprocessorEnviron
9696
try (Connection connection = ConnectionFactory.createConnection(cfg);
9797
BackupSystemTable tbl = new BackupSystemTable(connection)) {
9898
Set<TableName> fullyBackedUpTables = tbl.getTablesIncludedInBackups();
99+
Map<TableName, Long> continuousBackupTableSet = tbl.getContinuousBackupTableSet();
99100

100-
if (fullyBackedUpTables.contains(tableName)) {
101+
// Tables in continuousBackupTableSet do not rely on BackupSystemTable but rather
102+
// scan on WAL backup directory to identify bulkload operation HBASE-29656
103+
if (
104+
fullyBackedUpTables.contains(tableName) && !continuousBackupTableSet.containsKey(tableName)
105+
) {
101106
tbl.registerBulkLoad(tableName, region.getEncodedNameAsBytes(), cfToHFilePaths);
102107
} else {
103108
if (LOG.isTraceEnabled()) {
104-
LOG.trace("Table {} has not gone through full backup - skipping.", tableName);
109+
LOG.trace("Table {} has either not gone through full backup or is "
110+
+ "part of continuousBackupTableSet - skipping", tableName);
105111
}
106112
}
107113
}

hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java

Lines changed: 3 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,16 @@
2020
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
2121
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
2222
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.DEFAULT_CONTINUOUS_BACKUP_PITR_WINDOW_DAYS;
23-
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
24-
import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
25-
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;
2623
import static org.apache.hadoop.hbase.mapreduce.WALPlayer.IGNORE_EMPTY_FILES;
2724

2825
import java.io.IOException;
29-
import java.text.ParseException;
30-
import java.text.SimpleDateFormat;
3126
import java.util.ArrayList;
3227
import java.util.Arrays;
33-
import java.util.Collections;
34-
import java.util.Date;
3528
import java.util.List;
3629
import java.util.Map;
3730
import java.util.concurrent.TimeUnit;
3831
import java.util.stream.Collectors;
3932
import org.apache.hadoop.conf.Configuration;
40-
import org.apache.hadoop.fs.FileStatus;
41-
import org.apache.hadoop.fs.FileSystem;
4233
import org.apache.hadoop.fs.Path;
4334
import org.apache.hadoop.hbase.HBaseConfiguration;
4435
import org.apache.hadoop.hbase.TableName;
@@ -47,7 +38,6 @@
4738
import org.apache.hadoop.hbase.backup.RestoreJob;
4839
import org.apache.hadoop.hbase.backup.RestoreRequest;
4940
import org.apache.hadoop.hbase.backup.util.BackupUtils;
50-
import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
5141
import org.apache.hadoop.hbase.client.Connection;
5242
import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
5343
import org.apache.hadoop.hbase.mapreduce.WALPlayer;
@@ -342,8 +332,8 @@ private void reBulkloadFiles(TableName sourceTable, TableName targetTable, long
342332

343333
RestoreJob restoreService = BackupRestoreFactory.getRestoreJob(conf);
344334

345-
List<Path> bulkloadFiles =
346-
collectBulkFiles(sourceTable, targetTable, startTime, endTime, new Path(restoreRootDir));
335+
List<Path> bulkloadFiles = BackupUtils.collectBulkFiles(conn, sourceTable, targetTable,
336+
startTime, endTime, new Path(restoreRootDir), new ArrayList<String>());
347337

348338
if (bulkloadFiles.isEmpty()) {
349339
LOG.info("No bulk-load files found for {} in time range {}-{}. Skipping bulkload restore.",
@@ -380,7 +370,7 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT
380370
sourceTable, targetTable, startTime, endTime, walDirPath);
381371

382372
List<String> validDirs =
383-
getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
373+
BackupUtils.getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
384374
if (validDirs.isEmpty()) {
385375
LOG.warn("No valid WAL directories found for range {} - {}. Skipping WAL replay.", startTime,
386376
endTime);
@@ -390,62 +380,6 @@ private void replayWal(TableName sourceTable, TableName targetTable, long startT
390380
executeWalReplay(validDirs, sourceTable, targetTable, startTime, endTime);
391381
}
392382

393-
private List<Path> collectBulkFiles(TableName sourceTable, TableName targetTable, long startTime,
394-
long endTime, Path restoreRootDir) throws IOException {
395-
396-
String walBackupDir = conn.getConfiguration().get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
397-
Path walDirPath = new Path(walBackupDir);
398-
LOG.info(
399-
"Starting WAL bulk-file collection for source: {}, target: {}, time range: {} - {}, WAL backup dir: {}, restore root: {}",
400-
sourceTable, targetTable, startTime, endTime, walDirPath, restoreRootDir);
401-
402-
List<String> validDirs =
403-
getValidWalDirs(conn.getConfiguration(), walDirPath, startTime, endTime);
404-
if (validDirs.isEmpty()) {
405-
LOG.warn("No valid WAL directories found for range {} - {}. Skipping bulk-file collection.",
406-
startTime, endTime);
407-
return Collections.emptyList();
408-
}
409-
410-
String walDirsCsv = String.join(",", validDirs);
411-
412-
return BulkFilesCollector.collectFromWalDirs(HBaseConfiguration.create(conn.getConfiguration()),
413-
walDirsCsv, restoreRootDir, sourceTable, targetTable, startTime, endTime);
414-
}
415-
416-
/**
417-
* Fetches valid WAL directories based on the given time range.
418-
*/
419-
private List<String> getValidWalDirs(Configuration conf, Path walBackupDir, long startTime,
420-
long endTime) throws IOException {
421-
FileSystem backupFs = FileSystem.get(walBackupDir.toUri(), conf);
422-
FileStatus[] dayDirs = backupFs.listStatus(new Path(walBackupDir, WALS_DIR));
423-
424-
List<String> validDirs = new ArrayList<>();
425-
SimpleDateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
426-
427-
for (FileStatus dayDir : dayDirs) {
428-
if (!dayDir.isDirectory()) {
429-
continue; // Skip files, only process directories
430-
}
431-
432-
String dirName = dayDir.getPath().getName();
433-
try {
434-
Date dirDate = dateFormat.parse(dirName);
435-
long dirStartTime = dirDate.getTime(); // Start of that day (00:00:00)
436-
long dirEndTime = dirStartTime + ONE_DAY_IN_MILLISECONDS - 1; // End time of day (23:59:59)
437-
438-
// Check if this day's WAL files overlap with the required time range
439-
if (dirEndTime >= startTime && dirStartTime <= endTime) {
440-
validDirs.add(dayDir.getPath().toString());
441-
}
442-
} catch (ParseException e) {
443-
LOG.warn("Skipping invalid directory name: {}", dirName, e);
444-
}
445-
}
446-
return validDirs;
447-
}
448-
449383
/**
450384
* Executes WAL replay using WALPlayer.
451385
*/

0 commit comments

Comments (0)