Skip to content

Commit e554134

Browse files
committed
Reapply "HBASE-29473 Obtain target cluster's token for cross clusters job (#7198)"
This reverts commit 3bed95f.
1 parent 3a7cd94 commit e554134

File tree

4 files changed

+186
-40
lines changed

4 files changed

+186
-40
lines changed

hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileOutputFormat2.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -811,7 +811,7 @@ public static void configureIncrementalLoadMap(Job job, TableDescriptor tableDes
811811
* @see #REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY
812812
* @see #REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY
813813
*/
814-
public static void configureRemoteCluster(Job job, Configuration clusterConf) {
814+
public static void configureRemoteCluster(Job job, Configuration clusterConf) throws IOException {
815815
Configuration conf = job.getConfiguration();
816816

817817
if (!conf.getBoolean(LOCALITY_SENSITIVE_CONF_KEY, DEFAULT_LOCALITY_SENSITIVE)) {
@@ -828,6 +828,8 @@ public static void configureRemoteCluster(Job job, Configuration clusterConf) {
828828
conf.setInt(REMOTE_CLUSTER_ZOOKEEPER_CLIENT_PORT_CONF_KEY, clientPort);
829829
conf.set(REMOTE_CLUSTER_ZOOKEEPER_ZNODE_PARENT_CONF_KEY, parent);
830830

831+
TableMapReduceUtil.initCredentialsForCluster(job, clusterConf);
832+
831833
LOG.info("ZK configs for remote cluster of bulkload is configured: " + quorum + ":" + clientPort
832834
+ "/" + parent);
833835
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.mapreduce;
19+
20+
import static org.apache.hadoop.security.UserGroupInformation.loginUserFromKeytab;
21+
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertTrue;
23+
24+
import java.io.Closeable;
25+
import java.io.File;
26+
import java.util.ArrayList;
27+
import java.util.List;
28+
import org.apache.commons.io.IOUtils;
29+
import org.apache.hadoop.conf.Configuration;
30+
import org.apache.hadoop.hbase.HBaseClassTestRule;
31+
import org.apache.hadoop.hbase.HBaseTestingUtility;
32+
import org.apache.hadoop.hbase.KeyValue;
33+
import org.apache.hadoop.hbase.TableName;
34+
import org.apache.hadoop.hbase.client.RegionLocator;
35+
import org.apache.hadoop.hbase.client.Table;
36+
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
37+
import org.apache.hadoop.hbase.testclassification.LargeTests;
38+
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
39+
import org.apache.hadoop.hbase.util.Bytes;
40+
import org.apache.hadoop.io.Text;
41+
import org.apache.hadoop.mapreduce.Job;
42+
import org.apache.hadoop.minikdc.MiniKdc;
43+
import org.apache.hadoop.security.UserGroupInformation;
44+
import org.junit.After;
45+
import org.junit.Before;
46+
import org.junit.ClassRule;
47+
import org.junit.Test;
48+
import org.junit.experimental.categories.Category;
49+
50+
/**
51+
* Tests for {@link HFileOutputFormat2} with secure mode.
52+
*/
53+
@Category({ VerySlowMapReduceTests.class, LargeTests.class })
54+
public class TestHFileOutputFormat2WithSecurity {
55+
@ClassRule
56+
public static final HBaseClassTestRule CLASS_RULE =
57+
HBaseClassTestRule.forClass(TestHFileOutputFormat2WithSecurity.class);
58+
59+
private static final byte[] FAMILIES = Bytes.toBytes("test_cf");
60+
61+
private static final String HTTP_PRINCIPAL = "HTTP/localhost";
62+
63+
private HBaseTestingUtility utilA;
64+
65+
private Configuration confA;
66+
67+
private HBaseTestingUtility utilB;
68+
69+
private MiniKdc kdc;
70+
71+
private List<Closeable> clusters = new ArrayList<>();
72+
73+
@Before
74+
public void setupSecurityClusters() throws Exception {
75+
utilA = new HBaseTestingUtility();
76+
confA = utilA.getConfiguration();
77+
78+
utilB = new HBaseTestingUtility();
79+
80+
// Prepare security configs.
81+
File keytab = new File(utilA.getDataTestDir("keytab").toUri().getPath());
82+
kdc = utilA.setupMiniKdc(keytab);
83+
String username = UserGroupInformation.getLoginUser().getShortUserName();
84+
String userPrincipal = username + "/localhost";
85+
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
86+
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
87+
88+
// Start security clusterA
89+
clusters.add(utilA.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
90+
91+
// Start security clusterB
92+
clusters.add(utilB.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL));
93+
}
94+
95+
@After
96+
public void teardownSecurityClusters() {
97+
IOUtils.closeQuietly(clusters);
98+
clusters.clear();
99+
if (kdc != null) {
100+
kdc.stop();
101+
}
102+
}
103+
104+
@Test
105+
public void testIncrementalLoadInMultiClusterWithSecurity() throws Exception {
106+
TableName tableName = TableName.valueOf("testIncrementalLoadInMultiClusterWithSecurity");
107+
108+
// Create table in clusterB
109+
try (Table table = utilB.createTable(tableName, FAMILIES);
110+
RegionLocator r = utilB.getConnection().getRegionLocator(tableName)) {
111+
112+
// Create job in clusterA
113+
Job job = Job.getInstance(confA, "testIncrementalLoadInMultiClusterWithSecurity");
114+
job.setWorkingDirectory(
115+
utilA.getDataTestDirOnTestFS("testIncrementalLoadInMultiClusterWithSecurity"));
116+
job.setInputFormatClass(NMapInputFormat.class);
117+
job.setMapperClass(TestHFileOutputFormat2.RandomKVGeneratingMapper.class);
118+
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
119+
job.setMapOutputValueClass(KeyValue.class);
120+
HFileOutputFormat2.configureIncrementalLoad(job, table, r);
121+
122+
assertEquals(2, job.getCredentials().getAllTokens().size());
123+
124+
String remoteClusterId = utilB.getHBaseClusterInterface().getClusterMetrics().getClusterId();
125+
assertTrue(job.getCredentials().getToken(new Text(remoteClusterId)) != null);
126+
} finally {
127+
if (utilB.getAdmin().tableExists(tableName)) {
128+
utilB.deleteTable(tableName);
129+
}
130+
}
131+
}
132+
}

hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceUtil.java

Lines changed: 7 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,8 @@
2929
import org.apache.hadoop.hbase.HBaseClassTestRule;
3030
import org.apache.hadoop.hbase.HBaseTestingUtility;
3131
import org.apache.hadoop.hbase.client.Scan;
32-
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
33-
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
34-
import org.apache.hadoop.hbase.security.access.AccessController;
35-
import org.apache.hadoop.hbase.security.access.PermissionStorage;
36-
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
3732
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProviders;
3833
import org.apache.hadoop.hbase.security.token.AuthenticationTokenIdentifier;
39-
import org.apache.hadoop.hbase.security.token.TokenProvider;
40-
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
4134
import org.apache.hadoop.hbase.testclassification.MapReduceTests;
4235
import org.apache.hadoop.hbase.testclassification.MediumTests;
4336
import org.apache.hadoop.hbase.util.Bytes;
@@ -48,7 +41,6 @@
4841
import org.apache.hadoop.minikdc.MiniKdc;
4942
import org.apache.hadoop.security.Credentials;
5043
import org.apache.hadoop.security.UserGroupInformation;
51-
import org.apache.hadoop.security.authentication.util.KerberosName;
5244
import org.apache.hadoop.security.token.Token;
5345
import org.apache.hadoop.security.token.TokenIdentifier;
5446
import org.junit.After;
@@ -134,33 +126,6 @@ public void testInitTableMapperJob4() throws Exception {
134126
assertEquals("Table", job.getConfiguration().get(TableInputFormat.INPUT_TABLE));
135127
}
136128

137-
private static Closeable startSecureMiniCluster(HBaseTestingUtility util, MiniKdc kdc,
138-
String principal) throws Exception {
139-
Configuration conf = util.getConfiguration();
140-
141-
SecureTestUtil.enableSecurity(conf);
142-
VisibilityTestUtil.enableVisiblityLabels(conf);
143-
SecureTestUtil.verifyConfiguration(conf);
144-
145-
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
146-
AccessController.class.getName() + ',' + TokenProvider.class.getName());
147-
148-
HBaseKerberosUtils.setSecuredConfiguration(conf, principal + '@' + kdc.getRealm(),
149-
HTTP_PRINCIPAL + '@' + kdc.getRealm());
150-
151-
KerberosName.resetDefaultRealm();
152-
153-
util.startMiniCluster();
154-
try {
155-
util.waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
156-
} catch (Exception e) {
157-
util.shutdownMiniCluster();
158-
throw e;
159-
}
160-
161-
return util::shutdownMiniCluster;
162-
}
163-
164129
@Test
165130
public void testInitCredentialsForCluster1() throws Exception {
166131
HBaseTestingUtility util1 = new HBaseTestingUtility();
@@ -199,8 +164,9 @@ public void testInitCredentialsForCluster2() throws Exception {
199164
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
200165
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
201166

202-
try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal);
203-
Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
167+
try (
168+
Closeable util1Closeable = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL);
169+
Closeable util2Closeable = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
204170
try {
205171
Configuration conf1 = util1.getConfiguration();
206172
Job job = Job.getInstance(conf1);
@@ -233,7 +199,8 @@ public void testInitCredentialsForCluster3() throws Exception {
233199
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
234200
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
235201

236-
try (Closeable util1Closeable = startSecureMiniCluster(util1, kdc, userPrincipal)) {
202+
try (
203+
Closeable util1Closeable = util1.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
237204
try {
238205
HBaseTestingUtility util2 = new HBaseTestingUtility();
239206
// Assume util2 is insecure cluster
@@ -269,7 +236,8 @@ public void testInitCredentialsForCluster4() throws Exception {
269236
kdc.createPrincipal(keytab, userPrincipal, HTTP_PRINCIPAL);
270237
loginUserFromKeytab(userPrincipal + '@' + kdc.getRealm(), keytab.getAbsolutePath());
271238

272-
try (Closeable util2Closeable = startSecureMiniCluster(util2, kdc, userPrincipal)) {
239+
try (
240+
Closeable util2Closeable = util2.startSecureMiniCluster(kdc, userPrincipal, HTTP_PRINCIPAL)) {
273241
try {
274242
Configuration conf1 = util1.getConfiguration();
275243
Job job = Job.getInstance(conf1);

hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.junit.Assert.fail;
2323

2424
import edu.umd.cs.findbugs.annotations.Nullable;
25+
import java.io.Closeable;
2526
import java.io.File;
2627
import java.io.IOException;
2728
import java.io.OutputStream;
@@ -88,6 +89,7 @@
8889
import org.apache.hadoop.hbase.client.TableDescriptor;
8990
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
9091
import org.apache.hadoop.hbase.client.TableState;
92+
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
9193
import org.apache.hadoop.hbase.fs.HFileSystem;
9294
import org.apache.hadoop.hbase.io.compress.Compression;
9395
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
@@ -122,7 +124,12 @@
122124
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
123125
import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
124126
import org.apache.hadoop.hbase.security.User;
127+
import org.apache.hadoop.hbase.security.access.AccessController;
128+
import org.apache.hadoop.hbase.security.access.PermissionStorage;
129+
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
130+
import org.apache.hadoop.hbase.security.token.TokenProvider;
125131
import org.apache.hadoop.hbase.security.visibility.VisibilityLabelsCache;
132+
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
126133
import org.apache.hadoop.hbase.util.Bytes;
127134
import org.apache.hadoop.hbase.util.CommonFSUtils;
128135
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -151,6 +158,7 @@
151158
import org.apache.hadoop.mapred.JobConf;
152159
import org.apache.hadoop.mapred.MiniMRCluster;
153160
import org.apache.hadoop.minikdc.MiniKdc;
161+
import org.apache.hadoop.security.authentication.util.KerberosName;
154162
import org.apache.yetus.audience.InterfaceAudience;
155163
import org.apache.zookeeper.WatchedEvent;
156164
import org.apache.zookeeper.ZooKeeper;
@@ -393,6 +401,42 @@ public static void closeRegionAndWAL(final HRegion r) throws IOException {
393401
r.getWAL().close();
394402
}
395403

404+
/**
405+
* Start mini secure cluster with given kdc and principals.
406+
* @param kdc Mini kdc server
407+
* @param servicePrincipal Service principal without realm.
408+
* @param spnegoPrincipal Spnego principal without realm.
409+
* @return Handler to shutdown the cluster
410+
*/
411+
public Closeable startSecureMiniCluster(MiniKdc kdc, String servicePrincipal,
412+
String spnegoPrincipal) throws Exception {
413+
Configuration conf = getConfiguration();
414+
415+
SecureTestUtil.enableSecurity(conf);
416+
VisibilityTestUtil.enableVisiblityLabels(conf);
417+
SecureTestUtil.verifyConfiguration(conf);
418+
419+
// Reset the static default realm forcibly for hadoop-2.0.
420+
// It has no impact but not required for hadoop-3.0.
421+
KerberosName.resetDefaultRealm();
422+
423+
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
424+
AccessController.class.getName() + ',' + TokenProvider.class.getName());
425+
426+
HBaseKerberosUtils.setSecuredConfiguration(conf, servicePrincipal + '@' + kdc.getRealm(),
427+
spnegoPrincipal + '@' + kdc.getRealm());
428+
429+
startMiniCluster();
430+
try {
431+
waitUntilAllRegionsAssigned(PermissionStorage.ACL_TABLE_NAME);
432+
} catch (Exception e) {
433+
shutdownMiniCluster();
434+
throw e;
435+
}
436+
437+
return this::shutdownMiniCluster;
438+
}
439+
396440
/**
397441
* Returns this classes's instance of {@link Configuration}. Be careful how you use the returned
398442
* Configuration since {@link Connection} instances can be shared. The Map of Connections is keyed

0 commit comments

Comments
 (0)