Skip to content

Commit fac5055

Browse files
author
ukumawat
committed
HBASE-28158 Decouple RIT list management from TRSP
1 parent e575525 commit fac5055

28 files changed

+204
-145
lines changed

hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/AssignmentManagerStatusTmpl.jamon

Whitespace-only changes.

hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1033,6 +1033,8 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedE
10331033
Map<Class<?>, List<Procedure<MasterProcedureEnv>>> procsByType = procedureExecutor
10341034
.getActiveProceduresNoCopy().stream().collect(Collectors.groupingBy(p -> p.getClass()));
10351035

1036+
// This manager must be accessed AFTER hbase:meta is confirmed online.
1037+
this.tableStateManager = new TableStateManager(this);
10361038
// Create Assignment Manager
10371039
this.assignmentManager = createAssignmentManager(this, masterRegion);
10381040
this.assignmentManager.start();
@@ -1056,8 +1058,6 @@ private void finishActiveMasterInitialization() throws IOException, InterruptedE
10561058
.map(p -> (ServerCrashProcedure) p).map(p -> p.getServerName()).collect(Collectors.toSet()),
10571059
Sets.union(rsListStorage.getAll(), walManager.getLiveServersFromWALDir()),
10581060
walManager.getSplittingServersFromWALDir());
1059-
// This manager must be accessed AFTER hbase:meta is confirmed on line..
1060-
this.tableStateManager = new TableStateManager(this);
10611061

10621062
startupTaskGroup.addTask("Initializing ZK system trackers");
10631063
initializeZKBasedSystemTrackers();
@@ -2007,7 +2007,7 @@ private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransi
20072007
// But if there are zero regions in transition, it can skip sleep to speed up.
20082008
while (
20092009
!interrupted && EnvironmentEdgeManager.currentTime() < nextBalanceStartTime
2010-
&& this.assignmentManager.getRegionStates().hasRegionsInTransition()
2010+
&& this.assignmentManager.hasRegionsInTransition()
20112011
) {
20122012
try {
20132013
Thread.sleep(100);
@@ -2019,7 +2019,7 @@ private void balanceThrottling(long nextBalanceStartTime, int maxRegionsInTransi
20192019
// Throttling by max number regions in transition
20202020
while (
20212021
!interrupted && maxRegionsInTransition > 0
2022-
&& this.assignmentManager.getRegionStates().getRegionsInTransitionCount()
2022+
&& this.assignmentManager.getRegionsInTransitionCount()
20232023
>= maxRegionsInTransition
20242024
&& EnvironmentEdgeManager.currentTime() <= cutoffTime
20252025
) {
@@ -3081,7 +3081,7 @@ public ClusterMetrics getClusterMetricsWithoutCoprocessor(EnumSet<Option> option
30813081
case REGIONS_IN_TRANSITION: {
30823082
if (assignmentManager != null) {
30833083
builder.setRegionsInTransition(
3084-
assignmentManager.getRegionStates().getRegionsStateInTransition());
3084+
assignmentManager.getRegionsStateInTransition());
30853085
}
30863086
break;
30873087
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManager.java

Lines changed: 61 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,14 @@
2222
import java.util.ArrayList;
2323
import java.util.Collection;
2424
import java.util.Collections;
25+
import java.util.Comparator;
2526
import java.util.HashMap;
2627
import java.util.HashSet;
2728
import java.util.List;
2829
import java.util.Map;
2930
import java.util.Set;
31+
import java.util.SortedSet;
32+
import java.util.TreeSet;
3033
import java.util.concurrent.CompletableFuture;
3134
import java.util.concurrent.Future;
3235
import java.util.concurrent.TimeUnit;
@@ -232,12 +235,15 @@ public class AssignmentManager {
232235

233236
private final int forceRegionRetainmentRetries;
234237

238+
private final RegionInTransitionTracker regionInTransitionTracker;
239+
235240
public AssignmentManager(MasterServices master, MasterRegion masterRegion) {
236241
this(master, masterRegion, new RegionStateStore(master, masterRegion));
237242
}
238243

239244
AssignmentManager(MasterServices master, MasterRegion masterRegion, RegionStateStore stateStore) {
240245
this.master = master;
246+
regionInTransitionTracker = new RegionInTransitionTracker(master.getTableStateManager());
241247
this.regionStateStore = stateStore;
242248
this.metrics = new MetricsAssignmentManager();
243249
this.masterRegion = masterRegion;
@@ -411,6 +417,7 @@ public void stop() {
411417

412418
// Stop the RegionStateStore
413419
regionStates.clear();
420+
regionInTransitionTracker.clear();
414421

415422
// Update meta events (for testing)
416423
if (hasProcExecutor) {
@@ -1093,7 +1100,7 @@ private int submitUnassignProcedure(TableName tableName,
10931100
regionNode.lock();
10941101
try {
10951102
if (shouldSubmit.apply(regionNode)) {
1096-
if (regionNode.isInTransition()) {
1103+
if (regionNode.isOngoingTRSP()) {
10971104
logRIT.accept(regionNode);
10981105
inTransitionCount++;
10991106
continue;
@@ -1704,7 +1711,7 @@ public boolean isRegionTwiceOverThreshold(final RegionInfo regionInfo) {
17041711
protected void update(final AssignmentManager am) {
17051712
final RegionStates regionStates = am.getRegionStates();
17061713
this.statTimestamp = EnvironmentEdgeManager.currentTime();
1707-
update(regionStates.getRegionsStateInTransition(), statTimestamp);
1714+
update(am.getRegionsStateInTransition(), statTimestamp);
17081715
update(regionStates.getRegionFailedOpen(), statTimestamp);
17091716

17101717
if (LOG.isDebugEnabled() && ritsOverThreshold != null && !ritsOverThreshold.isEmpty()) {
@@ -1794,6 +1801,7 @@ public void joinCluster() throws IOException {
17941801
*/
17951802
// Public so can be run by the Master as part of the startup. Needs hbase:meta to be online.
17961803
// Needs to be done after the table state manager has been started.
1804+
// TODO(umesh): we can update the RIT map here
17971805
public void processOfflineRegions() {
17981806
TransitRegionStateProcedure[] procs =
17991807
regionStates.getRegionStateNodes().stream().filter(rsn -> rsn.isInState(State.OFFLINE))
@@ -1873,6 +1881,7 @@ public void visitRegionState(Result result, final RegionInfo regionInfo, final S
18731881
if (regionNode.getProcedure() != null) {
18741882
regionNode.getProcedure().stateLoaded(AssignmentManager.this, regionNode);
18751883
}
1884+
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
18761885
}
18771886
};
18781887

@@ -2046,17 +2055,59 @@ public Pair<Integer, Integer> getReopenStatus(TableName tableName) {
20462055
return new Pair<Integer, Integer>(ritCount, states.size());
20472056
}
20482057

2058+
// This comparator sorts the RegionStates by time stamp then Region name.
2059+
// Comparing by timestamp alone can lead us to discard different RegionStates that happen
2060+
// to share a timestamp.
2061+
private static class RegionStateStampComparator implements Comparator<RegionState> {
2062+
@Override
2063+
public int compare(final RegionState l, final RegionState r) {
2064+
int stampCmp = Long.compare(l.getStamp(), r.getStamp());
2065+
return stampCmp != 0 ? stampCmp : RegionInfo.COMPARATOR.compare(l.getRegion(), r.getRegion());
2066+
}
2067+
}
2068+
2069+
public final static RegionStateStampComparator REGION_STATE_STAMP_COMPARATOR =
2070+
new RegionStateStampComparator();
2071+
20492072
// ============================================================================================
20502073
// TODO: Region State In Transition
20512074
// ============================================================================================
20522075
public boolean hasRegionsInTransition() {
2053-
return regionStates.hasRegionsInTransition();
2076+
return regionInTransitionTracker.hasRegionsInTransition();
20542077
}
20552078

20562079
public List<RegionStateNode> getRegionsInTransition() {
2057-
return regionStates.getRegionsInTransition();
2080+
return new ArrayList<RegionStateNode>(regionInTransitionTracker.getRegionsInTransition().values());
20582081
}
20592082

2083+
public boolean isRegionInTransition(final RegionInfo regionInfo) {
2084+
return regionInTransitionTracker.isRegionInTransition(regionInfo);
2085+
}
2086+
2087+
/**
2088+
* Get the number of regions in transition.
2089+
*/
2090+
public int getRegionsInTransitionCount() {
2091+
return regionInTransitionTracker.getRegionsInTransition().size();
2092+
}
2093+
2094+
public List<RegionState> getRegionsStateInTransition() {
2095+
final List<RegionState> rit = new ArrayList<RegionState>(getRegionsInTransitionCount());
2096+
for (RegionStateNode node : getRegionsInTransition()) {
2097+
rit.add(node.toRegionState());
2098+
}
2099+
return rit;
2100+
}
2101+
2102+
public SortedSet<RegionState> getRegionsInTransitionOrderedByTimestamp() {
2103+
final SortedSet<RegionState> rit = new TreeSet<RegionState>(REGION_STATE_STAMP_COMPARATOR);
2104+
for (RegionStateNode node : getRegionsInTransition()) {
2105+
rit.add(node.toRegionState());
2106+
}
2107+
return rit;
2108+
}
2109+
2110+
20602111
public List<RegionInfo> getAssignedRegions() {
20612112
return regionStates.getAssignedRegions();
20622113
}
@@ -2122,6 +2173,8 @@ private CompletableFuture<Void> transitStateAndUpdate(RegionStateNode regionNode
21222173
if (e != null) {
21232174
// revert
21242175
regionNode.setState(state);
2176+
}else{
2177+
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
21252178
}
21262179
});
21272180
return future;
@@ -2170,6 +2223,7 @@ CompletableFuture<Void> regionFailedOpen(RegionStateNode regionNode, boolean giv
21702223
if (regionLocation != null) {
21712224
regionStates.removeRegionFromServer(regionLocation, regionNode);
21722225
}
2226+
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
21732227
} else {
21742228
// revert
21752229
regionNode.setState(state);
@@ -2230,6 +2284,7 @@ CompletableFuture<Void> persistToMeta(RegionStateNode regionNode) {
22302284
// on table that contains state.
22312285
setMetaAssigned(regionInfo, true);
22322286
}
2287+
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
22332288
});
22342289
}
22352290

@@ -2247,6 +2302,7 @@ public CompletableFuture<Void> regionClosedAbnormally(RegionStateNode regionNode
22472302
regionNode.setLastHost(regionLocation);
22482303
regionStates.removeRegionFromServer(regionLocation, regionNode);
22492304
}
2305+
regionInTransitionTracker.handleRegionStateNodeOperation(regionNode);
22502306
} else {
22512307
// revert
22522308
regionNode.setState(state);
@@ -2307,6 +2363,7 @@ public void markRegionAsMerged(final RegionInfo child, final ServerName serverNa
23072363
for (RegionInfo ri : mergeParents) {
23082364
regionStates.deleteRegion(ri);
23092365
}
2366+
// TODO: handle the deleted parent regions and the new merged region in the RIT tracker
23102367
TableDescriptor td = master.getTableDescriptors().get(child.getTable());
23112368
regionStateStore.mergeRegions(child, mergeParents, serverName, td);
23122369
if (shouldAssignFavoredNodes(child)) {

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,15 +154,15 @@ private static TransitRegionStateProcedure[] createAssignProcedures(MasterProced
154154
regionNode.lock();
155155
try {
156156
if (ignoreIfInTransition) {
157-
if (regionNode.isInTransition()) {
157+
if (regionNode.isOngoingTRSP()) {
158158
return null;
159159
}
160160
} else {
161161
// should never fail, as we have the exclusive region lock, and the region is newly
162162
// created, or has been successfully closed so should not be on any servers, so SCP
163163
// will
164164
// not process it either.
165-
assert !regionNode.isInTransition();
165+
assert !regionNode.isOngoingTRSP();
166166
}
167167
regionNode.setProcedure(proc);
168168
} finally {
@@ -184,7 +184,7 @@ private static TransitRegionStateProcedure[] createAssignProcedures(MasterProced
184184
// apply ignoreRITs to replica regions as well.
185185
if (
186186
!ignoreIfInTransition || !env.getAssignmentManager().getRegionStates()
187-
.getOrCreateRegionStateNode(ri).isInTransition()
187+
.getOrCreateRegionStateNode(ri).isOngoingTRSP()
188188
) {
189189
replicaRegionInfos.add(ri);
190190
}
@@ -232,7 +232,7 @@ private static TransitRegionStateProcedure[] createRoundRobinAssignProcedures(
232232
for (RegionInfo region : regionsAndReplicas) {
233233
if (
234234
env.getAssignmentManager().getRegionStates().getOrCreateRegionStateNode(region)
235-
.isInTransition()
235+
.isOngoingTRSP()
236236
) {
237237
return null;
238238
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package org.apache.hadoop.hbase.master.assignment;
2+
3+
import java.util.List;
4+
import java.util.concurrent.ConcurrentSkipListMap;
5+
import org.apache.hadoop.hbase.client.RegionInfo;
6+
import org.apache.hadoop.hbase.client.TableState;
7+
import org.apache.hadoop.hbase.master.RegionState;
8+
import org.apache.hadoop.hbase.master.TableStateManager;
9+
import org.apache.yetus.audience.InterfaceAudience;
10+
import org.slf4j.Logger;
11+
import org.slf4j.LoggerFactory;
12+
13+
@InterfaceAudience.Private
14+
public class RegionInTransitionTracker {
15+
16+
private final List<RegionState.State> DISABLE_TABLE_REGION_STATE =
17+
List.of(RegionState.State.OFFLINE, RegionState.State.CLOSED);
18+
private final List<RegionState.State> ENABLE_TABLE_REGION_STATE =
19+
List.of(RegionState.State.OPEN, RegionState.State.SPLIT, RegionState.State.MERGED);
20+
21+
private static final Logger LOG = LoggerFactory.getLogger(RegionInTransitionTracker.class);
22+
23+
private final ConcurrentSkipListMap<RegionInfo, RegionStateNode> regionInTransition =
24+
new ConcurrentSkipListMap<>(RegionInfo.COMPARATOR);
25+
private final TableStateManager tableStateManager;
26+
27+
public RegionInTransitionTracker(TableStateManager tableStateManager) {
28+
this.tableStateManager = tableStateManager;
29+
}
30+
31+
public boolean isRegionInTransition(final RegionInfo regionInfo) {
32+
return regionInTransition.containsKey(regionInfo);
33+
}
34+
35+
public void handleRegionStateNodeOperation(RegionStateNode regionStateNode) {
36+
RegionState.State currentState = regionStateNode.getState();
37+
38+
if (!getExceptedRegionStates(regionStateNode).contains(currentState)) {
39+
addRegionInTransition(regionStateNode);
40+
} else {
41+
removeRegionInTransition(regionStateNode.getRegionInfo());
42+
}
43+
}
44+
45+
public void handleRegionDelete(RegionStateNode regionStateNode) {
46+
removeRegionInTransition(regionStateNode.getRegionInfo());
47+
}
48+
49+
private List<RegionState.State> getExceptedRegionStates(RegionStateNode regionStateNode) {
50+
if (tableStateManager.isTableState(regionStateNode.getTable(), TableState.State.ENABLED,
51+
TableState.State.ENABLING)) {
52+
return ENABLE_TABLE_REGION_STATE;
53+
} else {
54+
return DISABLE_TABLE_REGION_STATE;
55+
}
56+
}
57+
58+
public void addRegionInTransition(final RegionStateNode regionStateNode) {
59+
if (regionInTransition.putIfAbsent(regionStateNode.getRegionInfo(), regionStateNode) == null) {
60+
LOG.info("Added to RIT list" + regionStateNode.getRegionInfo().getEncodedName());
61+
}
62+
}
63+
64+
public void removeRegionInTransition(final RegionInfo regionInfo) {
65+
if (regionInTransition.remove(regionInfo) != null) {
66+
LOG.info("Removed from RIT list" + regionInfo);
67+
}
68+
}
69+
70+
public void clear() {
71+
regionInTransition.clear();
72+
}
73+
74+
public boolean hasRegionsInTransition() {
75+
return !regionInTransition.isEmpty();
76+
}
77+
78+
public ConcurrentSkipListMap<RegionInfo, RegionStateNode> getRegionsInTransition() {
79+
return regionInTransition;
80+
}
81+
82+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateNode.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.hadoop.hbase.master.assignment;
1919

2020
import java.util.Arrays;
21-
import java.util.concurrent.ConcurrentMap;
2221
import org.apache.hadoop.hbase.HConstants;
2322
import org.apache.hadoop.hbase.ServerName;
2423
import org.apache.hadoop.hbase.TableName;
@@ -78,7 +77,6 @@ public AssignmentProcedureEvent(final RegionInfo regionInfo) {
7877
private final RegionStateNodeLock lock;
7978
private final RegionInfo regionInfo;
8079
private final ProcedureEvent<?> event;
81-
private final ConcurrentMap<RegionInfo, RegionStateNode> ritMap;
8280

8381
// volatile only for getLastUpdate and test usage, the upper layer should sync on the
8482
// RegionStateNode before accessing usually.
@@ -102,10 +100,9 @@ public AssignmentProcedureEvent(final RegionInfo regionInfo) {
102100

103101
private volatile long openSeqNum = HConstants.NO_SEQNUM;
104102

105-
RegionStateNode(RegionInfo regionInfo, ConcurrentMap<RegionInfo, RegionStateNode> ritMap) {
103+
RegionStateNode(RegionInfo regionInfo) {
106104
this.regionInfo = regionInfo;
107105
this.event = new AssignmentProcedureEvent(regionInfo);
108-
this.ritMap = ritMap;
109106
this.lock = new RegionStateNodeLock(regionInfo);
110107
}
111108

@@ -161,7 +158,7 @@ public boolean isStuck() {
161158
return isInState(State.FAILED_OPEN) && getProcedure() != null;
162159
}
163160

164-
public boolean isInTransition() {
161+
public boolean isOngoingTRSP() {
165162
return getProcedure() != null;
166163
}
167164

@@ -207,14 +204,12 @@ public ServerName setRegionLocation(final ServerName serverName) {
207204
public TransitRegionStateProcedure setProcedure(TransitRegionStateProcedure proc) {
208205
assert this.procedure == null;
209206
this.procedure = proc;
210-
ritMap.put(regionInfo, this);
211207
return proc;
212208
}
213209

214210
public void unsetProcedure(TransitRegionStateProcedure proc) {
215211
assert this.procedure == proc;
216212
this.procedure = null;
217-
ritMap.remove(regionInfo, this);
218213
}
219214

220215
public TransitRegionStateProcedure getProcedure() {

0 commit comments

Comments
 (0)