Skip to content

Commit

Permalink
AttributeDeduplicator works in-memory instead of persisting (typedb#5022
Browse files Browse the repository at this point in the history
)

Removed RocksDB and replaced it with a ConcurrentHashMap; updated all the tests and dependencies accordingly.
  • Loading branch information
Marco Scoppetta authored Mar 12, 2019
1 parent 2f89056 commit 2a99b4e
Show file tree
Hide file tree
Showing 15 changed files with 175 additions and 344 deletions.
6 changes: 0 additions & 6 deletions BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,10 @@ assemble_targz(
},
empty_directories = [
"server/db/cassandra",
"server/db/queue"
],
permissions = {
"server/services/cassandra/cassandra.yaml": "0777",
"server/db/cassandra": "0777",
"server/db/queue": "0777",
},
output_filename = "grakn-core-all-linux",
visibility = ["//visibility:public"]
Expand All @@ -73,12 +71,10 @@ assemble_zip(
},
empty_directories = [
"server/db/cassandra",
"server/db/queue"
],
permissions = {
"server/services/cassandra/cassandra.yaml": "0777",
"server/db/cassandra": "0777",
"server/db/queue": "0777",
},
output_filename = "grakn-core-all-mac",
visibility = ["//visibility:public"]
Expand All @@ -98,12 +94,10 @@ assemble_zip(
},
empty_directories = [
"server/db/cassandra",
"server/db/queue"
],
permissions = {
"server/services/cassandra/cassandra.yaml": "0777",
"server/db/cassandra": "0777",
"server/db/queue": "0777",
},
output_filename = "grakn-core-all-windows",
visibility = ["//visibility:public"]
Expand Down
11 changes: 0 additions & 11 deletions dependencies/maven/artifacts/org/rocksdb/BUILD

This file was deleted.

1 change: 0 additions & 1 deletion dependencies/maven/dependencies.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -700,7 +700,6 @@ def list_dependencies():
{"artifact": "org.ow2.asm:asm:5.0.4", "lang": "java", "sha1": "0da08b8cce7bbf903602a25a3a163ae252435795", "sha256": "896618ed8ae62702521a78bc7be42b7c491a08e6920a15f89a3ecdec31e9a220", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/ow2/asm/asm/5.0.4/asm-5.0.4.jar", "source": {"sha1": "112ff54474f1f04ccf1384c92e39fdc566f0bb5e", "sha256": "7ba89bc14669d86c1c0dc6abaeb74a87715089f3b904cc2016969e8737d70707", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/ow2/asm/asm/5.0.4/asm-5.0.4-sources.jar"} , "name": "org-ow2-asm-asm", "actual": "@org-ow2-asm-asm//jar", "bind": "jar/org/ow2/asm/asm"},
{"artifact": "org.reflections:reflections:0.9.9-RC1", "lang": "java", "sha1": "b78b545f452a6b7d4fab2641dd0b0147a0f4fd5e", "sha256": "8be6ea8a7d790056c3492c2c9419985834b258345c81764984b4eb2a86c28492", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/reflections/reflections/0.9.9-RC1/reflections-0.9.9-RC1.jar", "source": {"sha1": "654dcaafdb84f03858bb69bbe031f0325bb49065", "sha256": "e495a0cde2c1348d9ee126573bb0bcd6cc87999ae8056cbbf0abe6349576aaf9", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/reflections/reflections/0.9.9-RC1/reflections-0.9.9-RC1-sources.jar"} , "name": "org-reflections-reflections", "actual": "@org-reflections-reflections//jar", "bind": "jar/org/reflections/reflections"},
{"artifact": "org.roaringbitmap:RoaringBitmap:0.5.11", "lang": "java", "sha1": "e6b04760ea1896fc36beea4f11b8649481bf5af7", "sha256": "bad2fcf146d4a41cb188a28a4216f15cc470cd2fb6417a7d0005e7f8221ff312", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/roaringbitmap/RoaringBitmap/0.5.11/RoaringBitmap-0.5.11.jar", "source": {"sha1": "dd4dc1e5ed4d98ff8421e0e26f9822873c455e34", "sha256": "15767ab00bd7b50f8711003dd544b18d446a8ef9cdba06b6ad441cb8d2fe59b3", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/roaringbitmap/RoaringBitmap/0.5.11/RoaringBitmap-0.5.11-sources.jar"} , "name": "org-roaringbitmap-RoaringBitmap", "actual": "@org-roaringbitmap-RoaringBitmap//jar", "bind": "jar/org/roaringbitmap/RoaringBitmap"},
{"artifact": "org.rocksdb:rocksdbjni:5.14.2", "lang": "java", "sha1": "a6087318fab540ba0b4c6ff68475ffbedc0b3d10", "sha256": "34b7e45d18bca957e38cad0c3269dd36c9df81313f2ff41171ec2a96a3736c7f", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/rocksdb/rocksdbjni/5.14.2/rocksdbjni-5.14.2.jar", "source": {"sha1": "ac0184f4618db881be0a9aeac98535b13f36a967", "sha256": "8fdf11a3b7b19201459cbac18d7a693331a992c159f042ab6ece7e843f6ff59b", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/rocksdb/rocksdbjni/5.14.2/rocksdbjni-5.14.2-sources.jar"} , "name": "org-rocksdb-rocksdbjni", "actual": "@org-rocksdb-rocksdbjni//jar", "bind": "jar/org/rocksdb/rocksdbjni"},
{"artifact": "org.scala-lang.modules:scala-parser-combinators_2.11:1.0.1", "lang": "java", "sha1": "f05d7345bf5a58924f2837c6c1f4d73a938e1ff0", "sha256": "19495ce701fd9ba3c499c137e3ad5364acee8f87a01ef99e912f995a23fd4cb1", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.1/scala-parser-combinators_2.11-1.0.1.jar", "source": {"sha1": "34d013c02d0b73794ba2911552896dd9c00f34c3", "sha256": "d8543ae4bed3db45361f9dfd73cb1343d6969aa371831d0e836ab99a11276d85", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.1/scala-parser-combinators_2.11-1.0.1-sources.jar"} , "name": "org-scala-lang-modules-scala-parser-combinators_2-11", "actual": "@org-scala-lang-modules-scala-parser-combinators_2-11//jar", "bind": "jar/org/scala-lang/modules/scala-parser-combinators-2-11"},
# duplicates in org.scala-lang.modules:scala-xml_2.11 promoted to 1.0.5
# - org.apache.tinkerpop:spark-gremlin:3.3.3 wanted version 1.0.5
Expand Down
5 changes: 0 additions & 5 deletions dependencies/maven/dependencies.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -404,11 +404,6 @@ dependencies:
version: "1.19"
lang: java

org.rocksdb:
rocksdbjni:
version: "5.14.2"
lang: java

org.scala-lang:
scala-library:
version: "2.11.8"
Expand Down
1 change: 0 additions & 1 deletion server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ java_library(
"//dependencies/maven/artifacts/org/apache/tinkerpop:hadoop-gremlin",
"//dependencies/maven/artifacts/org/apache/tinkerpop:spark-gremlin",
"//dependencies/maven/artifacts/org/apache/tinkerpop:tinkergraph-gremlin",
"//dependencies/maven/artifacts/org/rocksdb:rocksdbjni",
"//dependencies/maven/artifacts/org/janusgraph:janusgraph-cassandra",
"//dependencies/maven/artifacts/org/janusgraph:janusgraph-core",
"//dependencies/maven/artifacts/org/janusgraph:janusgraph-hadoop",
Expand Down
2 changes: 1 addition & 1 deletion server/src/server/ServerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static Server createServer(boolean benchmark) {
SessionFactory sessionFactory = new SessionFactory(lockManager, janusGraphFactory, keyspaceStore, config);

// post-processing
AttributeDeduplicatorDaemon attributeDeduplicatorDaemon = new AttributeDeduplicatorDaemon(config, sessionFactory);
AttributeDeduplicatorDaemon attributeDeduplicatorDaemon = new AttributeDeduplicatorDaemon(sessionFactory);

// http services: gRPC server
io.grpc.Server serverRPC = createServerRPC(config, sessionFactory, attributeDeduplicatorDaemon, keyspaceStore, janusGraphFactory, benchmark);
Expand Down
14 changes: 14 additions & 0 deletions server/src/server/deduplicator/AttributeDeduplicator.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ public static void deduplicate(SessionFactory sessionFactory, KeyspaceIndexPair
if (rolePlayerEdge.hasNext()) {
mergeRolePlayerEdge(mergeTargetV, rolePlayerEdge);
}
try {
attributeEdge.close();
rolePlayerEdge.close();
} catch (Exception e) {
LOG.warn("Exception while closing traversals:", e);
}
});
duplicate.remove();
} catch (IllegalStateException vertexAlreadyRemovedException) {
Expand All @@ -85,6 +91,14 @@ public static void deduplicate(SessionFactory sessionFactory, KeyspaceIndexPair
} else {
tx.close();
}

try {
tinker.close();
duplicates.close();
} catch (Exception e) {
LOG.warn("Exception while closing traversals:", e);
}

} finally {
session.close();
}
Expand Down
16 changes: 4 additions & 12 deletions server/src/server/deduplicator/AttributeDeduplicatorDaemon.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,14 @@
package grakn.core.server.deduplicator;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import grakn.core.common.config.Config;
import grakn.core.common.config.ConfigKey;
import grakn.core.concept.ConceptId;
import grakn.core.server.deduplicator.queue.Attribute;
import grakn.core.server.deduplicator.queue.RocksDbQueue;
import grakn.core.server.deduplicator.queue.InMemoryQueue;
import grakn.core.server.keyspace.KeyspaceImpl;
import grakn.core.server.session.SessionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -59,24 +55,20 @@
public class AttributeDeduplicatorDaemon {
private static Logger LOG = LoggerFactory.getLogger(AttributeDeduplicatorDaemon.class);
private static final int QUEUE_GET_BATCH_MAX = 1000;
private static final Path queueDataDirRelative = Paths.get("queue"); // path to the queue storage location, relative to the data directory

private ExecutorService executorServiceForDaemon = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("attribute-deduplicator-daemon-%d").build());

private SessionFactory sessionFactory;
private RocksDbQueue queue;
private InMemoryQueue queue;

private boolean stopDaemon = false;

/**
* Instantiates AttributeDeduplicatorDaemon
* @param config a reference to an instance of Config which is initialised from a grakn.properties.
* @param sessionFactory an SessionFactory instance to create new Sessions
*/
public AttributeDeduplicatorDaemon(Config config, SessionFactory sessionFactory) {
Path dataDir = Paths.get(config.getProperty(ConfigKey.DATA_DIR));
Path queueDataDir = dataDir.resolve(queueDataDirRelative);
this.queue = new RocksDbQueue(queueDataDir);
public AttributeDeduplicatorDaemon(SessionFactory sessionFactory) {
this.queue = new InMemoryQueue();
this.sessionFactory = sessionFactory;
}

Expand Down
88 changes: 88 additions & 0 deletions server/src/server/deduplicator/queue/InMemoryQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* GRAKN.AI - THE KNOWLEDGE GRAPH
* Copyright (C) 2018 Grakn Labs Ltd
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package grakn.core.server.deduplicator.queue;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * In-memory queue backed by a {@link ConcurrentHashMap} keyed by the attribute's concept id value.
 * It supports three operations: insert, read, and ack.
 * <p>
 * The read and ack should be used together in order to provide fault-tolerance:
 * the attribute de-duplicator can read attributes from the queue and only ack after everything has been processed.
 * <p>
 * Note: because entries are keyed by concept id, inserting an attribute whose concept id is already
 * queued replaces the existing entry. The backing map does not preserve insertion order, so
 * {@link #read(int)} returns attributes in the map's iteration order rather than in strict FIFO order.
 */
public class InMemoryQueue {
    private final Map<String, Attribute> queue;

    /**
     * Instantiates an empty queue.
     */
    public InMemoryQueue() {
        queue = new ConcurrentHashMap<>();
    }

    /**
     * Insert a new attribute into the queue and wake up any reader blocked in {@link #read(int)}.
     * If an attribute with the same concept id is already queued, it is replaced.
     *
     * @param attribute the attribute to be inserted
     */
    public void insert(Attribute attribute) {
        queue.put(attribute.conceptId().getValue(), attribute);
        // The notification is sent while holding the monitor that readers wait on. Since readers
        // check for emptiness under the same monitor, a reader either observes the new entry
        // or is already waiting when this notification is delivered.
        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Read at most N attributes from the queue. Read everything if there are fewer than N attributes in the queue.
     * If the queue is empty, the method will block until the queue receives a new attribute.
     * The attributes won't be removed from the queue until you call {@link #ack(List)} on the returned attributes.
     *
     * @param limit the maximum number of items to be returned.
     * @return a list of {@link Attribute}
     * @throws InterruptedException if the calling thread is interrupted while waiting for an attribute
     * @see #ack(List)
     */
    public List<Attribute> read(int limit) throws InterruptedException {
        // Block until the queue contains at least 1 element.
        // The emptiness check MUST happen while holding the monitor: checking it outside and then
        // entering the synchronized block to wait leaves a window in which insert() can add an
        // element and call notifyAll() before this thread starts waiting, losing the wakeup.
        synchronized (this) {
            while (queue.isEmpty()) {
                wait();
            }
        }

        return queue.values().stream().limit(limit).collect(Collectors.toList());
    }

    /**
     * Remove attributes from the queue.
     *
     * @param attributes the attributes which will be removed
     */
    public void ack(List<Attribute> attributes) {
        for (Attribute attr : attributes) {
            queue.remove(attr.conceptId().getValue());
        }
    }

}
Loading

0 comments on commit 2a99b4e

Please sign in to comment.