Skip to content
158 changes: 84 additions & 74 deletions src/test/scala/com/boundary/ordasity/ClusterSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ import com.codahale.jerkson.Json
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.ZooDefs.Ids
import com.twitter.common.zookeeper.{ZooKeeperMap, ZooKeeperClient}
import org.apache.zookeeper.{Watcher, CreateMode, ZooKeeper}
import org.apache.zookeeper.{Watcher, CreateMode, ZooKeeper, WatchedEvent}
import org.apache.zookeeper.Watcher.Event.KeeperState

class ClusterSpec extends Spec with Logging {
Logging.configure()
Expand Down Expand Up @@ -272,13 +273,13 @@ class ClusterSpec extends Spec with Logging {

cluster.registerWatchers()

cluster.nodes.isInstanceOf[ZooKeeperMap[String]].must(be(true))
cluster.allWorkUnits.isInstanceOf[ZooKeeperMap[String]].must(be(true))
cluster.workUnitMap.isInstanceOf[ZooKeeperMap[String]].must(be(true))
cluster.nodes.isInstanceOf[ZooKeeperMap[_]].must(be(true))
cluster.allWorkUnits.isInstanceOf[ZooKeeperMap[_]].must(be(true))
cluster.workUnitMap.isInstanceOf[ZooKeeperMap[_]].must(be(true))

// Not using soft handoff (TODO: assert ZKMap w/soft handoff on)
cluster.handoffRequests.isInstanceOf[HashMap[String, String]].must(be(true))
cluster.handoffResults.isInstanceOf[HashMap[String, String]].must(be(true))
cluster.handoffRequests.isInstanceOf[HashMap[_, _]].must(be(true))
cluster.handoffResults.isInstanceOf[HashMap[_, _]].must(be(true))

// TODO: Test loadMap isinstanceof zkmap with smart balancing on.
}
Expand Down Expand Up @@ -390,107 +391,116 @@ class ClusterSpec extends Spec with Logging {
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
cluster.watchesRegistered.set(true)
}

@Test def `connect` {
val (mockZK, mockZKClient) = getMockZK()
val policy = mock[BalancingPolicy]
cluster.balancingPolicy = policy

// Pretend that we get a SyncConnected event during our register call (synchronously)
mockZKClient.register(any).answersWith { _.getArguments match {
case Array(watcher: Watcher) => watcher.process(new WatchedEvent(null, KeeperState.SyncConnected, null))
}}

// Pretend that the paths exist for the ZooKeeperMaps we're creating
mockZK.exists(any[String], any[Watcher]).returns(mock[Stat])

cluster.connect(Some(mockZKClient))

// Apply same verifications as onConnect, as all of these should be called.
//verify.one(mockClusterListener).onJoin(any)
//verify.one(policy).onConnect()
//cluster.state.get().must(be(NodeState.Started))
//cluster.watchesRegistered.set(true)
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
}
}

@Test def `join` {
val (mockZK, mockZKClient) = getMockZK()
cluster.zk = mockZKClient

val policy = mock[BalancingPolicy]
cluster.balancingPolicy = policy
class `join` {
val (mockZK, mockZKClient) = getMockZK()
cluster.zk = mockZKClient

// Should no-op if draining.
cluster.setState(NodeState.Draining)
cluster.join().must(be(NodeState.Draining.toString))
verify.exactly(0)(mockZKClient).get()
val policy = mock[BalancingPolicy]
cluster.balancingPolicy = policy

// Should no-op if started.
cluster.setState(NodeState.Started)
cluster.join().must(be(NodeState.Started.toString))
verify.exactly(0)(mockZKClient).get()
// Pretend that we get a SyncConnected event during our register call (synchronously)
mockZKClient.register(any).answersWith { _.getArguments match {
case Array(watcher: Watcher) => watcher.process(new WatchedEvent(null, KeeperState.SyncConnected, null))
}}

// Pretend that the paths exist for the ZooKeeperMaps we're creating
mockZK.exists(any[String], any[Watcher]).returns(mock[Stat])
// Pretend that the paths exist for any ZooKeeperMaps we might create
mockZK.exists(any[String], any[Watcher]).returns(mock[Stat])

cluster.setState(NodeState.Fresh)
cluster.join().must(be(NodeState.Started.toString))
@Test def `when draining` {
cluster.setState(NodeState.Draining)
cluster.join(Some(mockZKClient)).must(be(NodeState.Draining.toString))

// Apply same verifications as connect, as all of these should be called.
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
cluster.watchesRegistered.set(true)
}
// Should no-op if draining.
verify.no(mockClusterListener).onJoin(any)
verify.no(policy).onConnect()
cluster.state.get().must(be(NodeState.Draining))
cluster.watchesRegistered.set(false)
}

@Test def `after started` {
cluster.setState(NodeState.Started)
cluster.join(Some(mockZKClient)).must(be(NodeState.Started.toString))

@Test def `join after shutdown` {
val (mockZK, mockZKClient) = getMockZK()
cluster.zk = mockZKClient
// Should no-op if already started.
verify.no(mockClusterListener).onJoin(any)
verify.no(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
cluster.watchesRegistered.set(false)
}

val policy = mock[BalancingPolicy]
cluster.balancingPolicy = policy
@Test def `when fresh` {
cluster.setState(NodeState.Fresh)
cluster.join(Some(mockZKClient)).must(be(NodeState.Started.toString))

// Pretend that the paths exist for the ZooKeeperMaps we're creating
mockZK.exists(any[String], any[Watcher]).returns(mock[Stat])
// Apply same verifications as connect, as all of these should be called.
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
}

cluster.setState(NodeState.Shutdown)
cluster.join().must(be(NodeState.Started.toString))
@Test def `after shutdown` {
cluster.setState(NodeState.Shutdown)
cluster.join(Some(mockZKClient)).must(be(NodeState.Shutdown.toString))

// Apply same verifications as connect, as all of these should be called.
verify.one(mockClusterListener).onJoin(any)
verify.one(policy).onConnect()
cluster.state.get().must(be(NodeState.Started))
cluster.watchesRegistered.set(true)
}
// Should no-op if shutdown.
verify.no(mockClusterListener).onJoin(any)
verify.no(policy).onConnect()
cluster.state.get().must(be(NodeState.Shutdown))
}
}

@Test def `cluster constructor` {
val cluster = new Cluster("foo", mockClusterListener, config)
cluster.name.must(be("foo"))
cluster.listener.must(be(mockClusterListener))
}

@Test def `cluster constructor` {
val cluster = new Cluster("foo", mockClusterListener, config)
cluster.name.must(be("foo"))
cluster.listener.must(be(mockClusterListener))
}

@Test def `getOrElse String` {
val foo = new HashMap[String, String]
foo.put("foo", "bar")

@Test def `getOrElse String` {
val foo = new HashMap[String, String]
foo.put("foo", "bar")
cluster.getOrElse(foo, "foo", "taco").must(be("bar"))
cluster.getOrElse(foo, "bar", "taco").must(be("taco"))
}

cluster.getOrElse(foo, "foo", "taco").must(be("bar"))
cluster.getOrElse(foo, "bar", "taco").must(be("taco"))
}
@Test def `getOrElse Double` {
val foo = new HashMap[String, Double]
foo.put("foo", 0.01d)
cluster.getOrElse(foo, "foo", 0.02d).must(be(0.01d))
cluster.getOrElse(foo, "bar", 0.02d).must(be(0.02d))
}

@Test def `getOrElse Double` {
val foo = new HashMap[String, Double]
foo.put("foo", 0.01d)
cluster.getOrElse(foo, "foo", 0.02d).must(be(0.01d))
cluster.getOrElse(foo, "bar", 0.02d).must(be(0.02d))
}
def getMockZK() : (ZooKeeper, ZooKeeperClient) = {
val mockZK = mock[ZooKeeper]
val mockZKClient = mock[ZooKeeperClient]
mockZKClient.get().returns(mockZK)
(mockZK, mockZKClient)
}

def getMockZK() : (ZooKeeper, ZooKeeperClient) = {
val mockZK = mock[ZooKeeper]
val mockZKClient = mock[ZooKeeperClient]
mockZKClient.get().returns(mockZK)
(mockZK, mockZKClient)
}

}