HBASE-29656 Scan WALs to identify bulkload operations for incremental backup #7400
Changes from 4 commits
@@ -20,7 +20,6 @@
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.CONF_CONTINUOUS_BACKUP_WAL_DIR;
import static org.apache.hadoop.hbase.backup.BackupRestoreConstants.JOB_NAME_CONF_KEY;
import static org.apache.hadoop.hbase.backup.replication.ContinuousBackupReplicationEndpoint.ONE_DAY_IN_MILLISECONDS;
import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.BULKLOAD_FILES_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupFileSystemManager.WALS_DIR;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.DATE_FORMAT;

@@ -37,6 +36,7 @@
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.stream.Collectors;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FileStatus;

@@ -55,6 +55,7 @@
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceBackupCopyJob;
import org.apache.hadoop.hbase.backup.mapreduce.MapReduceHFileSplitterJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.backup.util.BulkFilesCollector;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.Connection;

@@ -86,6 +87,7 @@
@InterfaceAudience.Private
public class IncrementalTableBackupClient extends TableBackupClient {
private static final Logger LOG = LoggerFactory.getLogger(IncrementalTableBackupClient.class);
private static final String BULKLOAD_COLLECTOR_OUTPUT = "bulkload-collector-output";

protected IncrementalTableBackupClient() {
}

@@ -137,89 +139,88 @@ protected static int getIndex(TableName tbl, List<TableName> sTableList) {
* the backup is marked as complete.
* @param tablesToBackup list of tables to be backed up
*/
protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup) throws IOException {
protected List<BulkLoad> handleBulkLoad(List<TableName> tablesToBackup,
Map<TableName, List<String>> tablesToWALFileList, Map<TableName, Long> tablesToPrevBackupTs)
throws IOException {
Map<TableName, MergeSplitBulkloadInfo> toBulkload = new HashMap<>();
List<BulkLoad> bulkLoads;
if (backupInfo.isContinuousBackupEnabled()) {
bulkLoads =
backupManager.readBulkloadRows(tablesToBackup, backupInfo.getIncrCommittedWalTs());
} else {
bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
}
List<BulkLoad> bulkLoads = new ArrayList<>();

FileSystem tgtFs;
try {
tgtFs = FileSystem.get(new URI(backupInfo.getBackupRootDir()), conf);
} catch (URISyntaxException use) {
throw new IOException("Unable to get FileSystem", use);
}

Path rootdir = CommonFSUtils.getRootDir(conf);
Path tgtRoot = new Path(new Path(backupInfo.getBackupRootDir()), backupId);

for (BulkLoad bulkLoad : bulkLoads) {
TableName srcTable = bulkLoad.getTableName();
MergeSplitBulkloadInfo bulkloadInfo =
toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
String regionName = bulkLoad.getRegion();
String fam = bulkLoad.getColumnFamily();
String filename = FilenameUtils.getName(bulkLoad.getHfilePath());
if (!backupInfo.isContinuousBackupEnabled()) {
bulkLoads = backupManager.readBulkloadRows(tablesToBackup);
for (BulkLoad bulkLoad : bulkLoads) {
TableName srcTable = bulkLoad.getTableName();
MergeSplitBulkloadInfo bulkloadInfo =
toBulkload.computeIfAbsent(srcTable, MergeSplitBulkloadInfo::new);
String regionName = bulkLoad.getRegion();
String fam = bulkLoad.getColumnFamily();
String filename = FilenameUtils.getName(bulkLoad.getHfilePath());

if (!tablesToBackup.contains(srcTable)) {
LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
continue;
}

Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);

String srcTableQualifier = srcTable.getQualifierAsString();
String srcTableNs = srcTable.getNamespaceAsString();
Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+ Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
if (!tgtFs.mkdirs(tgtFam)) {
throw new IOException("couldn't create " + tgtFam);
}
Path tgt = new Path(tgtFam, filename);

Path archiveDir = HFileArchiveUtil.getStoreArchivePath(conf, srcTable, regionName, fam);
Path archive = new Path(archiveDir, filename);

if (!tablesToBackup.contains(srcTable)) {
LOG.debug("Skipping {} since it is not in tablesToBackup", srcTable);
continue;
}
Path tblDir = CommonFSUtils.getTableDir(rootdir, srcTable);
Path p = new Path(tblDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);

// For continuous backup: bulkload files are copied from backup directory defined by
// CONF_CONTINUOUS_BACKUP_WAL_DIR instead of source cluster.
String backupRootDir = conf.get(CONF_CONTINUOUS_BACKUP_WAL_DIR);
if (backupInfo.isContinuousBackupEnabled() && !Strings.isNullOrEmpty(backupRootDir)) {
String dayDirectoryName = BackupUtils.formatToDateString(bulkLoad.getTimestamp());
Path bulkLoadBackupPath =
new Path(backupRootDir, BULKLOAD_FILES_DIR + Path.SEPARATOR + dayDirectoryName);
Path bulkLoadDir = new Path(bulkLoadBackupPath,
srcTable.getNamespaceAsString() + Path.SEPARATOR + srcTable.getNameAsString());
FileSystem backupFs = FileSystem.get(bulkLoadDir.toUri(), conf);
Path fullBulkLoadBackupPath =
new Path(bulkLoadDir, regionName + Path.SEPARATOR + fam + Path.SEPARATOR + filename);
if (backupFs.exists(fullBulkLoadBackupPath)) {
LOG.debug("Backup bulkload file found {}", fullBulkLoadBackupPath);
p = fullBulkLoadBackupPath;
} else {
LOG.warn("Backup bulkload file not found {}", fullBulkLoadBackupPath);
if (fs.exists(p)) {
if (LOG.isTraceEnabled()) {
LOG.trace("found bulk hfile {} in {} for {}", bulkLoad.getHfilePath(), p.getParent(),
srcTableQualifier);
LOG.trace("copying {} to {}", p, tgt);
}
bulkloadInfo.addActiveFile(p.toString());
} else if (fs.exists(archive)) {
LOG.debug("copying archive {} to {}", archive, tgt);
bulkloadInfo.addArchiveFiles(archive.toString());
}
}

String srcTableQualifier = srcTable.getQualifierAsString();
String srcTableNs = srcTable.getNamespaceAsString();
Path tgtFam = new Path(tgtRoot, srcTableNs + Path.SEPARATOR + srcTableQualifier
+ Path.SEPARATOR + regionName + Path.SEPARATOR + fam);
if (!tgtFs.mkdirs(tgtFam)) {
throw new IOException("couldn't create " + tgtFam);
for (MergeSplitBulkloadInfo bulkloadInfo : toBulkload.values()) {
mergeSplitAndCopyBulkloadedHFiles(bulkloadInfo.getActiveFiles(),
bulkloadInfo.getArchiveFiles(), bulkloadInfo.getSrcTable(), tgtFs);
}
Path tgt = new Path(tgtFam, filename);
} else {
// Continuous incremental backup: run BulkLoadCollectorJob over backed-up WALs
Path collectorOutput = new Path(getBulkOutputDir(), BULKLOAD_COLLECTOR_OUTPUT);
for (TableName table : tablesToBackup) {
String walDirsCsv = String.join(",", tablesToWALFileList.get(table));

Suggested change (replace the first line with the two that follow):
String walDirsCsv = String.join(",", tablesToWALFileList.get(table));
List<String> walDirs = tablesToWALFileList.get(table);
String walDirsCsv = String.join(",", walDirs != null ? walDirs : java.util.Collections.emptyList());
Rather than calling BulkFilesCollector directly, we can use the org.apache.hadoop.hbase.backup.impl.AbstractPitrRestoreHandler#collectBulkFiles() method, which serves as a higher-level approach and internally invokes BulkFilesCollector.collectFromWalDirs(). This helps us avoid duplicating code. In both restore and incremental backup scenarios, we need to extract bulkload files by reading WAL files within a given time range, so it makes sense to have a single logic for this. We should consider placing this common logic in a utility class under the util package.

BulkFilesCollector#collectFromWalDirs() is itself a utility function. I have already computed the valid WAL directories using BackupUtils#getValidWalDirs() in IncrementalTableBackupClient#convertWALsToHFiles(), so here I am reusing that. If I call AbstractPitrRestoreHandler#collectBulkFiles(), it would call BackupUtils#getValidWalDirs() again.

I agree with @ankitsol. This class should not make a call to an abstract class - you would have to make the method public - instead, move more logic to the utility class if you want to share more.

> BulkFilesCollector#collectFromWalDirs() is itself a utility function. I have already computed the valid WAL directories using BackupUtils#getValidWalDirs() in IncrementalTableBackupClient#convertWALsToHFiles(), so here I am reusing that. If I call AbstractPitrRestoreHandler#collectBulkFiles(), it would call BackupUtils#getValidWalDirs() again.

Consider passing that as a parameter. Adjust the original methods as minimally as possible to accommodate both scenarios.

> This class should not make a call to an abstract class

No, as mentioned earlier, we should move the shared elements to a utility class.

> Consider passing that as a parameter. Adjust the original methods as minimally as possible to accommodate both scenarios.

Please elaborate.

> If I call AbstractPitrRestoreHandler#collectBulkFiles() it would again call BackupUtils#getValidWalDirs()

Instead of calling BackupUtils#getValidWalDirs() inside AbstractPitrRestoreHandler#collectBulkFiles(), take the output of BackupUtils#getValidWalDirs() as a parameter.
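To make that suggestion concrete, a minimal sketch of a shared helper that accepts the already-computed WAL directories instead of re-running BackupUtils#getValidWalDirs(); the class and method names below are illustrative only and do not reflect the actual BulkFilesCollector or AbstractPitrRestoreHandler APIs.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;

// Hypothetical shared helper: the caller (restore handler or incremental backup
// client) computes the valid WAL directories once, e.g. via
// BackupUtils#getValidWalDirs(), and passes them in, so this method never
// re-invokes that lookup.
final class BulkLoadWalScanHelper {

  private BulkLoadWalScanHelper() {
  }

  static List<Path> collectBulkFiles(Configuration conf, TableName table,
    List<String> validWalDirs, long startTs, long endTs) throws IOException {
    List<Path> bulkLoadFiles = new ArrayList<>();
    for (String walDir : validWalDirs) {
      // Scan the WAL files under walDir for bulk-load markers belonging to
      // 'table' within [startTs, endTs] and collect the referenced HFile paths
      // (the actual WAL scan is elided in this sketch).
    }
    return bulkLoadFiles;
  }
}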
Copilot (AI), Oct 21, 2025:

Potential NullPointerException if tablesToPrevBackupTs.get(table) returns null. The map may not contain an entry for the table if no previous backup exists, which would cause an NPE when the primitive long is expected.

Suggested change (replace the first line with the second):
tablesToPrevBackupTs.get(table), backupInfo.getIncrCommittedWalTs());
tablesToPrevBackupTs.get(table) != null ? tablesToPrevBackupTs.get(table) : 0L, backupInfo.getIncrCommittedWalTs());
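Since tablesToPrevBackupTs is declared as Map<TableName, Long> in this change, the same fallback can be written with a single map access; this is only an alternative phrasing of the suggestion above, not code from the patch.

// Null-safe lookup with a single map access; falls back to 0L when no previous
// backup timestamp exists for the table (same fallback as the suggestion above).
long prevBackupTs = tablesToPrevBackupTs.getOrDefault(table, 0L);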
[nit] Using continue here would align the style with the other loop for !backupInfo.isContinuousBackupEnabled().

Current:
if (bulkLoadFiles.isEmpty()) {
  LOG.info("No bulk-load files found for table {}", table);
} else {
  mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
}

Suggested:
if (bulkLoadFiles.isEmpty()) {
  LOG.info("No bulk-load files found for table {}", table);
  continue;
}
mergeSplitAndCopyBulkloadedHFiles(bulkLoadFiles, table, tgtFs);
Let's avoid duplicating code. We already have similar functionality for retrieving log files within a time range in org.apache.hadoop.hbase.backup.impl.AbstractPitrRestoreHandler#getValidWalDirs. Can we use that instead? We could move the file to a common location such as src/main/java/org/apache/hadoop/hbase/backup/util.
taklwu marked this conversation as resolved.
@@ -38,6 +38,7 @@
import org.apache.hadoop.hbase.backup.BackupType;
import org.apache.hadoop.hbase.backup.HBackupFileSystem;
import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.CommonFSUtils;

@@ -113,6 +114,12 @@ protected void beginBackup(BackupManager backupManager, BackupInfo backupInfo)
// set the start timestamp of the overall backup
long startTs = EnvironmentEdgeManager.currentTime();
backupInfo.setStartTs(startTs);
if (backupInfo.getType() == BackupType.INCREMENTAL && backupInfo.isContinuousBackupEnabled()) {
// committedWALsTs is needed only for Incremental backups with continuous backup
// since these do not depend on log roll ts
long committedWALsTs = BackupUtils.getReplicationCheckpoint(conn);
backupInfo.setIncrCommittedWalTs(committedWALsTs);
}
// set overall backup status: ongoing
backupInfo.setState(BackupState.RUNNING);
backupInfo.setPhase(BackupPhase.REQUEST);

[nit] Do you see a lot of entries before this change that keep registering the same table? If so, and if this is not only in unit tests, do you think it's a logic error from that trigger?

I have a suggestion. Perhaps we could add a comment stating that for continuous backup, this isn't necessary, as everything will be utilized from the WAL backup location.

Sorry, I didn't understand the question completely. This BackupObserver#registerBulkLoad() is called for each bulkload operation and registers them in the backup system table.

Adding the comment should have addressed my concerns, and yeah, !continuousBackupTableSet.containsKey(tableName) means only non-continuous backup needs this bulkload registration.
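For illustration, a self-contained sketch of that guard; the class and field names are hypothetical and only mirror the check discussed above, not the actual BackupObserver code.

import java.util.Map;

import org.apache.hadoop.hbase.TableName;

// Hypothetical illustration only: tables under continuous backup skip bulkload
// registration, since incremental backups rediscover their bulkload files by
// scanning the backed-up WALs rather than the backup system table.
final class BulkLoadRegistrationGuard {

  private final Map<TableName, Long> continuousBackupTableSet;

  BulkLoadRegistrationGuard(Map<TableName, Long> continuousBackupTableSet) {
    this.continuousBackupTableSet = continuousBackupTableSet;
  }

  boolean shouldRegister(TableName tableName) {
    // Only tables that are not part of the continuous-backup set need an entry
    // written to the backup system table.
    return !continuousBackupTableSet.containsKey(tableName);
  }
}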