Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala2.13 + Spark v3.3 #199

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Prev Previous commit
Next Next commit
pattern matching on Results enum and migrate from iMain.beSilentNow t…
…o withoutWarnings to capture return type
requaos committed Feb 13, 2023
commit 43fe854e7f4927f97750c3f578d73691171bbbdf
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ class SignatureCheckerActor(
val isValidSignature = hmacString == signature
logger.trace(s"Signature ${signature} validity checked against " +
s"hmac ${hmacString} with outcome ${isValidSignature}")
sender ! isValidSignature
sender() ! isValidSignature
}
}

Original file line number Diff line number Diff line change
@@ -38,7 +38,7 @@ class SignatureProducerActor(
Json.stringify(Json.toJson(message.metadata)),
message.contentString
)
sender ! signature
sender() ! signature
}
}

Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ trait OrderedSupport extends Actor with Stash with LogLike {
*/
def startProcessing(): Unit = {
logger.debug("Actor is in processing state and will stash messages of " +
s"types: ${orderedTypes.mkString(" ")}")
s"types: ${orderedTypes().mkString(" ")}")
context.become(waiting, discardOld = false)
}

Original file line number Diff line number Diff line change
@@ -24,6 +24,8 @@ import org.apache.toree.global.StreamState
import org.apache.toree.interpreter.imports.printers.{WrapperConsole, WrapperSystem}
import org.apache.toree.interpreter.{ExecuteError, Interpreter}
import scala.tools.nsc.interpreter._
import scala.tools.nsc.interpreter.shell._
import scala.language.postfixOps
import scala.concurrent.Future
import scala.tools.nsc.{Global, Settings, util}
import scala.util.Try
@@ -32,16 +34,16 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
private val ExecutionExceptionName = "lastException"

private[toree] var iMain: IMain = _
private var completer: PresentationCompilerCompleter = _
private var completer: Completion = _
private val exceptionHack = new ExceptionHack()

def _runtimeClassloader = {
_thisClassloader
}

protected def newIMain(settings: Settings, out: JPrintWriter): IMain = {
val s = new IMain(settings, out)
s.initializeSynchronous()
protected def newIMain(settings: Settings, out: PrintWriter): IMain = {
val s = new IMain(settings, new ReplReporterImpl(settings, out))
s.initializeCompiler()
s
}

@@ -82,7 +84,7 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
val modifiers = buildModifierList(termNameString)
logger.debug(s"Rebinding of $termNameString as " +
s"${modifiers.mkString(" ")} $termTypeString")
Try(iMain.beSilentDuring {
Try(iMain.beQuietDuring {
iMain.bind(
termNameString, termTypeString, termValue, modifiers
)
@@ -145,7 +147,7 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
case Left(ex) =>
logger.error("Set failed in bind(%s, %s, %s)".format(variableName, typeName, value))
logger.error(util.stackTraceString(ex))
IR.Error
Results.Error

case Right(_) =>
val line = "%sval %s = %s.value".format(modifiers map (_ + " ") mkString, variableName, bindRep.evalPath)
@@ -164,7 +166,7 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
*/
override def doQuietly[T](body: => T): T = {
require(iMain != null)
iMain.beQuietDuring[T](body)
iMain.withoutWarnings[T](body)
}


@@ -271,13 +273,13 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
logger.debug("Initializing task manager")
taskManager.start()

iMain = newIMain(settings, new JPrintWriter(lastResultOut, true))
iMain = newIMain(settings, new PrintWriter(lastResultOut, true))

//logger.debug("Initializing interpreter")
//iMain.initializeSynchronous()

logger.debug("Initializing completer")
completer = new PresentationCompilerCompleter(iMain)
completer = new ReplCompletion(iMain)

iMain.beQuietDuring {
//logger.info("Rerouting Console and System related input and output")
@@ -307,7 +309,7 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
logger.debug(s"Attempting code completion for ${code}")
val result = completer.complete(code, pos)

(result.cursor, result.candidates)
(result.cursor, result.candidates.map(_.toString))
}

/**
@@ -318,11 +320,13 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
* @return tuple of (completeStatus, indent)
*/
override def isComplete(code: String): (String, String) = {
val result = iMain.beSilentDuring {
val parse = iMain.parse
parse(code) match {
case t: parse.Error => ("invalid", "")
case t: parse.Success =>
import scala.language.existentials
val result = iMain.withoutWarnings {
val parse = iMain.parse(code)
parse match {
case Left(Results.Error) => ("invalid", "")
case Right(_) => ("invalid", "")
case Left(Results.Success) =>
val lines = code.split("\n", -1)
val numLines = lines.length
// for multiline code blocks, require an empty line before executing
@@ -332,7 +336,7 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
} else {
("complete", "")
}
case t: parse.Incomplete =>
case Left(Results.Incomplete) =>
val lines = code.split("\n", -1)
// For now lets just grab the indent of the current line, if none default to 2 spaces.
("incomplete", startingWhiteSpace(lines.last))
@@ -367,14 +371,14 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
s
}

protected def interpretAddTask(code: String, silent: Boolean): Future[IR.Result] = {
protected def interpretAddTask(code: String, silent: Boolean): Future[Results.Result] = {
if (iMain == null) throw new IllegalArgumentException("Interpreter not started yet!")

taskManager.add {
// Add a task using the given state of our streams
StreamState.withStreams {
if (silent) {
iMain.beSilentDuring {
iMain.withoutWarnings {
iMain.interpret(code)
}
} else {
@@ -385,7 +389,7 @@ trait ScalaInterpreterSpecific extends SettingsProducerLike { this: ScalaInterpr
}

private def retrieveLastException: Throwable = {
iMain.beSilentDuring {
iMain.withoutWarnings {
iMain.interpret("_exceptionHack.lastException = lastException")
}
exceptionHack.lastException
Original file line number Diff line number Diff line change
@@ -35,7 +35,8 @@ import scala.jdk.CollectionConverters._
import scala.concurrent.{Await, Future}
import scala.language.reflectiveCalls
import scala.tools.nsc.Settings
import scala.tools.nsc.interpreter.{IR, OutputStream}
import scala.tools.nsc.interpreter.Results
import java.io.OutputStream
import scala.tools.nsc.util.ClassPath
import scala.util.matching.Regex
import scala.concurrent.duration.Duration
@@ -188,7 +189,7 @@ class ScalaInterpreter(private val config:Config = ConfigFactory.load) extends I
}

override def interpret(code: String, silent: Boolean = false, output: Option[OutputStream]):
(Results.Result, Either[ExecuteOutput, ExecuteFailure]) = {
(org.apache.toree.interpreter.Results.Result, Either[ExecuteOutput, ExecuteFailure]) = {
interpretBlock(code, silent)
}

@@ -277,7 +278,7 @@ class ScalaInterpreter(private val config:Config = ConfigFactory.load) extends I
}

protected def interpretBlock(code: String, silent: Boolean = false):
(Results.Result, Either[ExecuteOutput, ExecuteFailure]) = {
(org.apache.toree.interpreter.Results.Result, Either[ExecuteOutput, ExecuteFailure]) = {

logger.trace(s"Interpreting line: $code")

@@ -294,23 +295,23 @@ class ScalaInterpreter(private val config:Config = ConfigFactory.load) extends I
Await.result(futureResultAndExecuteInfo, Duration.Inf)
}

protected def interpretMapToCustomResult(future: Future[IR.Result]): Future[Results.Result] = {
protected def interpretMapToCustomResult(future: Future[Results.Result]): Future[org.apache.toree.interpreter.Results.Result] = {
import scala.concurrent.ExecutionContext.Implicits.global
future map {
case IR.Success => Results.Success
case IR.Error => Results.Error
case IR.Incomplete => Results.Incomplete
case Results.Success => org.apache.toree.interpreter.Results.Success
case Results.Error => org.apache.toree.interpreter.Results.Error
case Results.Incomplete => org.apache.toree.interpreter.Results.Incomplete
} recover {
case ex: ExecutionException => Results.Aborted
case ex: ExecutionException => org.apache.toree.interpreter.Results.Aborted
}
}

protected def interpretMapToResultAndOutput(future: Future[Results.Result]):
Future[(Results.Result, Either[Map[String, String], ExecuteError])] = {
protected def interpretMapToResultAndOutput(future: Future[org.apache.toree.interpreter.Results.Result]):
Future[(org.apache.toree.interpreter.Results.Result, Either[Map[String, String], ExecuteError])] = {
import scala.concurrent.ExecutionContext.Implicits.global

future map {
case result @ (Results.Success | Results.Incomplete) =>
case result @ (org.apache.toree.interpreter.Results.Success | org.apache.toree.interpreter.Results.Incomplete) =>
val lastOutput = lastResultOut.toString("UTF-8").trim
lastResultOut.reset()

@@ -320,17 +321,17 @@ class ScalaInterpreter(private val config:Config = ConfigFactory.load) extends I
val output = obj.map(Displayers.display(_).asScala.toMap).getOrElse(Map.empty)
(result, Left(output))

case Results.Error =>
case org.apache.toree.interpreter.Results.Error =>
val lastOutput = lastResultOut.toString("UTF-8").trim
lastResultOut.reset()

val (obj, defStr, text) = prepareResult(lastOutput)
defStr.foreach(kernel.display.content(MIMEType.PlainText, _))
val output = interpretConstructExecuteError(text.get)
(Results.Error, Right(output))
(org.apache.toree.interpreter.Results.Error, Right(output))

case Results.Aborted =>
(Results.Aborted, Right(null))
case org.apache.toree.interpreter.Results.Aborted =>
(org.apache.toree.interpreter.Results.Aborted, Right(null))
}
}