From 39b80652d0e1abb5cdfde666e8d3ed91a598968c Mon Sep 17 00:00:00 2001 From: Ioakeim Perros Date: Wed, 6 Aug 2014 18:00:10 +0300 Subject: [PATCH 1/2] Top-k algorithm first implementation --- topk-plugin/README.md | 2 + topk-plugin/pom.xml | 91 ++++++++++++ .../java/eu/leads/distsum/ComChannel.java | 50 +++++++ .../main/java/eu/leads/distsum/Constrain.java | 26 ++++ .../java/eu/leads/distsum/Coordinator.java | 135 ++++++++++++++++++ .../main/java/eu/leads/distsum/Message.java | 88 ++++++++++++ .../src/main/java/eu/leads/distsum/Node.java | 37 +++++ .../main/java/eu/leads/distsum/Worker.java | 109 ++++++++++++++ .../test/java/eu/leads/distsum/UnitTest.java | 128 +++++++++++++++++ 9 files changed, 666 insertions(+) create mode 100644 topk-plugin/README.md create mode 100644 topk-plugin/pom.xml create mode 100644 topk-plugin/src/main/java/eu/leads/distsum/ComChannel.java create mode 100644 topk-plugin/src/main/java/eu/leads/distsum/Constrain.java create mode 100644 topk-plugin/src/main/java/eu/leads/distsum/Coordinator.java create mode 100644 topk-plugin/src/main/java/eu/leads/distsum/Message.java create mode 100644 topk-plugin/src/main/java/eu/leads/distsum/Node.java create mode 100644 topk-plugin/src/main/java/eu/leads/distsum/Worker.java create mode 100644 topk-plugin/src/test/java/eu/leads/distsum/UnitTest.java diff --git a/topk-plugin/README.md b/topk-plugin/README.md new file mode 100644 index 00000000..4a537f20 --- /dev/null +++ b/topk-plugin/README.md @@ -0,0 +1,2 @@ +Distributed top-k monitoring (Babcock, Olston SIGMOD 2003) +on top of the Infinispan Communication layer. 
diff --git a/topk-plugin/pom.xml b/topk-plugin/pom.xml new file mode 100644 index 00000000..0f82dd9f --- /dev/null +++ b/topk-plugin/pom.xml @@ -0,0 +1,91 @@ + + + 4.0.0 + + eu.leads + distsum + 1.0-SNAPSHOT + + + + The LEADS project + http://www.leads-project.eu/ + + + + 7.0.0.Alpha4 + + + + + + + org.infinispan + infinispan-core + ${infinispan.version} + + + org.infinispan + infinispan-client-hotrod + ${infinispan.version} + + + + + org.testng + testng + test + 6.8 + + + org.infinispan + infinispan-core + ${infinispan.version} + test-jar + test + + + org.infinispan + infinispan-client-hotrod + ${infinispan.version} + test-jar + test + + + org.infinispan + infinispan-server-core + ${infinispan.version} + test-jar + test + + + org.infinispan + infinispan-server-hotrod + ${infinispan.version} + test-jar + test + + + org.infinispan + infinispan-server-hotrod + ${infinispan.version} + test + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + 3.1 + + 1.6 + 1.6 + + + + + + diff --git a/topk-plugin/src/main/java/eu/leads/distsum/ComChannel.java b/topk-plugin/src/main/java/eu/leads/distsum/ComChannel.java new file mode 100644 index 00000000..1dcc1197 --- /dev/null +++ b/topk-plugin/src/main/java/eu/leads/distsum/ComChannel.java @@ -0,0 +1,50 @@ +package eu.leads.distsum; + +import org.infinispan.Cache; +import org.infinispan.filter.KeyValueFilter; +import org.infinispan.metadata.Metadata; + +/** + * + * @author vagvaz + * @author otrack + * + * Created by vagvaz on 7/5/14. 
+ * + * Communication channel just just a map of String,Node to send messages + */ +public class ComChannel { + + private Cache nodes; + + public ComChannel(Cache c) { + nodes = c; + } + + // Add a node to the map + public void register(final String id, Node node){ + KeyValueFilter filter= new KeyValueFilter(){ + public boolean accept(String i, Message msg, Metadata metadata){ + return id.equals(i); + } + }; + nodes.put(id, Message.EMPTYMSG); // to create the entry + nodes.addListener(node, filter, null); + } + + //Send messsage to node id + public void sentTo(String id, Message message){ + nodes.put(id, message); + } + + // Broadcast a message to all nodes, but coordinator + // The coordinator takes as a result the replies of the nodes + public void broadCast(Message message){ + for(String node: nodes.keySet()){ + if(!node.equals(Node.COORDINATOR)){ + nodes.put(node,message); + } + } + } + +} diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Constrain.java b/topk-plugin/src/main/java/eu/leads/distsum/Constrain.java new file mode 100644 index 00000000..9d14d7af --- /dev/null +++ b/topk-plugin/src/main/java/eu/leads/distsum/Constrain.java @@ -0,0 +1,26 @@ +package eu.leads.distsum; + +/** + * @author vagvaz + * @author otrack + * + * Created by vagvaz on 7/5/14. 
+ * A simple constrain class with upper and lower bound + */ +public class Constrain { + private double lowBound; + private double upperBound; + public Constrain(double low, double high) { + lowBound = low; + upperBound = high; + } + + public boolean violates(double value){ + if(value < lowBound + || value > upperBound){ + return true; + } + return false; + } + +} diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Coordinator.java b/topk-plugin/src/main/java/eu/leads/distsum/Coordinator.java new file mode 100644 index 00000000..25969fc6 --- /dev/null +++ b/topk-plugin/src/main/java/eu/leads/distsum/Coordinator.java @@ -0,0 +1,135 @@ +package eu.leads.distsum; + +import java.util.HashMap; +import java.util.Map; + +/** + * + * @author vagvaz + * @author otrack + * + * Created by vagvaz on 7/5/14. + * The coordinator that maintains the global sum. + * And computes the localConstrains given to the worker nodes + * + */ +public class Coordinator extends Node{ + + Map localValues; //the local values of all worker nodes + Map constrains; //the local constrains of all worker nodes + int globalSum; //global sum + ComChannel channel; //communication medium + + public Coordinator(ComChannel com) { + super(Node.COORDINATOR,com); + localValues = new HashMap(); + constrains = new HashMap(); + this.channel = com; + } + + /** + * Getter for property 'localValues'. + * + * @return Value for property 'localValues'. + */ + public Map getLocalValues() { + return localValues; + } + + /** + * Setter for property 'localValues'. + * + * @param localValues Value to set for property 'localValues'. + */ + public void setLocalValues(Map localValues) { + this.localValues = localValues; + recomputeValue(); + } + + /** + * Getter for property 'constrains'. + * + * @return Value for property 'constrains'. + */ + public Map getConstrains() { + return constrains; + } + + /** + * Setter for property 'constrains'. + * + * @param constrains Value to set for property 'constrains'. 
+ */ + public void setConstrains(Map constrains) { + this.constrains = constrains; + } + + /** + * Getter for property 'globalSum'. + * + * @return Value for property 'globalSum'. + */ + public int getGlobalSum() { + return globalSum; + } + + /** + * Setter for property 'globalSum'. + * + * @param globalSum Value to set for property 'globalSum'. + */ + public void setGlobalSum(int globalSum) { + this.globalSum = globalSum; + } + + //Receive message from worker. This is called only when there is a violation on a worker node + @Override + public void receiveMessage(Message msg) { + if (msg.getType().equals("reply")) { + localValues.put(msg.getFrom(), (Integer) msg.getBody()); + //Compute new global sum + recomputeValue(); + //Compute constrains + computeConstrains(); + //Send Constrains to workers + sendConstrains(); + } else if (msg.getType().equals("violation")) { + channel.broadCast(new Message(Node.COORDINATOR, "get", null)); + } else { + throw new RuntimeException("Invalid message"); + } + } + + //recompute global sum + private void recomputeValue() { + this.globalSum = 0; + //Iterate over local values and sum to compute the global sum globalSum + for(Map.Entry entry : localValues.entrySet()){ + this.globalSum += entry.getValue(); + } + } + + //compute constrains + private void computeConstrains() { + //Compute the drift each constrain will be equal to + // localValue - drift...localValue + drift + int drift = (int) Math.ceil(0.1*this.globalSum)/localValues.size(); + for(Map.Entry entry : localValues.entrySet()){ + + //get current local globalSum; + int localValue = entry.getValue(); + //put new constrain to map + constrains.put(entry.getKey(),new Constrain(localValue-drift,localValue+drift)); + } + } + + //Send constrains back to workers + private void sendConstrains() { + //Send to each worker node the new constrain + for ( Map.Entry entry : constrains.entrySet() ) { + channel.sentTo(entry.getKey(), new Message(Node.COORDINATOR, "constrain", 
entry.getValue())); + } + } + + +} diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Message.java b/topk-plugin/src/main/java/eu/leads/distsum/Message.java new file mode 100644 index 00000000..f6dfcfff --- /dev/null +++ b/topk-plugin/src/main/java/eu/leads/distsum/Message.java @@ -0,0 +1,88 @@ +package eu.leads.distsum; + +import java.io.Serializable; + +/** + * + * @author vagvaz + * @author otrack + * + * Created by vagvaz on 7/5/14. + */ +public class Message implements Serializable { + + String from; //Who sends the message + String type; //Type of Message + Object body; //The body of message + + public static final Message EMPTYMSG = new Message("",""); + + public Message() { + } + + public Message(String from,String type){ + this.from = from; + this.type = type; + } + + /** + * Getter for property 'from'. + * + * @return Value for property 'from'. + */ + public String getFrom() { + return from; + } + + /** + * Setter for property 'from'. + * + * @param from Value to set for property 'from'. + */ + public void setFrom(String from) { + this.from = from; + } + + public Message(String from,String type,Object body){ + this.from = from; + + this.type =type; + this.body = body; + } + + /** + * Getter for property 'type'. + * + * @return Value for property 'type'. + */ + public String getType() { + return type; + } + + /** + * Setter for property 'type'. + * + * @param type Value to set for property 'type'. + */ + public void setType(String type) { + this.type = type; + } + + /** + * Getter for property 'body'. + * + * @return Value for property 'body'. + */ + public Object getBody() { + return body; + } + + /** + * Setter for property 'body'. + * + * @param body Value to set for property 'body'. 
+ */ + public void setBody(Object body) { + this.body = body; + } +} diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Node.java b/topk-plugin/src/main/java/eu/leads/distsum/Node.java new file mode 100644 index 00000000..75f0ad22 --- /dev/null +++ b/topk-plugin/src/main/java/eu/leads/distsum/Node.java @@ -0,0 +1,37 @@ +package eu.leads.distsum; + +/** + * + * @author vagvaz + * @author otrack + + * Created by vagvaz on 7/5/14. + */ + +import org.infinispan.notifications.Listener; +import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified; +import org.infinispan.notifications.cachelistener.event.CacheEntryEvent; + +@Listener(clustered = true, sync = false) +public abstract class Node { + + public static final String COORDINATOR = "COORDINATOR"; + public String id; + + public Node(String i, ComChannel channel){ + id = i; + channel.register(id,this); + } + + public abstract void receiveMessage(Message msg); + + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @CacheEntryModified + public void onCacheModification(CacheEntryEvent event){ + if (event.getKey().equals(id)) + this.receiveMessage((Message) event.getValue()); + } + + +} diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Worker.java b/topk-plugin/src/main/java/eu/leads/distsum/Worker.java new file mode 100644 index 00000000..9da62b01 --- /dev/null +++ b/topk-plugin/src/main/java/eu/leads/distsum/Worker.java @@ -0,0 +1,109 @@ +package eu.leads.distsum; + +import java.util.*; + +/** + * + * @author vagvaz + * @author otrack + * + * Created by vagvaz on 7/5/14. + * The worker node tracks updates on a stream + * and maintains a local sum of the updates + * when an update violates the its constrain, + * the worker informs the coodinator. 
+ */ +public class Worker extends Node { + + private String id; //node id + private HashMap localValues; + + private PriorityQueue> topk_adjusted, rest_adjusted; + + private HashMap constrain; //worker constrain + private ComChannel channel; //communication channel. + + public Worker(String ID, Map.Entry initialTopEntry, ComChannel com) { + super(ID,com); + this.id = ID; + + localValues = new HashMap(); + + constrain = new HashMap(); + this.channel = com; + + topk_adjusted = new PriorityQueue>(10, + new Comparator>() { + @Override + public int compare(Map.Entry left, Map.Entry right) { + return left.getValue().compareTo(right.getValue()); + } + } ); + + rest_adjusted = new PriorityQueue>(10, + new Comparator>() { + @Override + public int compare(Map.Entry left, Map.Entry right) { + return right.getValue().compareTo(left.getValue()); + } + } ); + + /*topk_adjusted.add(new AbstractMap.SimpleEntry(2, 2d)); + topk_adjusted.add(new AbstractMap.SimpleEntry(3, 3d)); + + rest_adjusted.add(new AbstractMap.SimpleEntry(2, 2d)); + rest_adjusted.add(new AbstractMap.SimpleEntry(3, 3d)); + + System.out.println(topk_adjusted.poll()); + System.out.println(rest_adjusted.poll());*/ + + } + + + public boolean update(Object objToUpd, double newvalue){ + + double localValue = localValues.get(objToUpd) + newvalue; + + if(constrain.get(objToUpd).violates(localValue)) + { + channel.sentTo(Node.COORDINATOR, new Message(id,"violation",new AbstractMap.SimpleEntry(objToUpd, localValue) )); + return true; + } + return false; + } + + + /** + * Getter for property 'id'. + * + * @return Value for property 'id'. + */ + public String getId() { + return id; + } + + /** + * Setter for property 'id'. + * + * @param id Value to set for property 'id'. 
+ */ + public void setId(String id) { + this.id = id; + } + + @Override + public void receiveMessage(Message msg) { + + /*Message reply = new Message(id,"reply"); + + //If the message is a get local values then set the local sum as body to the reply + if(msg.getType().equals("get")){ + reply.setBody(localValue); + channel.sentTo(COORDINATOR,reply); + } + //if the message is a new constrain just update the local constrain. + else if (msg.getType().equals("constrain")){ + this.constrain = (Constrain) msg.getBody(); + }*/ + } +} diff --git a/topk-plugin/src/test/java/eu/leads/distsum/UnitTest.java b/topk-plugin/src/test/java/eu/leads/distsum/UnitTest.java new file mode 100644 index 00000000..6337d3c3 --- /dev/null +++ b/topk-plugin/src/test/java/eu/leads/distsum/UnitTest.java @@ -0,0 +1,128 @@ +package eu.leads.distsum; + +import org.infinispan.configuration.cache.ConfigurationBuilder; +import org.infinispan.configuration.global.GlobalConfigurationBuilder; +import org.infinispan.manager.EmbeddedCacheManager; +import org.infinispan.test.SingleCacheManagerTest; +import org.infinispan.test.fwk.TestCacheManagerFactory; +import org.testng.annotations.Test; + +import java.util.*; + +/** + * + * @author vagvaz + * @author otrack + * + * Created by vagvaz on 7/5/14. + * + * Scenario: + * + * There is a group of nodes that monitor some streams of updates. We want to compute the total sum of these updates. + * One node from the group acts as a coordinator and the rest of the nodes as workers. + * The coordinator maintains the global sum, on the other hand, the worker nodes listen to the streams + * of updates and maintain a local sum. Furthermore, the worker nodes have some constrains, in our case + * these constrains are two numbers an upper and a lower bound. 
In case of an update violates the constrains, + * then the worker informs the coordinator, which in turn asks all workers to + * send their local values in order to recompute the global sum and the new constrains for each node. + * After these recomputations, the coordinator sends the new constrains back to the workers. + * + * Scenario: + * We have one coordinator and 3 worker nodes. We have 4 rounds. In each round we update each worker once. Each worker's update will be chosen + * uniformly from the following array {1,1,1,1,2,2,-1,-1,-2}. At the end of each round we get the global sum perceived from the coordinator + * Mind that the global sum might not be the actual sum, but it must always be inside the following values 0.9*realSum<= globalSum <= 1.1realSum. + * + * Using KVS/Infinispan as a communication channel: + * + * I tried to keep the code simple in order to demonstrate the communication needs and not to perplex things, by doing it + * using infinispan listeners. Using infinispan we would have one cache that would generate the updates (updateCache). + * On that cache we would have installed the worker listeners. The worker listeners whenever they would like to communication + * with the coordinator, they would do a put operation on another cache (workerCache). The coordinator could be a clustered listener + * installed on the workerCache, as a result, it listens to all the updates. What I cannot think is how through listeners the communication + * from the coordinator to the workers can be achieved. I would not like to do updates to the updateCache. To sup up. + * We have a clustered listener, the coordinator, installed on workerCache, + * we have the worker listeners, worker nodes, installed locally to all the nodes containing keys of the updateCache + * whenever worker nodes need to communicate with the coordinator do a put operation to the workerCache + * whener the Coordinator wants to communicate with the workers ?? 
+ * + * Using the distributed executor could be a solution. + * + * + */ +@Test +public class UnitTest extends SingleCacheManagerTest{ + + public static final double epsilon = 0.1; + public static final int k = 10; + + public void run() { + + //The communication channel between coordinator and the workers + ComChannel channel = new ComChannel(cacheManager.getCache()); + + Coordinator coord = new Coordinator(channel); + + //Create initial values and constrains for the workers + int numOfWorkers = 10; + ArrayList workers = new ArrayList(numOfWorkers); + Map workerValues = new HashMap(numOfWorkers); + Map workerConstrains = new HashMap(numOfWorkers); + + int initValue = 10; + Constrain initConstrain = new Constrain(9,11); + for ( int worker = 0; worker < numOfWorkers; worker++ ) { + //Create new worker with initial values + Worker w = new Worker(Integer.toString(worker), new AbstractMap.SimpleEntry(1, 10d), channel); + workers.add(w); + //put worker initial globalSum into the map + /*workerValues.put(w.getId(), w.getLocalValue()); + //put worker's constrain into the map + workerConstrains.put(w.getId(),w.getConstrain());*/ + //register worker to the channel + channel.register(w.getId(),w); + } + + //Initialize structures kept by coordinator + coord.setLocalValues(workerValues); + coord.setConstrains(workerConstrains); + + int[] updates = {1,1,1,1,2,2,-1,-1,-2}; + int numberOfRounds = 4; + Random rand = new Random(); + for ( int round = 0; round < numberOfRounds; round++ ) { + System.out.println("********* ROUND " + round +" ********"); + int realsum = 0; + for ( int worker = 0; worker < numOfWorkers; worker++ ) { + int update = updates[rand.nextInt(updates.length)]; + int objToUpd = (int) rand.nextGaussian() + 1000; + + System.out.println("obj: " + objToUpd); + + /*int oldValue = workers.get(worker).getLocalValue(); + workers.get(worker).update(update); + + System.out.println("worker " + worker + ": update=" +update+" oldval="+oldValue + " new newValue= " + 
workers.get(worker).getLocalValue()); + realsum += workers.get(worker).getLocalValue();*/ + } + + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); // TODO: Customise this generated block + } + System.out.println("Real sum: " + realsum + " Global Sum: " +coord.getGlobalSum()); + System.out.println("********* END OF ROUND " + round +" ********\n\n"); + assert(0.9*realsum <= coord.getGlobalSum()); + assert(1.1*realsum >= coord.getGlobalSum()); + } + + + } + + @Override + protected EmbeddedCacheManager createCacheManager() throws Exception { + GlobalConfigurationBuilder global = new GlobalConfigurationBuilder(); + ConfigurationBuilder config = TestCacheManagerFactory.getDefaultCacheConfiguration(false); + return TestCacheManagerFactory.createCacheManager(global, config); + } +} From c487c801a7b39a922d2f6432644e54ac3a5ad186 Mon Sep 17 00:00:00 2001 From: Ioakeim Perros Date: Wed, 27 Aug 2014 17:45:54 -0400 Subject: [PATCH 2/2] Top-k monitoring implementation on top of the Infinispan communication layer --- topk-plugin/pom.xml | 6 + .../java/eu/leads/distsum/ComChannel.java | 70 +-- .../main/java/eu/leads/distsum/Constrain.java | 26 -- .../java/eu/leads/distsum/Coordinator.java | 428 +++++++++++++----- .../main/java/eu/leads/distsum/Message.java | 27 +- .../src/main/java/eu/leads/distsum/Node.java | 15 +- .../eu/leads/distsum/Utils/LocalView.java | 33 ++ .../java/eu/leads/distsum/Utils/Set_Map.java | 34 ++ .../leads/distsum/Utils/ViolationObject.java | 70 +++ .../main/java/eu/leads/distsum/Worker.java | 296 ++++++++++-- .../test/java/eu/leads/distsum/UnitTest.java | 70 ++- 11 files changed, 823 insertions(+), 252 deletions(-) delete mode 100644 topk-plugin/src/main/java/eu/leads/distsum/Constrain.java create mode 100644 topk-plugin/src/main/java/eu/leads/distsum/Utils/LocalView.java create mode 100644 topk-plugin/src/main/java/eu/leads/distsum/Utils/Set_Map.java create mode 100644 
topk-plugin/src/main/java/eu/leads/distsum/Utils/ViolationObject.java diff --git a/topk-plugin/pom.xml b/topk-plugin/pom.xml index 0f82dd9f..12b5f77b 100644 --- a/topk-plugin/pom.xml +++ b/topk-plugin/pom.xml @@ -72,6 +72,12 @@ ${infinispan.version} test + + + com.google.guava + guava + 12.0 + diff --git a/topk-plugin/src/main/java/eu/leads/distsum/ComChannel.java b/topk-plugin/src/main/java/eu/leads/distsum/ComChannel.java index 1dcc1197..f261cadb 100644 --- a/topk-plugin/src/main/java/eu/leads/distsum/ComChannel.java +++ b/topk-plugin/src/main/java/eu/leads/distsum/ComChannel.java @@ -14,37 +14,45 @@ * Communication channel just just a map of String,Node to send messages */ public class ComChannel { - - private Cache nodes; - - public ComChannel(Cache c) { - nodes = c; - } - - // Add a node to the map - public void register(final String id, Node node){ - KeyValueFilter filter= new KeyValueFilter(){ - public boolean accept(String i, Message msg, Metadata metadata){ - return id.equals(i); - } - }; - nodes.put(id, Message.EMPTYMSG); // to create the entry - nodes.addListener(node, filter, null); - } - - //Send messsage to node id - public void sentTo(String id, Message message){ - nodes.put(id, message); - } - - // Broadcast a message to all nodes, but coordinator - // The coordinator takes as a result the replies of the nodes - public void broadCast(Message message){ - for(String node: nodes.keySet()){ - if(!node.equals(Node.COORDINATOR)){ - nodes.put(node,message); - } + + private Cache nodes; + + public ComChannel(Cache c) { + nodes = c; + } + + // Add a node to the map + public void register(final String id, Node node){ + KeyValueFilter filter= new KeyValueFilter(){ + public boolean accept(String i, Message msg, Metadata metadata){ + return id.equals(i); + } + }; + nodes.put(id, Message.EMPTYMSG); // to create the entry + nodes.addListener(node, filter, null); } - } + + //Send messsage to node id + public void sentTo(String id, Message message){ + nodes.put(id, 
message); + } + + // Broadcast a message to all nodes, but coordinator + // The coordinator takes as a result the replies of the nodes + /*public void broadCast(Message message){ + for(String node: nodes.keySet()){ + if(!node.equals(Node.COORDINATOR)){ + nodes.put(node,message); + } + } + }*/ + + /*public void broadCastToAllButOne(Message message, String node1){ + for(String node: nodes.keySet()){ + if( (!node.equals(Node.COORDINATOR)) && (!node.equals(node1)) ){ + nodes.put(node,message); + } + } + }*/ } diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Constrain.java b/topk-plugin/src/main/java/eu/leads/distsum/Constrain.java deleted file mode 100644 index 9d14d7af..00000000 --- a/topk-plugin/src/main/java/eu/leads/distsum/Constrain.java +++ /dev/null @@ -1,26 +0,0 @@ -package eu.leads.distsum; - -/** - * @author vagvaz - * @author otrack - * - * Created by vagvaz on 7/5/14. - * A simple constrain class with upper and lower bound - */ -public class Constrain { - private double lowBound; - private double upperBound; - public Constrain(double low, double high) { - lowBound = low; - upperBound = high; - } - - public boolean violates(double value){ - if(value < lowBound - || value > upperBound){ - return true; - } - return false; - } - -} diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Coordinator.java b/topk-plugin/src/main/java/eu/leads/distsum/Coordinator.java index 25969fc6..19c3ecc8 100644 --- a/topk-plugin/src/main/java/eu/leads/distsum/Coordinator.java +++ b/topk-plugin/src/main/java/eu/leads/distsum/Coordinator.java @@ -1,135 +1,357 @@ package eu.leads.distsum; -import java.util.HashMap; -import java.util.Map; - -/** - * - * @author vagvaz - * @author otrack - * - * Created by vagvaz on 7/5/14. - * The coordinator that maintains the global sum. 
- * And computes the localConstrains given to the worker nodes - * - */ -public class Coordinator extends Node{ - - Map localValues; //the local values of all worker nodes - Map constrains; //the local constrains of all worker nodes - int globalSum; //global sum - ComChannel channel; //communication medium +import com.google.common.collect.Ordering; +import eu.leads.distsum.Utils.LocalView; +import eu.leads.distsum.Utils.Set_Map; +import eu.leads.distsum.Utils.ViolationObject; + +import java.util.*; +public class Coordinator extends Node { + + public static final double epsilon = 0.1, EPS = 2.2204460492503131e-06, coordLeeway = 0.5; + public static final int k = 2; + + int workers_num; + ArrayList worker_ids; + + Map> all_values; //some of the local values of all nodes necessary during reallocation + Map border_values; //border values necessary during reallocation + Map> all_deltas; //the local all_deltas of all worker nodes + // each node is represented by its key (String) + + Set topk_set; + + boolean phase3InProgress, initPhase3; + // long epoch_altering_topk; + + Map lastTsReceived; public Coordinator(ComChannel com) { - super(Node.COORDINATOR,com); - localValues = new HashMap(); - constrains = new HashMap(); - this.channel = com; - } + super(Node.COORDINATOR, com); + all_values = new HashMap>(); + all_deltas = new HashMap>(); + border_values = new HashMap(); - /** - * Getter for property 'localValues'. - * - * @return Value for property 'localValues'. - */ - public Map getLocalValues() { - return localValues; + worker_ids = new ArrayList(); + workers_num = 0; + + topk_set = new HashSet(); + phase3InProgress = false; + + // epoch_altering_topk = 0; + + lastTsReceived = new HashMap(); } - /** - * Setter for property 'localValues'. - * - * @param localValues Value to set for property 'localValues'. 
- */ - public void setLocalValues(Map localValues) { - this.localValues = localValues; - recomputeValue(); + private synchronized boolean updateAndCheckTs(Object sender, long ts) { + + if (lastTsReceived.containsKey(sender)) { + + if (ts > lastTsReceived.get(sender)) + lastTsReceived.put(sender, ts); + else + return true; + } else + lastTsReceived.put(sender, ts); + return false; } - /** - * Getter for property 'constrains'. - * - * @return Value for property 'constrains'. - */ - public Map getConstrains() { - return constrains; + //Receive message from worker. + @Override + public synchronized void receiveMessage(Message msg) { + + String sender = msg.getFrom(); + + if (updateAndCheckTs(sender, msg.getTs())) + return; + + // this msg is received when a new worker joins + // the computation: I have to send him the top-k set + if (msg.getType().equals("join")) { + + worker_ids.add(sender); //add worker to my list of ids + workers_num++; + + // send the current top-k set to the new worker + send(sender, new Message(Node.COORDINATOR, "joinAck", topk_set)); + // System.out.println("Coordinator received join request from node " + sender); + }else if (msg.getType().equals("violation")) { + + assert (worker_ids.contains(sender)); + ViolationObject vo = (ViolationObject) msg.getBody(); + + // ignore them if a resolution in phase 3 has initiated + if (/*epoch_altering_topk > vo.getEpoch() || */phase3InProgress) { + // System.out.println("Coordinator aborted violation msg from " + sender + " and phase3 is in progress?" 
+ phase3InProgress); + return; + } + + if (vo.isEmptyTopk()) + initPhase3 = true; + + // System.out.println("Coordinator will process violation msg from " + sender); + + // jump to 3rd phase requesting values according to the sender's resolution set + if (topk_set.isEmpty()) { + requestPhase3info(sender, new HashSet(vo.getPartialDataValues().keySet())); + processPhase3msg(sender, vo.getPartialDataValues(), vo.getBorderValue()); + + } else { + + // resolution: 2nd phase + if (checkIfPhase3Required(sender, vo)){ + requestPhase3info(sender, new HashSet(vo.getPartialDataValues().keySet())); + processPhase3msg(sender, vo.getPartialDataValues(), vo.getBorderValue()); + } + else{ + + // attempt to adjust factors through phase 2 + phase2attempt(sender, vo); + + if (!assertInvariants()){ + requestPhase3info(sender, new HashSet(vo.getPartialDataValues().keySet())); + processPhase3msg(sender, vo.getPartialDataValues(), vo.getBorderValue()); + } + else{ + + // send msg to sender with new factors + send(sender, new Message(Node.COORDINATOR, "phase2", all_deltas.get(sender))); + } + + } + } + + }else if (msg.getType().equals("replyPhase3")) { + + // System.out.println("Coordinator received partial data values requested for phase 3 from " + sender); + + LocalView lv = (LocalView) msg.getBody(); + processPhase3msg(sender, lv.getPartialValues(), lv.getBorder()); + }else { + throw new RuntimeException("Invalid message"); + } } - /** - * Setter for property 'constrains'. - * - * @param constrains Value to set for property 'constrains'. - */ - public void setConstrains(Map constrains) { - this.constrains = constrains; + private synchronized void phase2attempt(Object sender, ViolationObject vo){ + + Map max_distance_per_topk_item = new HashMap(); + for (Object o: vo.getViolatedTopk()) + max_distance_per_topk_item.put( o, Double.MIN_VALUE); + + double deltatf, deltarf, distance; + for (Object t: vo.getViolatedTopk()){ + + deltatf = all_deltas.get(sender).containsKey(t) ? 
all_deltas.get(sender).get(t) : 0; + for (Object r: vo.getViolatedRest()){ + + deltarf = all_deltas.get(sender).containsKey(r) ? all_deltas.get(sender).get(r) : 0; + distance = vo.getPartialDataValues().get(r) + deltarf - vo.getPartialDataValues().get(t) - deltatf; + + if (distance > max_distance_per_topk_item.get(t)) + max_distance_per_topk_item.put(t, distance); + + } + } + + double d0, df; + for ( Map.Entry entry: max_distance_per_topk_item.entrySet() ){ + + d0 = all_deltas.get(Node.COORDINATOR).containsKey(entry.getKey()) ? all_deltas.get(Node.COORDINATOR).get(entry.getKey()) - entry.getValue() : - entry.getValue(); + df = all_deltas.get(sender).containsKey(entry.getKey()) ? all_deltas.get(sender).get(entry.getKey()) + entry.getValue() : entry.getValue(); + + all_deltas.get(Node.COORDINATOR).put(entry.getKey(), d0); + all_deltas.get(sender).put(entry.getKey(), df); + } + } - /** - * Getter for property 'globalSum'. - * - * @return Value for property 'globalSum'. - */ - public int getGlobalSum() { - return globalSum; + private synchronized boolean checkIfPhase3Required(Object sender, ViolationObject vo){ + double deltat0, deltatf, deltar0, deltarf; + for (Object t: vo.getViolatedTopk()){ + + deltat0 = all_deltas.get(Node.COORDINATOR).containsKey(t) ? all_deltas.get(Node.COORDINATOR).get(t) : 0; + deltatf = all_deltas.get(sender).containsKey(t) ? all_deltas.get(sender).get(t) : 0; + for (Object r: vo.getViolatedRest()){ + + deltar0 = all_deltas.get(Node.COORDINATOR).containsKey(r) ? all_deltas.get(Node.COORDINATOR).get(r) : 0; + deltarf = all_deltas.get(sender).containsKey(r) ? all_deltas.get(sender).get(r) : 0; + if ( vo.getPartialDataValues().get(t) + deltat0 + deltatf < vo.getPartialDataValues().get(r) + deltar0 + deltarf ) + return true; + + } + } + return false; + } /** - * Setter for property 'globalSum'. 
+ * This function serves as a coordinator's request from all nodes + * but the sender of the necessary info for the 3rd phase of resolution * - * @param globalSum Value to set for property 'globalSum'. + * @param sender sender to be excluded from the broadcasting msg + * @param resolutionSet the set of keys for which the coordinator requests the data values */ - public void setGlobalSum(int globalSum) { - this.globalSum = globalSum; - } + private synchronized void requestPhase3info(Object sender, HashSet resolutionSet) { - //Receive message from worker. This is called only when there is a violation on a worker node - @Override - public void receiveMessage(Message msg) { - if (msg.getType().equals("reply")) { - localValues.put(msg.getFrom(), (Integer) msg.getBody()); - //Compute new global sum - recomputeValue(); - //Compute constrains - computeConstrains(); - //Send Constrains to workers - sendConstrains(); - } else if (msg.getType().equals("violation")) { - channel.broadCast(new Message(Node.COORDINATOR, "get", null)); - } else { - throw new RuntimeException("Invalid message"); + phase3InProgress = true; + + for (Object o : worker_ids) { + if (!o.equals(sender)) { + send((String) o, new Message(Node.COORDINATOR, "reqPhase3", resolutionSet)); + // System.out.println("Coordinator requested local view from node " + o); + } } + } - //recompute global sum - private void recomputeValue() { - this.globalSum = 0; - //Iterate over local values and sum to compute the global sum globalSum - for(Map.Entry entry : localValues.entrySet()){ - this.globalSum += entry.getValue(); + private synchronized void processPhase3msg(Object sender, HashMap partialDataValues, double border) { + assert (phase3InProgress); + + all_values.put(sender, partialDataValues); + border_values.put(sender, border); + + // check if I have received the requested info from all nodes + // if so, I ll have to proceed with the resolution phase + if (all_values.size() == worker_ids.size()) { + + Map 
global_values = new HashMap(); + for (Map.Entry> outerEntry : all_values.entrySet()) { + for (Map.Entry innerEntry : outerEntry.getValue().entrySet()) { + if (global_values.containsKey(innerEntry.getKey())) + global_values.put(innerEntry.getKey(), global_values.get(innerEntry.getKey()) + innerEntry.getValue()); + else + global_values.put(innerEntry.getKey(), innerEntry.getValue()); + } + } + // compute new top-k set + compute_new_topk(global_values); + + // reallocate new adjustment factors + allocate_new_factors(global_values); + + // assert that invariants hold ! + assert(assertInvariants()); + + // send msgs containing top-k set & factors to each corresponding node + for (Object o : worker_ids) { + Set_Map sm = new Set_Map(all_deltas.get(o), topk_set); + send((String) o, new Message(Node.COORDINATOR, "newSetAndFactors", sm)); + } + + // clear all temporary structures & set the variable blocking violations to false + all_values.clear(); + border_values.clear(); + initPhase3 = false; + phase3InProgress = false; } } - //compute constrains - private void computeConstrains() { - //Compute the drift each constrain will be equal to - // localValue - drift...localValue + drift - int drift = (int) Math.ceil(0.1*this.globalSum)/localValues.size(); - for(Map.Entry entry : localValues.entrySet()){ - - //get current local globalSum; - int localValue = entry.getValue(); - //put new constrain to map - constrains.put(entry.getKey(),new Constrain(localValue-drift,localValue+drift)); + private synchronized void compute_new_topk(Map global_values){ + + List> dupList = new ArrayList>(global_values.entrySet()); + Ordering> byMapValues = new Ordering>() { + @Override + public int compare(Map.Entry left, Map.Entry right) { + return right.getValue().compareTo(left.getValue()); + } + }; + Collections.sort(dupList, byMapValues); + + topk_set.clear(); + int i = 0; + for (Map.Entry entry : dupList) { + if (i < k) + topk_set.add(entry.getKey()); + else + break; + i++; } } - //Send 
constrains back to workers - private void sendConstrains() { - //Send to each worker node the new constrain - for ( Map.Entry entry : constrains.entrySet() ) { - channel.sentTo(entry.getKey(), new Message(Node.COORDINATOR, "constrain", entry.getValue())); + private synchronized void allocate_new_factors(Map global_values){ + + all_deltas.clear(); + for (Object o: worker_ids) + all_deltas.put(o, new HashMap()); + all_deltas.put(Node.COORDINATOR, new HashMap()); + + if (!initPhase3){ + + // 1) calculate sum of border values + double global_border = 0; + for (Map.Entry entry : border_values.entrySet()) + global_border += entry.getValue(); + + // 2) calculate "leeway" for each object + Map obj_leeways = new HashMap(); + double tmp; + for (Map.Entry entry : global_values.entrySet()) { + + tmp = entry.getValue() - global_border; + if (topk_set.contains(entry.getKey())) + tmp += epsilon; + + obj_leeways.put(entry.getKey(), tmp); + } + + //3) distribute "leeway" among participating nodes + // follow strategy of providing the coordinator with half of the total leeway + // and the rest is evenly divided to every other node, so: + double workerLeeway = (1 - coordLeeway) / (double) worker_ids.size(); + + for (Map.Entry entry: obj_leeways.entrySet()){ + + int j = 0; + for (Object o: worker_ids){ + + if (all_values.get(o).containsKey(entry.getKey())) + all_deltas.get(o).put(entry.getKey(), border_values.get(o) - all_values.get(o).get(entry.getKey()) + workerLeeway*entry.getValue() ) ; + else + all_deltas.get(o).put(entry.getKey(), border_values.get(o) + workerLeeway*entry.getValue() ) ; + + } + } + + // computation of coordinator's deltas + double tmp2; + for (Map.Entry entry: obj_leeways.entrySet()) { + + tmp2 = coordLeeway*entry.getValue(); + if ( topk_set.contains(entry.getKey()) ) + tmp2 -= epsilon; + all_deltas.get(Node.COORDINATOR).put(entry.getKey(), tmp2); + } + } + } + private synchronized boolean assertInvariants(){ + + // invariant 1 + double sum; + for (Map.Entry 
entry: all_deltas.get(Node.COORDINATOR).entrySet()){ + + sum = entry.getValue(); + + for (Object o : worker_ids) + sum += all_deltas.get(o).get(entry.getKey()); + if (sum > EPS) + return false; + + } + + // invariant 2 + for (Map.Entry entry: all_deltas.get(Node.COORDINATOR).entrySet()){ + + if ( !topk_set.contains(entry.getKey()) ){ + for (Object o: topk_set){ + if ( all_deltas.get(Node.COORDINATOR).get(o) + epsilon < entry.getValue() ) + return false; + } + } + } + return true; + } -} +} \ No newline at end of file diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Message.java b/topk-plugin/src/main/java/eu/leads/distsum/Message.java index f6dfcfff..6363dc45 100644 --- a/topk-plugin/src/main/java/eu/leads/distsum/Message.java +++ b/topk-plugin/src/main/java/eu/leads/distsum/Message.java @@ -15,14 +15,16 @@ public class Message implements Serializable { String type; //Type of Message Object body; //The body of message - public static final Message EMPTYMSG = new Message("",""); + long ts; - public Message() { - } + public static final Message EMPTYMSG = new Message("","",-11); - public Message(String from,String type){ + public Message(String from,String type,Object body){ this.from = from; - this.type = type; + this.type =type; + this.body = body; + + ts = 0; } /** @@ -43,13 +45,6 @@ public void setFrom(String from) { this.from = from; } - public Message(String from,String type,Object body){ - this.from = from; - - this.type =type; - this.body = body; - } - /** * Getter for property 'type'. 
* @@ -85,4 +80,12 @@ public Object getBody() { public void setBody(Object body) { this.body = body; } + + public long getTs() { + return ts; + } + + public void setTs(long ts) { + this.ts = ts; + } } diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Node.java b/topk-plugin/src/main/java/eu/leads/distsum/Node.java index 75f0ad22..737e9f0c 100644 --- a/topk-plugin/src/main/java/eu/leads/distsum/Node.java +++ b/topk-plugin/src/main/java/eu/leads/distsum/Node.java @@ -18,9 +18,15 @@ public abstract class Node { public static final String COORDINATOR = "COORDINATOR"; public String id; + public long ts; + ComChannel channel; //communication medium + public Node(String i, ComChannel channel){ id = i; channel.register(id,this); + + ts = 0; + this.channel = channel; } public abstract void receiveMessage(Message msg); @@ -29,8 +35,15 @@ public Node(String i, ComChannel channel){ @SuppressWarnings({ "rawtypes", "unchecked" }) @CacheEntryModified public void onCacheModification(CacheEntryEvent event){ - if (event.getKey().equals(id)) + if (event.getKey().equals(id)) { this.receiveMessage((Message) event.getValue()); + } + } + + public synchronized void send(String id, Message message){ + ts++; + message.setTs(ts); + channel.sentTo(id, message); } diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Utils/LocalView.java b/topk-plugin/src/main/java/eu/leads/distsum/Utils/LocalView.java new file mode 100644 index 00000000..9e39b610 --- /dev/null +++ b/topk-plugin/src/main/java/eu/leads/distsum/Utils/LocalView.java @@ -0,0 +1,33 @@ +package eu.leads.distsum.Utils; + +import java.util.HashMap; + +/** + * Created by dell on 8/25/14. 
+ */ +public class LocalView { + + HashMap partialValues; + double border; + + public LocalView(HashMap partialValues, double border) { + this.border = border; + this.partialValues = partialValues; + } + + public HashMap getPartialValues() { + return partialValues; + } + + public void setPartialValues(HashMap partialValues) { + this.partialValues = partialValues; + } + + public double getBorder() { + return border; + } + + public void setBorder(double border) { + this.border = border; + } +} diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Utils/Set_Map.java b/topk-plugin/src/main/java/eu/leads/distsum/Utils/Set_Map.java new file mode 100644 index 00000000..4cabaaf1 --- /dev/null +++ b/topk-plugin/src/main/java/eu/leads/distsum/Utils/Set_Map.java @@ -0,0 +1,34 @@ +package eu.leads.distsum.Utils; + +import java.util.Map; +import java.util.Set; + +/** + * Created by dell on 8/26/14. + */ +public class Set_Map { + + private Set topkset; + private Map deltas; + + public Set_Map(Map deltas, Set topkset) { + this.deltas = deltas; + this.topkset = topkset; + } + + public Set getTopkset() { + return topkset; + } + + public void setTopkset(Set topkset) { + this.topkset = topkset; + } + + public Map getDeltas() { + return deltas; + } + + public void setDeltas(Map deltas) { + this.deltas = deltas; + } +} diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Utils/ViolationObject.java b/topk-plugin/src/main/java/eu/leads/distsum/Utils/ViolationObject.java new file mode 100644 index 00000000..7ecd9645 --- /dev/null +++ b/topk-plugin/src/main/java/eu/leads/distsum/Utils/ViolationObject.java @@ -0,0 +1,70 @@ +package eu.leads.distsum.Utils; + +import java.util.HashMap; +import java.util.Set; + +public class ViolationObject { + + private HashMap partialDataValues; + private double borderValue; + Set violatedTopk, violatedRest; + boolean emptyTopk; + //long epoch; + + public ViolationObject(HashMap partialDataValues, Set violatedTopk, Set violatedRest, double borderValue, 
boolean emptyTopk) { + this.borderValue = borderValue; + this.partialDataValues = partialDataValues; + this.violatedTopk = violatedTopk; + this.violatedRest = violatedRest; + this.emptyTopk = emptyTopk; + // this.epoch = epoch; + } + + public HashMap getPartialDataValues() { + return partialDataValues; + } + + public void setPartialDataValues(HashMap partialDataValues) { + this.partialDataValues = partialDataValues; + } + + public double getBorderValue() { + return borderValue; + } + + public void setBorderValue(double borderValue) { + this.borderValue = borderValue; + } + + public Set getViolatedTopk() { + return violatedTopk; + } + + public void setViolatedTopk(Set violatedTopk) { + this.violatedTopk = violatedTopk; + } + + public Set getViolatedRest() { + return violatedRest; + } + + public void setViolatedRest(Set violatedRest) { + this.violatedRest = violatedRest; + } + + public boolean isEmptyTopk() { + return emptyTopk; + } + + public void setEmptyTopk(boolean emptyTopk) { + this.emptyTopk = emptyTopk; + } + + /*public long getEpoch() { + return epoch; + } + + public void setEpoch(long epoch) { + this.epoch = epoch; + }*/ +} diff --git a/topk-plugin/src/main/java/eu/leads/distsum/Worker.java b/topk-plugin/src/main/java/eu/leads/distsum/Worker.java index 9da62b01..292bc8ba 100644 --- a/topk-plugin/src/main/java/eu/leads/distsum/Worker.java +++ b/topk-plugin/src/main/java/eu/leads/distsum/Worker.java @@ -1,5 +1,9 @@ package eu.leads.distsum; +import eu.leads.distsum.Utils.LocalView; +import eu.leads.distsum.Utils.Set_Map; +import eu.leads.distsum.Utils.ViolationObject; + import java.util.*; /** @@ -10,27 +14,29 @@ * Created by vagvaz on 7/5/14. * The worker node tracks updates on a stream * and maintains a local sum of the updates - * when an update violates the its constrain, + * when an update violates the its localDeltas, * the worker informs the coodinator. 
*/ public class Worker extends Node { private String id; //node id private HashMap localValues; - + private HashMap localDeltas; //worker localDeltas private PriorityQueue> topk_adjusted, rest_adjusted; + Set topk_set; + + long lastTsReceived; - private HashMap constrain; //worker constrain - private ComChannel channel; //communication channel. + public Worker(String ID, ComChannel com/*, Object firstObject, double firstValue*/) { - public Worker(String ID, Map.Entry initialTopEntry, ComChannel com) { super(ID,com); this.id = ID; + topk_set = new HashSet(); + localValues = new HashMap(); - constrain = new HashMap(); - this.channel = com; + localDeltas = new HashMap(); topk_adjusted = new PriorityQueue>(10, new Comparator>() { @@ -48,37 +54,269 @@ public int compare(Map.Entry left, Map.Entry rig } } ); - /*topk_adjusted.add(new AbstractMap.SimpleEntry(2, 2d)); - topk_adjusted.add(new AbstractMap.SimpleEntry(3, 3d)); + send(Node.COORDINATOR, new Message(id,"join",null)); + // System.out.println("Worker "+id+" sent join request"); + + lastTsReceived = -1; + } - rest_adjusted.add(new AbstractMap.SimpleEntry(2, 2d)); - rest_adjusted.add(new AbstractMap.SimpleEntry(3, 3d)); + private synchronized boolean updateAndCheckTs(long ts){ - System.out.println(topk_adjusted.poll()); - System.out.println(rest_adjusted.poll());*/ + if (lastTsReceived!=-1){ + if (ts > lastTsReceived) + lastTsReceived = ts; + else + return true; + } + else + lastTsReceived = ts; + return false; } + @Override + public synchronized void receiveMessage(Message msg) { - public boolean update(Object objToUpd, double newvalue){ + if (msg.getType().equals("")) + return; - double localValue = localValues.get(objToUpd) + newvalue; + if ( updateAndCheckTs(msg.getTs()) ) + return; - if(constrain.get(objToUpd).violates(localValue)) - { - channel.sentTo(Node.COORDINATOR, new Message(id,"violation",new AbstractMap.SimpleEntry(objToUpd, localValue) )); - return true; + if (msg.getType().equals("joinAck")){ + + // 
System.out.println("Worker "+id+": Received join ACK from coordinator"); + + topk_set.clear(); + for (Object o: (HashSet) msg.getBody()){ + topk_set.add(o); + } + + if (!topk_set.isEmpty()){ + keepFactorsValuesConsistent(); + repartitionHeaps(); + if (constraintViolationCheck()) + triggerViolation(); + } + + } + else if(msg.getType().equals("newSetAndFactors")){ + + Set_Map sm = (Set_Map) msg.getBody(); + localDeltas.clear(); + localDeltas.putAll(sm.getDeltas()); + topk_set.clear(); + topk_set.addAll(sm.getTopkset()); + + keepFactorsValuesConsistent(); + repartitionHeaps(); + if (constraintViolationCheck()) + triggerViolation(); + } + else if(msg.getType().equals("reqPhase3")){ + + HashSet resolutionSet = (HashSet) (msg.getBody()); + LocalView lv = new LocalView(new HashMap(), 0); + + // encapsulate the request local view's values to the msg for the coordinator + for (Object o: resolutionSet){ + if ( localValues.containsKey(o) ) + lv.getPartialValues().put(o, localValues.get(o)); + } + + if (!localValues.isEmpty()) { + + PriorityQueue> rest_adjusted1 = + new PriorityQueue>(rest_adjusted); + //TODO: NULL POINTER HERE + while ( (!rest_adjusted1.isEmpty()) && topk_adjusted.peek().getValue() < rest_adjusted1.peek().getValue()){ + rest_adjusted1.poll(); + } + + if (topk_set.isEmpty()){ + assert(rest_adjusted.isEmpty() && topk_adjusted.isEmpty()); + lv.setBorder(0); + } + else{ + if (!rest_adjusted1.isEmpty()) + lv.setBorder( Math.min(topk_adjusted.peek().getValue(), rest_adjusted1.peek().getValue()) ); + else + lv.setBorder( topk_adjusted.peek().getValue() ); + } + } + + send(Node.COORDINATOR, new Message(id,"replyPhase3", lv)); + // System.out.println("Worker "+id+": sent local view for phase 3"); + + } + else if (msg.getType().equals("phase2")){ + + HashMap h = (HashMap) msg.getBody(); + localDeltas.clear(); + localDeltas.putAll(h); + + keepFactorsValuesConsistent(); + repartitionHeaps(); + if (constraintViolationCheck()) + triggerViolation(); + } + + } + + private 
synchronized void keepFactorsValuesConsistent(){ + assert(!topk_set.isEmpty()); + + for (Object o: topk_set){ + if (!localValues.containsKey(o)) + localValues.put(o, 0d); + } + + for (Map.Entry entry:localDeltas.entrySet()){ + if (!localValues.containsKey(entry.getKey())) + localValues.put(entry.getKey(), 0d); + } + } + + /** + * Re-partition adjusted heaps according to the received top-k set + */ + private synchronized void repartitionHeaps(){ + topk_adjusted.clear(); + rest_adjusted.clear(); + + for (Map.Entry entry: localValues.entrySet()) { + if (topk_set.contains(entry.getKey())) + insertAfterAddingDelta(entry.getKey(), topk_adjusted); + else + insertAfterAddingDelta(entry.getKey(), rest_adjusted); } + } + + private synchronized void insertAfterAddingDelta(Object objUpdated, PriorityQueue> prQ){ + if (localDeltas.containsKey(objUpdated)) + prQ.add( new AbstractMap.SimpleEntry(objUpdated, localValues.get(objUpdated) + localDeltas.get(objUpdated) ) ); + else + prQ.add( new AbstractMap.SimpleEntry(objUpdated, localValues.get(objUpdated) ) ); + } + + private synchronized boolean constraintViolationCheck(){ + assert(!topk_adjusted.isEmpty()); + if ( (!rest_adjusted.isEmpty()) && (topk_adjusted.peek().getValue() < rest_adjusted.peek().getValue()) ) + return true; return false; } + /** + * Function running for each new update + * After updating local view, + * it checks if this node has received a topk set - if not it initiates violation + * in order to have the coordinator initialize one. + * + * If a topk set has already been received, + * the node updates its heaps and checks for constraint violation. 
+ * @param objToUpd + * @param drift + * @return + */ + public synchronized void update(Object objToUpd, double drift){ + + double oldLocalValue = 0; + if (localValues.containsKey(objToUpd)) + oldLocalValue += localValues.get(objToUpd); + localValues.put(objToUpd, oldLocalValue + drift); + + if (topk_set.isEmpty()) + triggerViolation(); + else{ + heapsAdjAfterSingleValueUpdate(objToUpd, oldLocalValue); + if (constraintViolationCheck()) + triggerViolation(); + } + } + + private synchronized void triggerViolation(){ + + ViolationObject vo = new ViolationObject(new HashMap(), new HashSet(), + new HashSet(), 0, topk_set.isEmpty() ); + + if (topk_set.isEmpty()){ + + vo.getPartialDataValues().putAll(localValues); + + ArrayList tempVals = new ArrayList(); + for (Map.Entry entry: localValues.entrySet()) { + vo.getViolatedRest().add(entry.getKey()); + tempVals.add(entry.getValue()); + } + vo.setBorderValue(/*Collections.max(tempVals)*/0); + } + else{ + + // send set of objects involved in violated constraints + PriorityQueue> topk_adjusted1, rest_adjusted1; + topk_adjusted1 = new PriorityQueue>(topk_adjusted); + rest_adjusted1 = new PriorityQueue>(rest_adjusted); + + assert( (!rest_adjusted1.isEmpty()) && (!topk_adjusted1.isEmpty()) ); + + double max_of_rest_adjusted = rest_adjusted1.peek().getValue(); + while ( (!rest_adjusted1.isEmpty()) && topk_adjusted1.peek().getValue() < rest_adjusted1.peek().getValue()){ + vo.getViolatedRest().add(rest_adjusted1.poll().getKey()); + } + while ( (!topk_adjusted1.isEmpty()) && topk_adjusted1.peek().getValue() < max_of_rest_adjusted){ + vo.getViolatedTopk().add(topk_adjusted1.poll().getKey()); + } + + // & send all partial data values for objects in resolution set : T union F + for (Object o: topk_set){ + if (localValues.containsKey(o)) + vo.getPartialDataValues().put(o, localValues.get(o)); + } + for (Object o: vo.getViolatedTopk()){ + if (localValues.containsKey(o)) + vo.getPartialDataValues().put(o, localValues.get(o)); + } + for 
(Object o: vo.getViolatedRest()){ + if (localValues.containsKey(o)) + vo.getPartialDataValues().put(o, localValues.get(o)); + } + + // & send border value + if (!rest_adjusted1.isEmpty()) + vo.setBorderValue(Math.min(topk_adjusted.peek().getValue(), rest_adjusted1.peek().getValue())); + else + vo.setBorderValue( topk_adjusted.peek().getValue() ); + + } + + send(Node.COORDINATOR, new Message(id,"violation",vo)); + // System.out.println("Worker "+id+": Sent violation msg to coordinator and topk_set is empty?"+topk_set.isEmpty()); + } + + private synchronized void heapsAdjAfterSingleValueUpdate(Object objUpdated, double oldLocalValue){ + + assert(localValues.containsKey(objUpdated)); + + double adjustedValue = oldLocalValue; + if (localDeltas.containsKey(objUpdated)) + adjustedValue += localDeltas.get(objUpdated); + Map.Entry entry = new AbstractMap.SimpleEntry(objUpdated, adjustedValue); + + //first search in the heap containing adjusted top-k set elements + if (topk_adjusted.remove(entry)){ + insertAfterAddingDelta(objUpdated, topk_adjusted); + return; + } + rest_adjusted.remove(entry); //dont have to test if exists, in any case it will be placed in rest_adjusted + insertAfterAddingDelta(objUpdated, rest_adjusted); + } /** * Getter for property 'id'. * * @return Value for property 'id'. */ - public String getId() { + public synchronized String getId() { return id; } @@ -87,23 +325,7 @@ public String getId() { * * @param id Value to set for property 'id'. */ - public void setId(String id) { + public synchronized void setId(String id) { this.id = id; } - - @Override - public void receiveMessage(Message msg) { - - /*Message reply = new Message(id,"reply"); - - //If the message is a get local values then set the local sum as body to the reply - if(msg.getType().equals("get")){ - reply.setBody(localValue); - channel.sentTo(COORDINATOR,reply); - } - //if the message is a new constrain just update the local constrain. 
- else if (msg.getType().equals("constrain")){ - this.constrain = (Constrain) msg.getBody(); - }*/ - } } diff --git a/topk-plugin/src/test/java/eu/leads/distsum/UnitTest.java b/topk-plugin/src/test/java/eu/leads/distsum/UnitTest.java index 6337d3c3..e737e7b9 100644 --- a/topk-plugin/src/test/java/eu/leads/distsum/UnitTest.java +++ b/topk-plugin/src/test/java/eu/leads/distsum/UnitTest.java @@ -1,5 +1,6 @@ package eu.leads.distsum; +import org.infinispan.Cache; import org.infinispan.configuration.cache.ConfigurationBuilder; import org.infinispan.configuration.global.GlobalConfigurationBuilder; import org.infinispan.manager.EmbeddedCacheManager; @@ -21,11 +22,11 @@ * There is a group of nodes that monitor some streams of updates. We want to compute the total sum of these updates. * One node from the group acts as a coordinator and the rest of the nodes as workers. * The coordinator maintains the global sum, on the other hand, the worker nodes listen to the streams - * of updates and maintain a local sum. Furthermore, the worker nodes have some constrains, in our case - * these constrains are two numbers an upper and a lower bound. In case of an update violates the constrains, + * of updates and maintain a local sum. Furthermore, the worker nodes have some all_deltas, in our case + * these all_deltas are two numbers an upper and a lower bound. In case of an update violates the all_deltas, * then the worker informs the coordinator, which in turn asks all workers to - * send their local values in order to recompute the global sum and the new constrains for each node. - * After these recomputations, the coordinator sends the new constrains back to the workers. + * send their local values in order to recompute the global sum and the new all_deltas for each node. + * After these recomputations, the coordinator sends the new all_deltas back to the workers. * * Scenario: * We have one coordinator and 3 worker nodes. We have 4 rounds. In each round we update each worker once. 
Each worker's update will be chosen @@ -52,68 +53,53 @@ @Test public class UnitTest extends SingleCacheManagerTest{ - public static final double epsilon = 0.1; - public static final int k = 10; - public void run() { + Random rand = new Random(); + //The communication channel between coordinator and the workers ComChannel channel = new ComChannel(cacheManager.getCache()); - Coordinator coord = new Coordinator(channel); - //Create initial values and constrains for the workers - int numOfWorkers = 10; - ArrayList workers = new ArrayList(numOfWorkers); - Map workerValues = new HashMap(numOfWorkers); - Map workerConstrains = new HashMap(numOfWorkers); - int initValue = 10; - Constrain initConstrain = new Constrain(9,11); + ArrayList workers = new ArrayList(); + //Create initial values and all_deltas for the workers + /*Map workerValues = new HashMap(numOfWorkers); + Map workerConstrains = new HashMap(numOfWorkers);*/ + + int numOfWorkers = 100; for ( int worker = 0; worker < numOfWorkers; worker++ ) { //Create new worker with initial values - Worker w = new Worker(Integer.toString(worker), new AbstractMap.SimpleEntry(1, 10d), channel); + Worker w = new Worker(Integer.toString(worker), channel); workers.add(w); - //put worker initial globalSum into the map - /*workerValues.put(w.getId(), w.getLocalValue()); - //put worker's constrain into the map - workerConstrains.put(w.getId(),w.getConstrain());*/ //register worker to the channel channel.register(w.getId(),w); } - //Initialize structures kept by coordinator - coord.setLocalValues(workerValues); - coord.setConstrains(workerConstrains); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + System.out.println("------------"); int[] updates = {1,1,1,1,2,2,-1,-1,-2}; - int numberOfRounds = 4; - Random rand = new Random(); + int numberOfRounds = 100; + for ( int round = 0; round < numberOfRounds; round++ ) { System.out.println("********* ROUND " + round +" ********"); - int realsum 
= 0; + for ( int worker = 0; worker < numOfWorkers; worker++ ) { - int update = updates[rand.nextInt(updates.length)]; - int objToUpd = (int) rand.nextGaussian() + 1000; - System.out.println("obj: " + objToUpd); + //int objToUpd = (int) rand.nextGaussian() + 1000; + int objToUpd = rand.nextInt(100); + int update = updates[rand.nextInt(updates.length)]; - /*int oldValue = workers.get(worker).getLocalValue(); - workers.get(worker).update(update); + workers.get(worker).update(objToUpd, update); - System.out.println("worker " + worker + ": update=" +update+" oldval="+oldValue + " new newValue= " + workers.get(worker).getLocalValue()); - realsum += workers.get(worker).getLocalValue();*/ } - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - e.printStackTrace(); // TODO: Customise this generated block - } - System.out.println("Real sum: " + realsum + " Global Sum: " +coord.getGlobalSum()); - System.out.println("********* END OF ROUND " + round +" ********\n\n"); - assert(0.9*realsum <= coord.getGlobalSum()); - assert(1.1*realsum >= coord.getGlobalSum()); }