diff --git a/src/main/scala/com/redis/IO.scala b/src/main/scala/com/redis/IO.scala index c98d6758..a0a581e9 100644 --- a/src/main/scala/com/redis/IO.scala +++ b/src/main/scala/com/redis/IO.scala @@ -17,9 +17,6 @@ trait IO extends Log { def connected = { socket != null && socket.isBound() && !socket.isClosed() && socket.isConnected() && !socket.isInputShutdown() && !socket.isOutputShutdown(); } - def reconnect = { - disconnect && connect - } // Connects the socket, and sets the input and output streams. def connect: Boolean = { diff --git a/src/main/scala/com/redis/Operations.scala b/src/main/scala/com/redis/Operations.scala index ec73baef..a9d46f0d 100644 --- a/src/main/scala/com/redis/Operations.scala +++ b/src/main/scala/com/redis/Operations.scala @@ -122,7 +122,7 @@ trait Operations { self: Redis => // SELECT (index) // selects the DB to connect, defaults to 0 (zero). def select(index: Int): Boolean = - send("SELECT", List(index))(asBoolean match { + sendWithoutAuth("SELECT", List(index))(asBoolean match { case true => { db = index true @@ -154,7 +154,7 @@ trait Operations { self: Redis => // AUTH // auths with the server. def auth(secret: Any)(implicit format: Format): Boolean = - send("AUTH", List(secret))(asBoolean) + sendWithoutAuth("AUTH", List(secret))(asBoolean) // PERSIST (key) // Remove the existing timeout on key, turning the key from volatile (a key with an expire set) diff --git a/src/main/scala/com/redis/Pool.scala b/src/main/scala/com/redis/Pool.scala index 7dae70b4..aa9f628e 100644 --- a/src/main/scala/com/redis/Pool.scala +++ b/src/main/scala/com/redis/Pool.scala @@ -9,11 +9,7 @@ private [redis] class RedisClientFactory(val host: String, val port: Int, val da // when we make an object it's already connected def makeObject = { - val cl = new RedisClient(host, port) - if (database != 0) - cl.select(database) - secret.foreach(cl auth _) - cl + new RedisClient(host, port, database, secret) } // quit & disconnect diff --git a/src/main/scala/com/redis/RedisClient.scala b/src/main/scala/com/redis/RedisClient.scala index 3be897aa..1012a9ea 100644 --- a/src/main/scala/com/redis/RedisClient.scala +++ b/src/main/scala/com/redis/RedisClient.scala @@ -27,6 +27,18 @@ trait Redis extends IO with Protocol { if (reconnect) send(command, args)(result) else throw e } + + def sendWithoutAuth[A](command: String, args: Seq[Any])(result: => A)(implicit format: Format): A = try { + write(Commands.multiBulk(command.getBytes("UTF-8") +: (args map (format.apply)))) + result + } catch { + case e: RedisConnectionException => + if (reconnectWithoutAuth) sendWithoutAuth(command, args)(result) + else throw e + case e: SocketException => + if (reconnectWithoutAuth) sendWithoutAuth(command, args)(result) + else throw e + } def send[A](command: String)(result: => A): A = try { write(Commands.multiBulk(List(command.getBytes("UTF-8")))) @@ -44,23 +56,57 @@ trait Redis extends IO with Protocol { protected def flattenPairs(in: Iterable[Product2[Any, Any]]): List[Any] = in.iterator.flatMap(x => Iterator(x._1, x._2)).toList + + def reconnectWithoutAuth: Boolean = { + disconnect && connect + } + + def reconnect: Boolean = { + disconnect && initialize + } + + protected def initialize : Boolean } -trait RedisCommand extends Redis - with Operations +trait RedisCommand extends Redis with Operations with NodeOperations with StringOperations with ListOperations with SetOperations with SortedSetOperations with HashOperations - with EvalOperations + with EvalOperations { + + val database: Int = 0 + val secret: Option[Any] = None + + override def initialize : Boolean = { + if(connect) { + selectDatabase + authenticate + true + } else { + false + } + } + + private def selectDatabase { + if (database != 0) + select(database) + } + + private def authenticate { + secret.foreach(auth _) + } + +} -class RedisClient(override val host: String, override val port: Int) +class RedisClient(override val host: String, override val port: Int, + override val database: Int = 0, override val secret: Option[Any] = None) extends RedisCommand with PubSub { - connect + initialize def this() = this("localhost", 6379) override def toString = host + ":" + String.valueOf(port) @@ -82,6 +128,7 @@ class RedisClient(override val host: String, override val port: Int) None } } + import serialization.Parse import scala.concurrent.{Promise, Future} @@ -155,6 +202,8 @@ class RedisClient(override val host: String, override val port: Int) val host = parent.host val port = parent.port + override val secret = parent.secret + override val database = parent.database // TODO: Find a better abstraction override def connected = parent.connected @@ -165,5 +214,6 @@ class RedisClient(override val host: String, override val port: Int) override def write(data: Array[Byte]) = parent.write(data) override def readLine = parent.readLine override def readCounted(count: Int) = parent.readCounted(count) + override def initialize = parent.initialize } } diff --git a/src/main/scala/com/redis/RedisProtocol.scala b/src/main/scala/com/redis/RedisProtocol.scala index a6350d2a..1999c4d5 100644 --- a/src/main/scala/com/redis/RedisProtocol.scala +++ b/src/main/scala/com/redis/RedisProtocol.scala @@ -43,7 +43,6 @@ private [redis] trait Reply { def readLine: Array[Byte] def readCounted(c: Int): Array[Byte] - def reconnect: Boolean val integerReply: Reply[Option[Int]] = { case (INT, s) => Some(Parsers.parseInt(s)) @@ -91,8 +90,8 @@ private [redis] trait Reply { } val errReply: Reply[Nothing] = { - case (ERR, s) => reconnect; throw new Exception(Parsers.parseString(s)) - case x => reconnect; throw new Exception("Protocol error: Got " + x + " as initial reply byte") + case (ERR, s) => throw new Exception(Parsers.parseString(s)) + case x => throw new Exception("Protocol error: Got " + x + " as initial reply byte") } def queuedReplyInt: Reply[Option[Int]] = { diff --git a/src/main/scala/com/redis/ds/Deque.scala b/src/main/scala/com/redis/ds/Deque.scala index 8d0b6e7a..f4d20b04 100644 --- a/src/main/scala/com/redis/ds/Deque.scala +++ b/src/main/scala/com/redis/ds/Deque.scala @@ -73,14 +73,16 @@ abstract class RedisDeque[A](val blocking: Boolean = false, val timeoutInSecs: I } } -import com.redis.{Redis, ListOperations} +import com.redis.{Redis, RedisCommand} -class RedisDequeClient(val h: String, val p: Int) { +class RedisDequeClient(val h: String, val p: Int, val d: Int = 0, val s: Option[Any] = None) { def getDeque[A](k: String, blocking: Boolean = false, timeoutInSecs: Int = 0)(implicit format: Format, parse: Parse[A]) = - new RedisDeque(blocking, timeoutInSecs)(format, parse) with ListOperations with Redis { + new RedisDeque(blocking, timeoutInSecs)(format, parse) with RedisCommand { val host = h val port = p val key = k - connect + override val database = d + override val secret = s + initialize } }