Skip to content

Commit 956f6de

Browse files
author
arnett, stu
committed
v2.0.0
1 parent 44daedf commit 956f6de

19 files changed

+1283
-157
lines changed

src/main/java/com/emc/rest/smart/Host.java

Lines changed: 84 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@
2929
import org.apache.log4j.LogMF;
3030
import org.apache.log4j.Logger;
3131

32+
import java.util.ArrayDeque;
3233
import java.util.Date;
33-
import java.util.LinkedList;
3434
import java.util.Queue;
3535

3636
/**
@@ -46,12 +46,13 @@
4646
public class Host implements HostStats {
4747
private static final Logger l4j = Logger.getLogger(Host.class);
4848

49-
protected static final int DEFAULT_RESPONSE_WINDOW_SIZE = 20;
50-
protected static final int DEFAULT_ERROR_COOL_DOWN_SECS = 30;
49+
public static final int DEFAULT_RESPONSE_WINDOW_SIZE = 25;
50+
public static final int DEFAULT_ERROR_COOL_DOWN_SECS = 10;
5151

5252
private String name;
53-
protected int responseWindowSize;
54-
protected int errorCoolDownSecs;
53+
private boolean healthy = true;
54+
protected int responseWindowSize = DEFAULT_RESPONSE_WINDOW_SIZE;
55+
protected int errorCoolDownSecs = DEFAULT_ERROR_COOL_DOWN_SECS;
5556

5657
protected int openConnections;
5758
protected long lastConnectionTime;
@@ -60,24 +61,13 @@ public class Host implements HostStats {
6061
protected long consecutiveErrors;
6162
protected long responseQueueAverage;
6263

63-
protected Queue<Long> responseQueue = new LinkedList<Long>();
64+
protected Queue<Long> responseQueue = new ArrayDeque<Long>();
6465

6566
/**
66-
* Uses a default error cool down of 30 secs and a response window size of 20.
67+
* @param name the host name or IP address of this host
6768
*/
6869
public Host(String name) {
69-
this(name, DEFAULT_RESPONSE_WINDOW_SIZE, DEFAULT_ERROR_COOL_DOWN_SECS);
70-
}
71-
72-
/**
73-
* @param name the host name or IP address of this host
74-
* @param errorCoolDownSecs the cool down period for errors (number of seconds after an error when the host is
75-
* considered normalized). compounded for multiple consecutive errors
76-
*/
77-
public Host(String name, int responseWindowSize, int errorCoolDownSecs) {
7870
this.name = name;
79-
this.responseWindowSize = responseWindowSize;
80-
this.errorCoolDownSecs = errorCoolDownSecs;
8171
}
8272

8373
public synchronized void connectionOpened() {
@@ -102,7 +92,7 @@ public synchronized void callComplete(long duration, boolean isError) {
10292

10393
// log response time
10494
responseQueue.add(duration);
105-
if (responseQueue.size() > responseWindowSize)
95+
while (responseQueue.size() > responseWindowSize)
10696
responseQueue.remove();
10797

10898
// recalculate average
@@ -120,17 +110,39 @@ public String getName() {
120110
return name;
121111
}
122112

123-
public synchronized long getResponseIndex() {
124-
// error adjustment adjust the index up based on the number of consecutive errors
125-
long errorAdjustment = consecutiveErrors * errorCoolDownSecs * 1000;
113+
public boolean isHealthy() {
114+
return healthy;
115+
}
116+
117+
public void setHealthy(boolean healthy) {
118+
this.healthy = healthy;
119+
}
120+
121+
public long getResponseIndex() {
122+
long currentTime = System.currentTimeMillis();
126123

127-
// open connection adjustment adjusts the index up based on the number of open connections to the host
128-
long openConnectionAdjustment = openConnections * errorCoolDownSecs; // cool down secs as ms instead
124+
synchronized (this) {
125+
// error adjustment adjust the index up based on the number of consecutive errors
126+
long errorAdjustment = consecutiveErrors * errorCoolDownSecs * 1000;
129127

130-
// dormant adjustment adjusts the index down based on how long it's been since the host was last used
131-
long msSinceLastUse = System.currentTimeMillis() - lastConnectionTime;
128+
// open connection adjustment adjusts the index up based on the number of open connections to the host
129+
long openConnectionAdjustment = openConnections * errorCoolDownSecs; // cool down secs as ms instead
132130

133-
return responseQueueAverage + errorAdjustment + openConnectionAdjustment - msSinceLastUse;
131+
// dormant adjustment adjusts the index down based on how long it's been since the host was last used
132+
long msSinceLastUse = currentTime - lastConnectionTime;
133+
134+
return responseQueueAverage + errorAdjustment + openConnectionAdjustment - msSinceLastUse;
135+
}
136+
}
137+
138+
/**
139+
* Resets historical metrics. Use with care!
140+
*/
141+
public synchronized void resetStats() {
142+
totalConnections = openConnections;
143+
totalErrors = 0;
144+
consecutiveErrors = 0;
145+
responseQueueAverage = 0;
134146
}
135147

136148
@Override
@@ -158,9 +170,54 @@ public long getResponseQueueAverage() {
158170
return responseQueueAverage;
159171
}
160172

173+
public long getConsecutiveErrors() {
174+
return consecutiveErrors;
175+
}
176+
177+
@Override
178+
public boolean equals(Object o) {
179+
if (this == o) return true;
180+
if (!(o instanceof Host)) return false;
181+
182+
Host host = (Host) o;
183+
184+
return getName().equals(host.getName());
185+
}
186+
187+
@Override
188+
public int hashCode() {
189+
return getName().hashCode();
190+
}
191+
161192
@Override
162193
public String toString() {
163194
return String.format("%s{totalConnections=%d, totalErrors=%d, openConnections=%d, lastConnectionTime=%s, responseQueueAverage=%d}",
164195
name, totalConnections, totalErrors, openConnections, new Date(lastConnectionTime).toString(), responseQueueAverage);
165196
}
197+
198+
public int getResponseWindowSize() {
199+
return responseWindowSize;
200+
}
201+
202+
public void setResponseWindowSize(int responseWindowSize) {
203+
this.responseWindowSize = responseWindowSize;
204+
}
205+
206+
public int getErrorCoolDownSecs() {
207+
return errorCoolDownSecs;
208+
}
209+
210+
public void setErrorCoolDownSecs(int errorCoolDownSecs) {
211+
this.errorCoolDownSecs = errorCoolDownSecs;
212+
}
213+
214+
public Host withResponseWindowSize(int responseWindowSize) {
215+
setResponseWindowSize(responseWindowSize);
216+
return this;
217+
}
218+
219+
public Host withErrorCoolDownSecs(int errorCoolDownSecs) {
220+
setErrorCoolDownSecs(errorCoolDownSecs);
221+
return this;
222+
}
166223
}

src/main/java/com/emc/rest/smart/HostListProvider.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,12 @@
2929
import java.util.List;
3030

3131
public interface HostListProvider {
32-
public abstract List<String> getHostList();
32+
List<Host> getHostList();
33+
34+
/**
35+
* If this completes without throwing an exception, the host is considered healthy
36+
* (<code>host.setHealthy(true)</code> is called). Otherwise, the host is considered unhealthy/down
37+
* (<code>host.setHealthy(false)</code> is called).
38+
*/
39+
void runHealthCheck(Host host);
3340
}
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Copyright (c) 2015, EMC Corporation.
3+
* Redistribution and use in source and binary forms, with or without modification,
4+
* are permitted provided that the following conditions are met:
5+
*
6+
* + Redistributions of source code must retain the above copyright notice,
7+
* this list of conditions and the following disclaimer.
8+
* + Redistributions in binary form must reproduce the above copyright
9+
* notice, this list of conditions and the following disclaimer in the
10+
* documentation and/or other materials provided with the distribution.
11+
* + The name of EMC Corporation may not be used to endorse or promote
12+
* products derived from this software without specific prior written
13+
* permission.
14+
*
15+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
16+
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
17+
* TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
18+
* PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
19+
* BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
20+
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
21+
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
22+
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
23+
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
24+
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
25+
* POSSIBILITY OF SUCH DAMAGE.
26+
*/
27+
package com.emc.rest.smart;
28+
29+
import java.util.Map;
30+
31+
public interface HostVetoRule {
32+
boolean shouldVeto(Host host, Map<String, Object> requestProperties);
33+
}

src/main/java/com/emc/rest/smart/LoadBalancer.java

Lines changed: 58 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,34 +26,35 @@
2626
*/
2727
package com.emc.rest.smart;
2828

29-
import java.util.ArrayList;
30-
import java.util.Iterator;
31-
import java.util.List;
32-
import java.util.Queue;
33-
import java.util.concurrent.ConcurrentLinkedQueue;
29+
import java.util.*;
3430

3531
public class LoadBalancer {
36-
private final Queue<Host> hosts = new ConcurrentLinkedQueue<Host>();
32+
private final Queue<Host> hosts = new ArrayDeque<Host>();
33+
private List<HostVetoRule> vetoRules;
3734

38-
public LoadBalancer(List<String> initialHosts) {
35+
public LoadBalancer(List<Host> initialHosts) {
3936

4037
// seed the host map
41-
for (String host : initialHosts) {
42-
hosts.add(new Host(host));
43-
}
38+
hosts.addAll(initialHosts);
4439
}
4540

4641
/**
4742
* Returns the host with the lowest response index.
4843
*/
49-
public Host getTopHost() {
44+
public Host getTopHost(Map<String, Object> requestProperties) {
5045
Host topHost = null;
5146

5247
long lowestIndex = Long.MAX_VALUE;
5348

5449
synchronized (hosts) {
5550
for (Host host : hosts) {
5651

52+
// apply any veto rules
53+
if (shouldVeto(host, requestProperties)) continue;
54+
55+
// if the host is unhealthy/down, ignore it
56+
if (!host.isHealthy()) continue;
57+
5758
// get response index for a host
5859
long hostIndex = host.getResponseIndex();
5960

@@ -72,16 +73,41 @@ public Host getTopHost() {
7273
return topHost;
7374
}
7475

76+
protected boolean shouldVeto(Host host, Map<String, Object> requestProperties) {
77+
if (vetoRules != null) {
78+
for (HostVetoRule vetoRule : vetoRules) {
79+
if (vetoRule.shouldVeto(host, requestProperties)) return true;
80+
}
81+
}
82+
return false;
83+
}
84+
85+
/**
86+
* Returns a list of all known hosts. This list is a clone; modification will not affect the load balancer
87+
*/
88+
public synchronized List<Host> getAllHosts() {
89+
return new ArrayList<Host>(hosts);
90+
}
91+
7592
/**
7693
* Returns stats for all active hosts in this load balancer
7794
*/
78-
public HostStats[] getHostStats() {
95+
public synchronized HostStats[] getHostStats() {
7996
return hosts.toArray(new HostStats[hosts.size()]);
8097
}
8198

82-
protected void updateHosts(List<String> updatedHosts) throws Exception {
99+
/**
100+
* Resets connection metrics. Use with care!
101+
*/
102+
public void resetStats() {
103+
for (Host host : getAllHosts()) {
104+
host.resetStats();
105+
}
106+
}
107+
108+
protected void updateHosts(List<Host> updatedHosts) throws Exception {
83109
// don't modify the parameter
84-
List<String> hostList = new ArrayList<String>(updatedHosts);
110+
List<Host> hostList = new ArrayList<Host>(updatedHosts);
85111

86112
synchronized (hosts) {
87113
// remove hosts from stored list that are not present in updated list
@@ -90,10 +116,10 @@ protected void updateHosts(List<String> updatedHosts) throws Exception {
90116
while (hostI.hasNext()) {
91117
Host host = hostI.next();
92118
boolean stillThere = false;
93-
Iterator<String> hostListI = hostList.iterator();
119+
Iterator<Host> hostListI = hostList.iterator();
94120
while (hostListI.hasNext()) {
95-
String hostFromUpdate = hostListI.next();
96-
if (host.getName().equalsIgnoreCase(hostFromUpdate)) {
121+
Host hostFromUpdate = hostListI.next();
122+
if (host.equals(hostFromUpdate)) {
97123

98124
// this host is in both the stored list and the updated list
99125
stillThere = true;
@@ -107,9 +133,22 @@ protected void updateHosts(List<String> updatedHosts) throws Exception {
107133
}
108134

109135
// what's left in the updated list are new hosts, so add them
110-
for (String newHost : hostList) {
111-
hosts.add(new Host(newHost));
136+
for (Host newHost : hostList) {
137+
hosts.add(newHost);
112138
}
113139
}
114140
}
141+
142+
public List<HostVetoRule> getVetoRules() {
143+
return vetoRules;
144+
}
145+
146+
public void setVetoRules(List<HostVetoRule> vetoRules) {
147+
this.vetoRules = vetoRules;
148+
}
149+
150+
public LoadBalancer withVetoRules(HostVetoRule... vetoRules) {
151+
setVetoRules(Arrays.asList(vetoRules));
152+
return this;
153+
}
115154
}

src/main/java/com/emc/rest/smart/PollingDaemon.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,24 +54,43 @@ public void run() {
5454
LoadBalancer loadBalancer = smartConfig.getLoadBalancer();
5555
HostListProvider hostListProvider = smartConfig.getHostListProvider();
5656

57-
if (smartConfig.isDisablePolling()) {
58-
l4j.info("host polling is disabled; not updating hosts");
57+
if (!smartConfig.isHostUpdateEnabled()) {
58+
l4j.info("host update is disabled; not updating hosts");
5959
} else if (hostListProvider == null) {
6060
l4j.info("no host list provider; not updating hosts");
6161
} else {
6262
try {
6363
loadBalancer.updateHosts(hostListProvider.getHostList());
6464
} catch (Throwable t) {
65-
l4j.warn("Unable to enumerate servers", t);
65+
l4j.warn("unable to enumerate servers", t);
66+
}
67+
}
68+
69+
if (!smartConfig.isHealthCheckEnabled()) {
70+
l4j.info("health check is disabled; not checking hosts");
71+
} else if (hostListProvider == null) {
72+
l4j.info("no host list provider; not checking hosts");
73+
} else {
74+
for (Host host : loadBalancer.getAllHosts()) {
75+
try {
76+
hostListProvider.runHealthCheck(host);
77+
host.setHealthy(true);
78+
LogMF.debug(l4j, "health check successful for {0}; host is marked healthy", host.getName());
79+
} catch (Throwable t) {
80+
host.setHealthy(false);
81+
l4j.warn("health check failed for " + host.getName() + "; host is marked unhealthy", t);
82+
}
6683
}
6784
}
6885

6986
long callTime = System.currentTimeMillis() - start;
7087
try {
71-
LogMF.debug(l4j, "polling daemon finished; sleeping for {0} seconds..", smartConfig.getPollInterval());
72-
Thread.sleep(smartConfig.getPollInterval() * 1000 - callTime);
88+
long sleepTime = smartConfig.getPollInterval() * 1000 - callTime;
89+
if (sleepTime < 0) sleepTime = 0;
90+
LogMF.debug(l4j, "polling daemon finished; will poll again in {0}ms..", sleepTime);
91+
if (sleepTime > 0) Thread.sleep(sleepTime);
7392
} catch (InterruptedException e) {
74-
l4j.warn("Interrupted while sleeping");
93+
l4j.warn("interrupted while sleeping", e);
7594
}
7695
}
7796
}

0 commit comments

Comments
 (0)