diff --git a/src/test/scala/com/boundary/ordasity/ClusterSpec.scala b/src/test/scala/com/boundary/ordasity/ClusterSpec.scala index fa47efb..ae532da 100644 --- a/src/test/scala/com/boundary/ordasity/ClusterSpec.scala +++ b/src/test/scala/com/boundary/ordasity/ClusterSpec.scala @@ -26,7 +26,8 @@ import com.codahale.jerkson.Json import org.apache.zookeeper.data.Stat import org.apache.zookeeper.ZooDefs.Ids import com.twitter.common.zookeeper.{ZooKeeperMap, ZooKeeperClient} -import org.apache.zookeeper.{Watcher, CreateMode, ZooKeeper} +import org.apache.zookeeper.{Watcher, CreateMode, ZooKeeper, WatchedEvent} +import org.apache.zookeeper.Watcher.Event.KeeperState class ClusterSpec extends Spec with Logging { Logging.configure() @@ -272,13 +273,13 @@ class ClusterSpec extends Spec with Logging { cluster.registerWatchers() - cluster.nodes.isInstanceOf[ZooKeeperMap[String]].must(be(true)) - cluster.allWorkUnits.isInstanceOf[ZooKeeperMap[String]].must(be(true)) - cluster.workUnitMap.isInstanceOf[ZooKeeperMap[String]].must(be(true)) + cluster.nodes.isInstanceOf[ZooKeeperMap[_]].must(be(true)) + cluster.allWorkUnits.isInstanceOf[ZooKeeperMap[_]].must(be(true)) + cluster.workUnitMap.isInstanceOf[ZooKeeperMap[_]].must(be(true)) // Not using soft handoff (TODO: assert ZKMap w/soft handoff on) - cluster.handoffRequests.isInstanceOf[HashMap[String, String]].must(be(true)) - cluster.handoffResults.isInstanceOf[HashMap[String, String]].must(be(true)) + cluster.handoffRequests.isInstanceOf[HashMap[_, _]].must(be(true)) + cluster.handoffResults.isInstanceOf[HashMap[_, _]].must(be(true)) // TODO: Test loadMap isinstanceof zkmap with smart balancing on. } @@ -390,7 +391,6 @@ class ClusterSpec extends Spec with Logging { verify.one(mockClusterListener).onJoin(any) verify.one(policy).onConnect() cluster.state.get().must(be(NodeState.Started)) - cluster.watchesRegistered.set(true) } @Test def `connect` { @@ -398,99 +398,109 @@ class ClusterSpec extends Spec with Logging { val policy = mock[BalancingPolicy] cluster.balancingPolicy = policy + // Pretend that we get a SyncConnected event during our register call (synchronously) + mockZKClient.register(any).answersWith { _.getArguments match { + case Array(watcher: Watcher) => watcher.process(new WatchedEvent(null, KeeperState.SyncConnected, null)) + }} + // Pretend that the paths exist for the ZooKeeperMaps we're creating mockZK.exists(any[String], any[Watcher]).returns(mock[Stat]) cluster.connect(Some(mockZKClient)) // Apply same verifications as onConnect, as all of these should be called. - //verify.one(mockClusterListener).onJoin(any) - //verify.one(policy).onConnect() - //cluster.state.get().must(be(NodeState.Started)) - //cluster.watchesRegistered.set(true) + verify.one(mockClusterListener).onJoin(any) + verify.one(policy).onConnect() + cluster.state.get().must(be(NodeState.Started)) } - } - - @Test def `join` { - val (mockZK, mockZKClient) = getMockZK() - cluster.zk = mockZKClient - val policy = mock[BalancingPolicy] - cluster.balancingPolicy = policy + class `join` { + val (mockZK, mockZKClient) = getMockZK() + cluster.zk = mockZKClient - // Should no-op if draining. - cluster.setState(NodeState.Draining) - cluster.join().must(be(NodeState.Draining.toString)) - verify.exactly(0)(mockZKClient).get() + val policy = mock[BalancingPolicy] + cluster.balancingPolicy = policy - // Should no-op if started. - cluster.setState(NodeState.Started) - cluster.join().must(be(NodeState.Started.toString)) - verify.exactly(0)(mockZKClient).get() + // Pretend that we get a SyncConnected event during our register call (synchronously) + mockZKClient.register(any).answersWith { _.getArguments match { + case Array(watcher: Watcher) => watcher.process(new WatchedEvent(null, KeeperState.SyncConnected, null)) + }} - // Pretend that the paths exist for the ZooKeeperMaps we're creating - mockZK.exists(any[String], any[Watcher]).returns(mock[Stat]) + // Pretend that the paths exist for any ZooKeeperMaps we might create + mockZK.exists(any[String], any[Watcher]).returns(mock[Stat]) - cluster.setState(NodeState.Fresh) - cluster.join().must(be(NodeState.Started.toString)) + @Test def `when draining` { + cluster.setState(NodeState.Draining) + cluster.join(Some(mockZKClient)).must(be(NodeState.Draining.toString)) - // Apply same verifications as connect, as all of these should be called. - verify.one(mockClusterListener).onJoin(any) - verify.one(policy).onConnect() - cluster.state.get().must(be(NodeState.Started)) - cluster.watchesRegistered.set(true) - } + // Should no-op if draining. + verify.no(mockClusterListener).onJoin(any) + verify.no(policy).onConnect() + cluster.state.get().must(be(NodeState.Draining)) + cluster.watchesRegistered.set(false) + } + @Test def `after started` { + cluster.setState(NodeState.Started) + cluster.join(Some(mockZKClient)).must(be(NodeState.Started.toString)) - @Test def `join after shutdown` { - val (mockZK, mockZKClient) = getMockZK() - cluster.zk = mockZKClient + // Should no-op if already started. + verify.no(mockClusterListener).onJoin(any) + verify.no(policy).onConnect() + cluster.state.get().must(be(NodeState.Started)) + cluster.watchesRegistered.set(false) + } - val policy = mock[BalancingPolicy] - cluster.balancingPolicy = policy + @Test def `when fresh` { + cluster.setState(NodeState.Fresh) + cluster.join(Some(mockZKClient)).must(be(NodeState.Started.toString)) - // Pretend that the paths exist for the ZooKeeperMaps we're creating - mockZK.exists(any[String], any[Watcher]).returns(mock[Stat]) + // Apply same verifications as connect, as all of these should be called. + verify.one(mockClusterListener).onJoin(any) + verify.one(policy).onConnect() + cluster.state.get().must(be(NodeState.Started)) + } - cluster.setState(NodeState.Shutdown) - cluster.join().must(be(NodeState.Started.toString)) + @Test def `after shutdown` { + cluster.setState(NodeState.Shutdown) + cluster.join(Some(mockZKClient)).must(be(NodeState.Shutdown.toString)) - // Apply same verifications as connect, as all of these should be called. - verify.one(mockClusterListener).onJoin(any) - verify.one(policy).onConnect() - cluster.state.get().must(be(NodeState.Started)) - cluster.watchesRegistered.set(true) - } + // Should no-op if shutdown. + verify.no(mockClusterListener).onJoin(any) + verify.no(policy).onConnect() + cluster.state.get().must(be(NodeState.Shutdown)) + } + } + @Test def `cluster constructor` { + val cluster = new Cluster("foo", mockClusterListener, config) + cluster.name.must(be("foo")) + cluster.listener.must(be(mockClusterListener)) + } - @Test def `cluster constructor` { - val cluster = new Cluster("foo", mockClusterListener, config) - cluster.name.must(be("foo")) - cluster.listener.must(be(mockClusterListener)) - } + @Test def `getOrElse String` { + val foo = new HashMap[String, String] + foo.put("foo", "bar") - @Test def `getOrElse String` { - val foo = new HashMap[String, String] - foo.put("foo", "bar") + cluster.getOrElse(foo, "foo", "taco").must(be("bar")) + cluster.getOrElse(foo, "bar", "taco").must(be("taco")) + } - cluster.getOrElse(foo, "foo", "taco").must(be("bar")) - cluster.getOrElse(foo, "bar", "taco").must(be("taco")) - } + @Test def `getOrElse Double` { + val foo = new HashMap[String, Double] + foo.put("foo", 0.01d) + cluster.getOrElse(foo, "foo", 0.02d).must(be(0.01d)) + cluster.getOrElse(foo, "bar", 0.02d).must(be(0.02d)) + } - @Test def `getOrElse Double` { - val foo = new HashMap[String, Double] - foo.put("foo", 0.01d) - cluster.getOrElse(foo, "foo", 0.02d).must(be(0.01d)) - cluster.getOrElse(foo, "bar", 0.02d).must(be(0.02d)) - } + def getMockZK() : (ZooKeeper, ZooKeeperClient) = { + val mockZK = mock[ZooKeeper] + val mockZKClient = mock[ZooKeeperClient] + mockZKClient.get().returns(mockZK) + (mockZK, mockZKClient) + } - def getMockZK() : (ZooKeeper, ZooKeeperClient) = { - val mockZK = mock[ZooKeeper] - val mockZKClient = mock[ZooKeeperClient] - mockZKClient.get().returns(mockZK) - (mockZK, mockZKClient) } - }