Skip to content

Commit ffd922a

Browse files
committed
update
1 parent 8c87e9f commit ffd922a

1 file changed

Lines changed: 11 additions & 21 deletions

File tree

repl/src/main/scala/org/apache/livy/repl/Session.scala

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.livy.repl
1919

2020
import java.util.{LinkedHashMap => JLinkedHashMap}
2121
import java.util.Map.Entry
22-
import java.util.concurrent.{ConcurrentHashMap, Executors}
22+
import java.util.concurrent.Executors
2323
import java.util.concurrent.atomic.AtomicInteger
2424

2525
import scala.collection.JavaConverters._
@@ -63,8 +63,6 @@ class Session(
6363
private val cancelExecutor = ExecutionContext.fromExecutorService(
6464
Executors.newSingleThreadExecutor())
6565

66-
private val statementThreads = new ConcurrentHashMap[Int, Thread]()
67-
6866
private implicit val formats = DefaultFormats
6967

7068
private var _state: SessionState = SessionState.NotStarted
@@ -163,25 +161,18 @@ class Session(
163161
_statements.synchronized { _statements(statementId) = statement }
164162

165163
Future {
166-
val currentThread = Thread.currentThread()
167-
statementThreads.put(statementId, currentThread)
168-
try {
169-
setJobGroup(tpe, statementId)
170-
statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
171-
172-
if (statement.state.get() == StatementState.Running) {
173-
statement.started = System.currentTimeMillis()
174-
statement.output = executeCode(interpreter(tpe), statementId, code)
175-
}
164+
setJobGroup(tpe, statementId)
165+
statement.compareAndTransit(StatementState.Waiting, StatementState.Running)
176166

177-
statement.compareAndTransit(StatementState.Running, StatementState.Available)
178-
statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
179-
statement.updateProgress(1.0)
180-
statement.completed = System.currentTimeMillis()
181-
} finally {
182-
statementThreads.remove(statementId, currentThread)
183-
Thread.interrupted()
167+
if (statement.state.get() == StatementState.Running) {
168+
statement.started = System.currentTimeMillis()
169+
statement.output = executeCode(interpreter(tpe), statementId, code)
184170
}
171+
172+
statement.compareAndTransit(StatementState.Running, StatementState.Available)
173+
statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
174+
statement.updateProgress(1.0)
175+
statement.completed = System.currentTimeMillis()
185176
}(interpreterExecutor)
186177

187178
statementId
@@ -221,7 +212,6 @@ class Session(
221212
info(s"Failed to cancel statement $statementId.")
222213
statement.compareAndTransit(StatementState.Cancelling, StatementState.Cancelled)
223214
} else {
224-
Option(statementThreads.get(statementId)).foreach(_.interrupt())
225215
sc.cancelJobGroup(statementId.toString)
226216
if (statement.state.get() == StatementState.Cancelling) {
227217
Thread.sleep(livyConf.getTimeAsMs(RSCConf.Entry.JOB_CANCEL_TRIGGER_INTERVAL))

0 commit comments

Comments
 (0)