Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit db79963

Browse files
committedJan 15, 2025··
KAFAK-14604: SASL session expiration time will be overflowed when calculation
Signed-off-by: PoAn Yang <payang@apache.org>
1 parent ae661de commit db79963

File tree

6 files changed

+136
-14
lines changed

6 files changed

+136
-14
lines changed
 

‎clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientAuthenticator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -690,7 +690,7 @@ public void setAuthenticationEndAndSessionReauthenticationTimes(long nowNanos) {
690690
double pctToUse = pctWindowFactorToTakeNetworkLatencyAndClockDriftIntoAccount + RNG.nextDouble()
691691
* pctWindowJitterToAvoidReauthenticationStormAcrossManyChannelsSimultaneously;
692692
sessionLifetimeMsToUse = (long) (positiveSessionLifetimeMs * pctToUse);
693-
clientSessionReauthenticationTimeNanos = authenticationEndNanos + 1000 * 1000 * sessionLifetimeMsToUse;
693+
clientSessionReauthenticationTimeNanos = Math.addExact(authenticationEndNanos, Utils.msToNs(sessionLifetimeMsToUse));
694694
log.debug(
695695
"Finished {} with session expiration in {} ms and session re-authentication on or after {} ms",
696696
authenticationOrReauthenticationText(), positiveSessionLifetimeMs, sessionLifetimeMsToUse);

‎clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -681,7 +681,7 @@ else if (!maxReauthSet)
681681
else
682682
retvalSessionLifetimeMs = zeroIfNegative(Math.min(credentialExpirationMs - authenticationEndMs, connectionsMaxReauthMs));
683683

684-
sessionExpirationTimeNanos = authenticationEndNanos + 1000 * 1000 * retvalSessionLifetimeMs;
684+
sessionExpirationTimeNanos = Math.addExact(authenticationEndNanos, Utils.msToNs(retvalSessionLifetimeMs));
685685
}
686686

687687
if (credentialExpirationMs != null) {

‎clients/src/main/java/org/apache/kafka/common/utils/Utils.java

+13
Original file line numberDiff line numberDiff line change
@@ -1697,4 +1697,17 @@ public static ConfigDef mergeConfigs(List<ConfigDef> configDefs) {
16971697
public interface ThrowingRunnable {
16981698
void run() throws Exception;
16991699
}
1700+
1701+
/**
1702+
* convert millisecond to nanosecond, or throw exception if overflow
1703+
* @param timeMs the time in millisecond
1704+
* @return the converted nanosecond
1705+
*/
1706+
public static long msToNs(long timeMs) {
1707+
try {
1708+
return Math.multiplyExact(1000 * 1000, timeMs);
1709+
} catch (ArithmeticException e) {
1710+
throw new IllegalArgumentException("Cannot convert " + timeMs + " millisecond to nanosecond due to arithmetic overflow", e);
1711+
}
1712+
}
17001713
}

‎clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java

+85-12
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ public class SaslAuthenticatorTest {
155155
private static final long CONNECTIONS_MAX_REAUTH_MS_VALUE = 100L;
156156
private static final int BUFFER_SIZE = 4 * 1024;
157157
private static Time time = Time.SYSTEM;
158+
private static boolean needLargeExpiration = false;
158159

159160
private NioEchoServer server;
160161
private Selector selector;
@@ -178,6 +179,7 @@ public void setup() throws Exception {
178179

179180
@AfterEach
180181
public void teardown() throws Exception {
182+
needLargeExpiration = false;
181183
if (server != null)
182184
this.server.close();
183185
if (selector != null)
@@ -1607,6 +1609,42 @@ public void testCannotReauthenticateWithDifferentPrincipal() throws Exception {
16071609
server.verifyReauthenticationMetrics(0, 1);
16081610
}
16091611

1612+
@Test
1613+
public void testReauthenticateWithLargeReauthValue() throws Exception {
1614+
// enable it, we'll get a large expiration timestamp token
1615+
needLargeExpiration = true;
1616+
String node = "0";
1617+
SecurityProtocol securityProtocol = SecurityProtocol.SASL_SSL;
1618+
1619+
configureMechanisms(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM,
1620+
List.of(OAuthBearerLoginModule.OAUTHBEARER_MECHANISM));
1621+
// set a large re-auth timeout in server side
1622+
saslServerConfigs.put(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG, Long.MAX_VALUE);
1623+
server = createEchoServer(securityProtocol);
1624+
1625+
// set to default value for sasl login configs for initialization in ExpiringCredentialRefreshConfig
1626+
saslClientConfigs.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_FACTOR);
1627+
saslClientConfigs.put(SaslConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER, SaslConfigs.DEFAULT_LOGIN_REFRESH_WINDOW_JITTER);
1628+
saslClientConfigs.put(SaslConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS, SaslConfigs.DEFAULT_LOGIN_REFRESH_MIN_PERIOD_SECONDS);
1629+
saslClientConfigs.put(SaslConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS, SaslConfigs.DEFAULT_LOGIN_REFRESH_BUFFER_SECONDS);
1630+
saslClientConfigs.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, AlternateLoginCallbackHandler.class);
1631+
1632+
createCustomClientConnection(securityProtocol, OAuthBearerLoginModule.OAUTHBEARER_MECHANISM, node, true);
1633+
1634+
// channel should be not null before sasl handshake
1635+
assertNotNull(selector.channel(node));
1636+
1637+
TestUtils.waitForCondition(() -> {
1638+
selector.poll(1000);
1639+
// this channel should be closed due to session timeout calculation overflow
1640+
return selector.channel(node) == null;
1641+
}, "channel didn't close with large re-authentication value");
1642+
1643+
// ensure metrics are as expected
1644+
server.verifyAuthenticationMetrics(0, 0);
1645+
server.verifyReauthenticationMetrics(0, 0);
1646+
}
1647+
16101648
@Test
16111649
public void testCorrelationId() {
16121650
SaslClientAuthenticator authenticator = new SaslClientAuthenticator(
@@ -1936,7 +1974,7 @@ private void createClientConnection(SecurityProtocol securityProtocol, String sa
19361974
if (enableSaslAuthenticateHeader)
19371975
createClientConnection(securityProtocol, node);
19381976
else
1939-
createClientConnectionWithoutSaslAuthenticateHeader(securityProtocol, saslMechanism, node);
1977+
createCustomClientConnection(securityProtocol, saslMechanism, node, false);
19401978
}
19411979

19421980
private NioEchoServer startServerApiVersionsUnsupportedByClient(final SecurityProtocol securityProtocol, String saslMechanism) throws Exception {
@@ -2024,15 +2062,13 @@ protected void enableKafkaSaslAuthenticateHeaders(boolean flag) {
20242062
return server;
20252063
}
20262064

2027-
private void createClientConnectionWithoutSaslAuthenticateHeader(final SecurityProtocol securityProtocol,
2028-
final String saslMechanism, String node) throws Exception {
2029-
2030-
final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
2031-
final Map<String, ?> configs = Collections.emptyMap();
2032-
final JaasContext jaasContext = JaasContext.loadClientContext(configs);
2033-
final Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
2034-
2035-
SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(ConnectionMode.CLIENT, jaasContexts,
2065+
private SaslChannelBuilder saslChannelBuilderWithoutHeader(
2066+
final SecurityProtocol securityProtocol,
2067+
final String saslMechanism,
2068+
final Map<String, JaasContext> jaasContexts,
2069+
final ListenerName listenerName
2070+
) {
2071+
return new SaslChannelBuilder(ConnectionMode.CLIENT, jaasContexts,
20362072
securityProtocol, listenerName, false, saslMechanism,
20372073
null, null, null, time, new LogContext(), null) {
20382074

@@ -2059,6 +2095,42 @@ protected void setSaslAuthenticateAndHandshakeVersions(ApiVersionsResponse apiVe
20592095
};
20602096
}
20612097
};
2098+
}
2099+
2100+
private void createCustomClientConnection(
2101+
final SecurityProtocol securityProtocol,
2102+
final String saslMechanism,
2103+
String node,
2104+
boolean withSaslAuthenticateHeader
2105+
) throws Exception {
2106+
2107+
final ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
2108+
final Map<String, ?> configs = Collections.emptyMap();
2109+
final JaasContext jaasContext = JaasContext.loadClientContext(configs);
2110+
final Map<String, JaasContext> jaasContexts = Collections.singletonMap(saslMechanism, jaasContext);
2111+
2112+
SaslChannelBuilder clientChannelBuilder;
2113+
if (!withSaslAuthenticateHeader) {
2114+
clientChannelBuilder = saslChannelBuilderWithoutHeader(securityProtocol, saslMechanism, jaasContexts, listenerName);
2115+
} else {
2116+
clientChannelBuilder = new SaslChannelBuilder(ConnectionMode.CLIENT, jaasContexts,
2117+
securityProtocol, listenerName, false, saslMechanism,
2118+
null, null, null, time, new LogContext(), null) {
2119+
2120+
@Override
2121+
protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs,
2122+
AuthenticateCallbackHandler callbackHandler,
2123+
String id,
2124+
String serverHost,
2125+
String servicePrincipal,
2126+
TransportLayer transportLayer,
2127+
Subject subject) {
2128+
2129+
return new SaslClientAuthenticator(configs, callbackHandler, id, subject,
2130+
servicePrincipal, serverHost, saslMechanism, transportLayer, time, new LogContext());
2131+
}
2132+
};
2133+
}
20622134
clientChannelBuilder.configure(saslClientConfigs);
20632135
this.selector = NetworkTestUtils.createSelector(clientChannelBuilder, time);
20642136
InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
@@ -2507,10 +2579,11 @@ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallback
25072579
+ ++numInvocations;
25082580
String headerJson = "{" + claimOrHeaderJsonText("alg", "none") + "}";
25092581
/*
2510-
* Use a short lifetime so the background refresh thread replaces it before we
2582+
* If we're testing large expiration scenario, use a large lifetime.
2583+
* Otherwise, use a short lifetime so the background refresh thread replaces it before we
25112584
* re-authenticate
25122585
*/
2513-
String lifetimeSecondsValueToUse = "1";
2586+
String lifetimeSecondsValueToUse = needLargeExpiration ? String.valueOf(Long.MAX_VALUE) : "1";
25142587
String claimsJson;
25152588
try {
25162589
claimsJson = String.format("{%s,%s,%s}",

‎clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslServerAuthenticatorTest.java

+29
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,35 @@ public void testSessionExpiresAtTokenExpiry() throws IOException {
269269
}
270270
}
271271

272+
@Test
273+
public void testSessionWontExpireWithLargeExpirationTime() throws IOException {
274+
String mechanism = OAuthBearerLoginModule.OAUTHBEARER_MECHANISM;
275+
SaslServer saslServer = mock(SaslServer.class);
276+
MockTime time = new MockTime(0, 1, 1000);
277+
// set a Long.MAX_VALUE as the expiration time
278+
Duration largeExpirationTime = Duration.ofMillis(Long.MAX_VALUE);
279+
280+
try (
281+
MockedStatic<?> ignored = mockSaslServer(saslServer, mechanism, time, largeExpirationTime);
282+
MockedStatic<?> ignored2 = mockKafkaPrincipal("[principal-type]", "[principal-name");
283+
TransportLayer transportLayer = mockTransportLayer()
284+
) {
285+
286+
SaslServerAuthenticator authenticator = getSaslServerAuthenticatorForOAuth(mechanism, transportLayer, time, largeExpirationTime.toMillis());
287+
288+
mockRequest(saslHandshakeRequest(mechanism), transportLayer);
289+
authenticator.authenticate();
290+
291+
when(saslServer.isComplete()).thenReturn(false).thenReturn(true);
292+
mockRequest(saslAuthenticateRequest(), transportLayer);
293+
294+
Throwable t = assertThrows(IllegalArgumentException.class, () -> authenticator.authenticate());
295+
assertEquals(ArithmeticException.class, t.getCause().getClass());
296+
assertEquals("Cannot convert " + Long.MAX_VALUE + " millisecond to nanosecond due to arithmetic overflow",
297+
t.getMessage());
298+
}
299+
}
300+
272301
private SaslServerAuthenticator getSaslServerAuthenticatorForOAuth(String mechanism, TransportLayer transportLayer, Time time, Long maxReauth) {
273302
Map<String, ?> configs = Collections.singletonMap(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
274303
Collections.singletonList(mechanism));

‎clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java

+7
Original file line numberDiff line numberDiff line change
@@ -1109,6 +1109,13 @@ public void testTryAll() throws Throwable {
11091109
assertEquals(expected, recorded);
11101110
}
11111111

1112+
@Test
1113+
public void testMsToNs() {
1114+
assertEquals(1000000, Utils.msToNs(1));
1115+
assertEquals(0, Utils.msToNs(0));
1116+
assertThrows(IllegalArgumentException.class, () -> Utils.msToNs(Long.MAX_VALUE));
1117+
}
1118+
11121119
private Callable<Void> recordingCallable(Map<String, Object> recordingMap, String success, TestException failure) {
11131120
return () -> {
11141121
if (success == null)

0 commit comments

Comments
 (0)
Please sign in to comment.