Skip to content

Commit 09cd522

Browse files
[LIVY-758] Document how to attach to an existing session from Java client and add a property to the builder
1 parent ee7fdfc commit 09cd522

3 files changed

Lines changed: 53 additions & 12 deletions

File tree

api/src/main/java/org/apache/livy/LivyClientBuilder.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,10 @@
3636
public final class LivyClientBuilder {
3737

3838
public static final String LIVY_URI_KEY = "livy.uri";
39+
public static final String LIVY_SESSION_ID_KEY = "livy.sessionId";
3940

4041
private static final ServiceLoader<LivyClientFactory> CLIENT_FACTORY_LOADER =
41-
ServiceLoader.load(LivyClientFactory.class, classLoader());
42+
ServiceLoader.load(LivyClientFactory.class, classLoader());
4243

4344
private static List<LivyClientFactory> getLivyClientFactories() {
4445
List<LivyClientFactory> factories = new ArrayList<>();
@@ -96,11 +97,30 @@ public LivyClientBuilder(boolean loadDefaults) throws IOException {
9697
}
9798
}
9899

100+
/**
101+
*
102+
* @param uri the uri of the livy server,
103+
* if the uri contains <pre>sessions/{sessionId}</pre>
104+
* the client will connect to the existing session,
105+
* otherwise it will create a new session.
106+
* @return the builder itself.
107+
*/
99108
public LivyClientBuilder setURI(URI uri) {
100109
config.setProperty(LIVY_URI_KEY, uri.toString());
101110
return this;
102111
}
103112

113+
/**
114+
*
115+
* @param sessionId the id of the session to attach to,
116+
* if not set a new session will be created when the client is built.
117+
* @return the builder itself.
118+
*/
119+
public LivyClientBuilder setSessionId(int sessionId) {
120+
config.setProperty(LIVY_SESSION_ID_KEY, "" + sessionId);
121+
return this;
122+
}
123+
104124
public LivyClientBuilder setConf(String key, String value) {
105125
if (value != null) {
106126
config.setProperty(key, value);

client-http/src/main/java/org/apache/livy/client/http/HttpClient.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.livy.Job;
3434
import org.apache.livy.JobHandle;
3535
import org.apache.livy.LivyClient;
36+
import org.apache.livy.LivyClientBuilder;
3637
import org.apache.livy.client.common.Serializer;
3738
import static org.apache.livy.client.common.HttpMessages.*;
3839

@@ -54,17 +55,18 @@ class HttpClient implements LivyClient {
5455
this.config = httpConf;
5556
this.stopped = false;
5657

57-
// If the given URI looks like it refers to an existing session, then try to connect to
58-
// an existing session. Note this means that any Spark configuration in httpConf will be
59-
// unused.
60-
Matcher m = Pattern.compile("(.*)" + LivyConnection.SESSIONS_URI + "/([0-9]+)")
61-
.matcher(uri.getPath());
62-
58+
Matcher m = Pattern.compile("(.*)" + LivyConnection.SESSIONS_URI + "/([0-9]+)").matcher(uri.getPath());
59+
String sessionIdFromConf = httpConf.get(LivyClientBuilder.LIVY_SESSION_ID_KEY);
6360
try {
64-
if (m.matches()) {
61+
if (sessionIdFromConf != null && m.matches()) {
62+
throw new IllegalArgumentException("Cannot set existing session both from URI and configuration");
63+
} else if (sessionIdFromConf != null) {
64+
this.conn = new LivyConnection(uri, httpConf);
65+
this.sessionId = Integer.parseInt(sessionIdFromConf);
66+
conn.post(null, SessionInfo.class, "/%d/connect", sessionId);
67+
} else if (m.matches()) {
6568
URI base = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), uri.getPort(),
66-
m.group(1), uri.getQuery(), uri.getFragment());
67-
69+
m.group(1), uri.getQuery(), uri.getFragment());
6870
this.conn = new LivyConnection(base, httpConf);
6971
this.sessionId = Integer.parseInt(m.group(2));
7072
conn.post(null, SessionInfo.class, "/%d/connect", sessionId);

client-http/src/test/scala/org/apache/livy/client/http/HttpClientSpec.scala

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -181,15 +181,34 @@ class HttpClientSpec extends FunSpecLike with BeforeAndAfterAll with LivyBaseUni
181181
testJob(false, response = Some(null))
182182
}
183183

184-
withClient("should connect to existing sessions") {
185-
var sid = client.asInstanceOf[HttpClient].getSessionId()
184+
withClient("should connect to existing sessions using the URI") {
185+
val sid = client.asInstanceOf[HttpClient].getSessionId()
186186
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" +
187187
s"${LivyConnection.SESSIONS_URI}/$sid"
188188
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).build()
189189
newClient.stop(false)
190190
verify(session, never()).stop()
191191
}
192192

193+
withClient("should connect to existing sessions using the conf") {
194+
val sid = client.asInstanceOf[HttpClient].getSessionId()
195+
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}"
196+
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).setSessionId(sid).build()
197+
newClient.stop(false)
198+
verify(session, never()).stop()
199+
}
200+
201+
withClient("should throw an exception if both the sessionId conf is set and the URI contains the session") {
202+
val sid = client.asInstanceOf[HttpClient].getSessionId()
203+
val uri = s"http://${InetAddress.getLocalHost.getHostAddress}:${server.port}" +
204+
s"${LivyConnection.SESSIONS_URI}/$sid"
205+
intercept[IllegalArgumentException]{
206+
val newClient = new LivyClientBuilder(false).setURI(new URI(uri)).setSessionId(sid).build()
207+
newClient.stop(false)
208+
verify(session, never()).stop()
209+
}
210+
}
211+
193212
withClient("should tear down clients") {
194213
client.stop(true)
195214
verify(session, times(1)).stop()

0 commit comments

Comments
 (0)