From 2a99b4eb2703534c6b5a10c80c798ddacb058377 Mon Sep 17 00:00:00 2001 From: Marco Scoppetta Date: Tue, 12 Mar 2019 16:48:34 +0000 Subject: [PATCH] AttributeDeduplicator works in-memory instead of persisting (#5022) Removed RocksDB and use ConcurrentHashMap instead, updates all the tests and dependencies. --- BUILD | 6 - .../maven/artifacts/org/rocksdb/BUILD | 11 - dependencies/maven/dependencies.bzl | 1 - dependencies/maven/dependencies.yaml | 5 - server/BUILD | 1 - server/src/server/ServerFactory.java | 2 +- .../deduplicator/AttributeDeduplicator.java | 14 ++ .../AttributeDeduplicatorDaemon.java | 16 +- .../deduplicator/queue/InMemoryQueue.java | 88 ++++++++ .../deduplicator/queue/RocksDbQueue.java | 193 ------------------ server/test/server/deduplicator/BUILD | 11 +- ...bQueueTest.java => InMemoryQueueTest.java} | 124 ++++++----- .../AttributeDeduplicatorE2E.java | 44 +--- test-end-to-end/deduplicator/BUILD | 1 - test-integration/rule/GraknTestServer.java | 2 +- 15 files changed, 175 insertions(+), 344 deletions(-) delete mode 100644 dependencies/maven/artifacts/org/rocksdb/BUILD create mode 100644 server/src/server/deduplicator/queue/InMemoryQueue.java delete mode 100644 server/src/server/deduplicator/queue/RocksDbQueue.java rename server/test/server/deduplicator/{RocksDbQueueTest.java => InMemoryQueueTest.java} (50%) diff --git a/BUILD b/BUILD index 93efb0439e9..18f9ac379cf 100644 --- a/BUILD +++ b/BUILD @@ -48,12 +48,10 @@ assemble_targz( }, empty_directories = [ "server/db/cassandra", - "server/db/queue" ], permissions = { "server/services/cassandra/cassandra.yaml": "0777", "server/db/cassandra": "0777", - "server/db/queue": "0777", }, output_filename = "grakn-core-all-linux", visibility = ["//visibility:public"] @@ -73,12 +71,10 @@ assemble_zip( }, empty_directories = [ "server/db/cassandra", - "server/db/queue" ], permissions = { "server/services/cassandra/cassandra.yaml": "0777", "server/db/cassandra": "0777", - "server/db/queue": "0777", }, 
output_filename = "grakn-core-all-mac", visibility = ["//visibility:public"] @@ -98,12 +94,10 @@ assemble_zip( }, empty_directories = [ "server/db/cassandra", - "server/db/queue" ], permissions = { "server/services/cassandra/cassandra.yaml": "0777", "server/db/cassandra": "0777", - "server/db/queue": "0777", }, output_filename = "grakn-core-all-windows", visibility = ["//visibility:public"] diff --git a/dependencies/maven/artifacts/org/rocksdb/BUILD b/dependencies/maven/artifacts/org/rocksdb/BUILD deleted file mode 100644 index 6f363464945..00000000000 --- a/dependencies/maven/artifacts/org/rocksdb/BUILD +++ /dev/null @@ -1,11 +0,0 @@ -java_library( - name = "rocksdbjni", - exports = [ - "//external:jar/org/rocksdb/rocksdbjni" - ], - visibility = [ - "//visibility:public" - ] -) - - diff --git a/dependencies/maven/dependencies.bzl b/dependencies/maven/dependencies.bzl index 9930e74f120..0463b3171ee 100644 --- a/dependencies/maven/dependencies.bzl +++ b/dependencies/maven/dependencies.bzl @@ -700,7 +700,6 @@ def list_dependencies(): {"artifact": "org.ow2.asm:asm:5.0.4", "lang": "java", "sha1": "0da08b8cce7bbf903602a25a3a163ae252435795", "sha256": "896618ed8ae62702521a78bc7be42b7c491a08e6920a15f89a3ecdec31e9a220", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/ow2/asm/asm/5.0.4/asm-5.0.4.jar", "source": {"sha1": "112ff54474f1f04ccf1384c92e39fdc566f0bb5e", "sha256": "7ba89bc14669d86c1c0dc6abaeb74a87715089f3b904cc2016969e8737d70707", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/ow2/asm/asm/5.0.4/asm-5.0.4-sources.jar"} , "name": "org-ow2-asm-asm", "actual": "@org-ow2-asm-asm//jar", "bind": "jar/org/ow2/asm/asm"}, {"artifact": "org.reflections:reflections:0.9.9-RC1", "lang": "java", "sha1": "b78b545f452a6b7d4fab2641dd0b0147a0f4fd5e", "sha256": "8be6ea8a7d790056c3492c2c9419985834b258345c81764984b4eb2a86c28492", "repository": 
"https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/reflections/reflections/0.9.9-RC1/reflections-0.9.9-RC1.jar", "source": {"sha1": "654dcaafdb84f03858bb69bbe031f0325bb49065", "sha256": "e495a0cde2c1348d9ee126573bb0bcd6cc87999ae8056cbbf0abe6349576aaf9", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/reflections/reflections/0.9.9-RC1/reflections-0.9.9-RC1-sources.jar"} , "name": "org-reflections-reflections", "actual": "@org-reflections-reflections//jar", "bind": "jar/org/reflections/reflections"}, {"artifact": "org.roaringbitmap:RoaringBitmap:0.5.11", "lang": "java", "sha1": "e6b04760ea1896fc36beea4f11b8649481bf5af7", "sha256": "bad2fcf146d4a41cb188a28a4216f15cc470cd2fb6417a7d0005e7f8221ff312", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/roaringbitmap/RoaringBitmap/0.5.11/RoaringBitmap-0.5.11.jar", "source": {"sha1": "dd4dc1e5ed4d98ff8421e0e26f9822873c455e34", "sha256": "15767ab00bd7b50f8711003dd544b18d446a8ef9cdba06b6ad441cb8d2fe59b3", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/roaringbitmap/RoaringBitmap/0.5.11/RoaringBitmap-0.5.11-sources.jar"} , "name": "org-roaringbitmap-RoaringBitmap", "actual": "@org-roaringbitmap-RoaringBitmap//jar", "bind": "jar/org/roaringbitmap/RoaringBitmap"}, - {"artifact": "org.rocksdb:rocksdbjni:5.14.2", "lang": "java", "sha1": "a6087318fab540ba0b4c6ff68475ffbedc0b3d10", "sha256": "34b7e45d18bca957e38cad0c3269dd36c9df81313f2ff41171ec2a96a3736c7f", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/rocksdb/rocksdbjni/5.14.2/rocksdbjni-5.14.2.jar", "source": {"sha1": "ac0184f4618db881be0a9aeac98535b13f36a967", "sha256": "8fdf11a3b7b19201459cbac18d7a693331a992c159f042ab6ece7e843f6ff59b", "repository": "https://repo.maven.apache.org/maven2/", "url": 
"https://repo.maven.apache.org/maven2/org/rocksdb/rocksdbjni/5.14.2/rocksdbjni-5.14.2-sources.jar"} , "name": "org-rocksdb-rocksdbjni", "actual": "@org-rocksdb-rocksdbjni//jar", "bind": "jar/org/rocksdb/rocksdbjni"}, {"artifact": "org.scala-lang.modules:scala-parser-combinators_2.11:1.0.1", "lang": "java", "sha1": "f05d7345bf5a58924f2837c6c1f4d73a938e1ff0", "sha256": "19495ce701fd9ba3c499c137e3ad5364acee8f87a01ef99e912f995a23fd4cb1", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.1/scala-parser-combinators_2.11-1.0.1.jar", "source": {"sha1": "34d013c02d0b73794ba2911552896dd9c00f34c3", "sha256": "d8543ae4bed3db45361f9dfd73cb1343d6969aa371831d0e836ab99a11276d85", "repository": "https://repo.maven.apache.org/maven2/", "url": "https://repo.maven.apache.org/maven2/org/scala-lang/modules/scala-parser-combinators_2.11/1.0.1/scala-parser-combinators_2.11-1.0.1-sources.jar"} , "name": "org-scala-lang-modules-scala-parser-combinators_2-11", "actual": "@org-scala-lang-modules-scala-parser-combinators_2-11//jar", "bind": "jar/org/scala-lang/modules/scala-parser-combinators-2-11"}, # duplicates in org.scala-lang.modules:scala-xml_2.11 promoted to 1.0.5 # - org.apache.tinkerpop:spark-gremlin:3.3.3 wanted version 1.0.5 diff --git a/dependencies/maven/dependencies.yaml b/dependencies/maven/dependencies.yaml index 88c6a4ae717..9b719714493 100644 --- a/dependencies/maven/dependencies.yaml +++ b/dependencies/maven/dependencies.yaml @@ -404,11 +404,6 @@ dependencies: version: "1.19" lang: java - org.rocksdb: - rocksdbjni: - version: "5.14.2" - lang: java - org.scala-lang: scala-library: version: "2.11.8" diff --git a/server/BUILD b/server/BUILD index 2c54e517c90..b55148b8e07 100644 --- a/server/BUILD +++ b/server/BUILD @@ -68,7 +68,6 @@ java_library( "//dependencies/maven/artifacts/org/apache/tinkerpop:hadoop-gremlin", 
"//dependencies/maven/artifacts/org/apache/tinkerpop:spark-gremlin", "//dependencies/maven/artifacts/org/apache/tinkerpop:tinkergraph-gremlin", - "//dependencies/maven/artifacts/org/rocksdb:rocksdbjni", "//dependencies/maven/artifacts/org/janusgraph:janusgraph-cassandra", "//dependencies/maven/artifacts/org/janusgraph:janusgraph-core", "//dependencies/maven/artifacts/org/janusgraph:janusgraph-hadoop", diff --git a/server/src/server/ServerFactory.java b/server/src/server/ServerFactory.java index c350bbe749f..81151ef81e7 100644 --- a/server/src/server/ServerFactory.java +++ b/server/src/server/ServerFactory.java @@ -59,7 +59,7 @@ public static Server createServer(boolean benchmark) { SessionFactory sessionFactory = new SessionFactory(lockManager, janusGraphFactory, keyspaceStore, config); // post-processing - AttributeDeduplicatorDaemon attributeDeduplicatorDaemon = new AttributeDeduplicatorDaemon(config, sessionFactory); + AttributeDeduplicatorDaemon attributeDeduplicatorDaemon = new AttributeDeduplicatorDaemon(sessionFactory); // http services: gRPC server io.grpc.Server serverRPC = createServerRPC(config, sessionFactory, attributeDeduplicatorDaemon, keyspaceStore, janusGraphFactory, benchmark); diff --git a/server/src/server/deduplicator/AttributeDeduplicator.java b/server/src/server/deduplicator/AttributeDeduplicator.java index 129dc86f740..e62f4709d21 100644 --- a/server/src/server/deduplicator/AttributeDeduplicator.java +++ b/server/src/server/deduplicator/AttributeDeduplicator.java @@ -74,6 +74,12 @@ public static void deduplicate(SessionFactory sessionFactory, KeyspaceIndexPair if (rolePlayerEdge.hasNext()) { mergeRolePlayerEdge(mergeTargetV, rolePlayerEdge); } + try { + attributeEdge.close(); + rolePlayerEdge.close(); + } catch (Exception e) { + LOG.warn("Exception while closing traversals:", e); + } }); duplicate.remove(); } catch (IllegalStateException vertexAlreadyRemovedException) { @@ -85,6 +91,14 @@ public static void deduplicate(SessionFactory 
sessionFactory, KeyspaceIndexPair } else { tx.close(); } + + try { + tinker.close(); + duplicates.close(); + } catch (Exception e) { + LOG.warn("Exception while closing traversals:", e); + } + } finally { session.close(); } diff --git a/server/src/server/deduplicator/AttributeDeduplicatorDaemon.java b/server/src/server/deduplicator/AttributeDeduplicatorDaemon.java index 4d432230d54..f0e9014a394 100644 --- a/server/src/server/deduplicator/AttributeDeduplicatorDaemon.java +++ b/server/src/server/deduplicator/AttributeDeduplicatorDaemon.java @@ -19,18 +19,14 @@ package grakn.core.server.deduplicator; import com.google.common.util.concurrent.ThreadFactoryBuilder; -import grakn.core.common.config.Config; -import grakn.core.common.config.ConfigKey; import grakn.core.concept.ConceptId; import grakn.core.server.deduplicator.queue.Attribute; -import grakn.core.server.deduplicator.queue.RocksDbQueue; +import grakn.core.server.deduplicator.queue.InMemoryQueue; import grakn.core.server.keyspace.KeyspaceImpl; import grakn.core.server.session.SessionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.nio.file.Path; -import java.nio.file.Paths; import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -59,24 +55,20 @@ public class AttributeDeduplicatorDaemon { private static Logger LOG = LoggerFactory.getLogger(AttributeDeduplicatorDaemon.class); private static final int QUEUE_GET_BATCH_MAX = 1000; - private static final Path queueDataDirRelative = Paths.get("queue"); // path to the queue storage location, relative to the data directory private ExecutorService executorServiceForDaemon = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("attribute-deduplicator-daemon-%d").build()); private SessionFactory sessionFactory; - private RocksDbQueue queue; + private InMemoryQueue queue; private boolean stopDaemon = false; /** * Instantiates AttributeDeduplicatorDaemon - * @param config a reference to 
an instance of Config which is initialised from a grakn.properties. * @param sessionFactory an SessionFactory instance to create new Sessions */ - public AttributeDeduplicatorDaemon(Config config, SessionFactory sessionFactory) { - Path dataDir = Paths.get(config.getProperty(ConfigKey.DATA_DIR)); - Path queueDataDir = dataDir.resolve(queueDataDirRelative); - this.queue = new RocksDbQueue(queueDataDir); + public AttributeDeduplicatorDaemon(SessionFactory sessionFactory) { + this.queue = new InMemoryQueue(); this.sessionFactory = sessionFactory; } diff --git a/server/src/server/deduplicator/queue/InMemoryQueue.java b/server/src/server/deduplicator/queue/InMemoryQueue.java new file mode 100644 index 00000000000..caf902194f0 --- /dev/null +++ b/server/src/server/deduplicator/queue/InMemoryQueue.java @@ -0,0 +1,88 @@ +/* + * GRAKN.AI - THE KNOWLEDGE GRAPH + * Copyright (C) 2018 Grakn Labs Ltd + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package grakn.core.server.deduplicator.queue; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * In-memory queue implemented using a ConcurrentHashMap + * It supports three operations: insert, read, and ack. + *

+ * The read and ack should be used together in order to provide fault-tolerance.
+ * The attribute de-duplicator can read attributes from the queue and only ack after everything has been processed.
+ */
+public class InMemoryQueue {
+    private final Map<String, Attribute> queue;
+
+    /**
+     * Instantiates an empty queue.
+     *
+     */
+    public InMemoryQueue() {
+        queue = new ConcurrentHashMap<>();
+    }
+
+    /**
+     * Insert an attribute, keyed by its concept id (an entry with the same id is replaced), and wake any blocked reader.
+     *
+     * @param attribute the attribute to be inserted
+     */
+    public void insert(Attribute attribute) {
+        queue.put(attribute.conceptId().getValue(), attribute);
+        synchronized (this) {
+            notifyAll();
+        }
+    }
+
+    /**
+     * Read at most {@code limit} attributes, in no guaranteed order. Read everything if there are fewer than {@code limit} attributes in the queue.
+     * If the queue is empty, the method will block until the queue receives a new attribute.
+     * The attributes won't be removed from the queue until you call {@link #ack(List)} on the returned attributes.
+     *
+     * @param limit the maximum number of items to be returned.
+     * @return a list of {@link Attribute}
+     * @throws InterruptedException if the calling thread is interrupted while waiting for an attribute
+     * @see #ack(List)
+     */
+    public List<Attribute> read(int limit) throws InterruptedException {
+        // check emptiness while holding the monitor, so a notifyAll() from insert() cannot slip in between the check and wait()
+        synchronized (this) {
+            while (queue.isEmpty()) {
+                wait();
+            }
+        }
+
+        return queue.values().stream().limit(limit).collect(Collectors.toList());
+    }
+
+    /**
+     * Remove attributes from the queue.
+ * + * @param attributes the attributes which will be removed + */ + public void ack(List attributes) { + for (Attribute attr : attributes) { + queue.remove(attr.conceptId().getValue()); + } + } + +} \ No newline at end of file diff --git a/server/src/server/deduplicator/queue/RocksDbQueue.java b/server/src/server/deduplicator/queue/RocksDbQueue.java deleted file mode 100644 index d8300932e87..00000000000 --- a/server/src/server/deduplicator/queue/RocksDbQueue.java +++ /dev/null @@ -1,193 +0,0 @@ -/* - * GRAKN.AI - THE KNOWLEDGE GRAPH - * Copyright (C) 2018 Grakn Labs Ltd - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- */ - -package grakn.core.server.deduplicator.queue; - -import grakn.core.concept.ConceptId; -import grakn.core.server.keyspace.KeyspaceImpl; -import mjson.Json; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; -import org.rocksdb.WriteBatch; -import org.rocksdb.WriteOptions; - -import java.nio.file.Path; -import java.util.LinkedList; -import java.util.List; - -import static grakn.core.server.deduplicator.queue.RocksDbQueue.SerialisationUtils.deserialiseAttributeUtf8; -import static grakn.core.server.deduplicator.queue.RocksDbQueue.SerialisationUtils.serialiseAttributeUtf8; -import static grakn.core.server.deduplicator.queue.RocksDbQueue.SerialisationUtils.serialiseStringUtf8; -import static java.nio.charset.StandardCharsets.UTF_8; - -/** - * An implementation of a FIFO queue to support the attribute de-duplication in RocksDB. - * It supports three operations: insert, read, and ack. - * - * The read and ack should be used together in order to provide fault-tolerance. - * The attribute de-duplicator can read attributes from the queue and only ack after everything has been processed. - * If the de-duplicator crashes during a deduplication, it can resume operation from the last ack-ed attribute. - * - */ -public class RocksDbQueue implements AutoCloseable { - private final RocksDB queueDb; - - /** - * Instantiates the class and the queue data directory - * - * @param path where to persist the queue data - */ - public RocksDbQueue(Path path) { - try { - Options options = new Options().setCreateIfMissing(true); - queueDb = RocksDB.open(options, path.toAbsolutePath().toString()); - } - catch (RocksDBException e) { - throw new RocksDbQueueException(e); - } - } - - /** - * insert a new attribute at the end of the queue. 
- * - * @param attribute the attribute to be inserted - */ - public void insert(Attribute attribute) { - WriteOptions syncWrite = new WriteOptions().setSync(true); - try { - queueDb.put(syncWrite, serialiseStringUtf8(attribute.conceptId().getValue()), serialiseAttributeUtf8(attribute)); - syncWrite.close(); - synchronized (this) { notifyAll(); } - } - catch (RocksDBException e) { - throw new RocksDbQueueException(e); - } - } - - /** - * Read at most N attributes from the beginning of the queue. Read everything if there are less than N attributes in the queue. - * If the queue is empty, the method will block until the queue receives a new attribute. - * The attributes won't be removed from the queue until you call {@link #ack(List)} on the returned attributes. - * - * @param limit the maximum number of items to be returned. - * @return a list of {@link Attribute} - * @throws InterruptedException - * @see #ack(List) - */ - public List read(int limit) throws InterruptedException { - // blocks until the queue contains at least 1 element - while (isQueueEmpty(queueDb)) { - synchronized (this) { wait(); } - } - - List result = new LinkedList<>(); - - RocksIterator it = queueDb.newIterator(); - it.seekToFirst(); - int count = 0; - while (it.isValid() && count < limit) { - Attribute attr = deserialiseAttributeUtf8(it.value()); - result.add(attr); - it.next(); - count++; - } - it.close(); - - return result; - } - - /** - * Remove attributes from the queue. - * - * @param attributes the attributes which will be removed - */ - public void ack(List attributes) { - WriteBatch acks = new WriteBatch(); - // set to false for better performance. 
at the moment we're setting it to true as the algorithm is untested and we prefer correctness over speed - WriteOptions writeOptions = new WriteOptions().setSync(true); - for (Attribute attr: attributes) { - try { - acks.delete(serialiseStringUtf8(attr.conceptId().getValue())); - } catch (RocksDBException e) { - throw new RocksDbQueueException(e); - } - } - try { - queueDb.write(writeOptions, acks); - acks.close(); - writeOptions.close(); - } - catch (RocksDBException e) { - throw new RocksDbQueueException(e); - } - } - - /** - * Close the {@link RocksDbQueue} instance. - */ - public void close() { - queueDb.close(); - } - - /** - * Check if the queue is empty. - * - * @param queueDb the queue to be checked - * @return true if empty, false otherwise - */ - private boolean isQueueEmpty(RocksDB queueDb) { - RocksIterator it = queueDb.newIterator(); - it.seekToFirst(); - boolean isQueueEmpty = !it.isValid(); - it.close(); - return isQueueEmpty; - } - - /** - * Serialisation helpers for the {@link RocksDbQueue}. Don't add any other serialisation methods that are not related to it. 
- */ - static class SerialisationUtils { - //TODO: figure out if we could not use/depend on Json, which brings in mjson.Json - static byte[] serialiseAttributeUtf8(Attribute attribute) { - Json json = Json.object( - "attribute-keyspace", attribute.keyspace().name(), - "attribute-index", attribute.index(), - "attribute-concept-id", attribute.conceptId().getValue() - ); - return serialiseStringUtf8(json.toString()); - } - - static Attribute deserialiseAttributeUtf8(byte[] attribute) { - Json json = Json.read(deserializeStringUtf8(attribute)); - String keyspace = json.at("attribute-keyspace").asString(); - String value = json.at("attribute-index").asString(); - String conceptId = json.at("attribute-concept-id").asString(); - return Attribute.create(KeyspaceImpl.of(keyspace), value, ConceptId.of(conceptId)); - } - - static String deserializeStringUtf8(byte[] bytes) { - return new String(bytes, UTF_8); - } - - static byte[] serialiseStringUtf8(String string) { - return string.getBytes(UTF_8); - } - } -} \ No newline at end of file diff --git a/server/test/server/deduplicator/BUILD b/server/test/server/deduplicator/BUILD index aecf6dae8ca..a8017cf50d0 100644 --- a/server/test/server/deduplicator/BUILD +++ b/server/test/server/deduplicator/BUILD @@ -19,9 +19,9 @@ load("@graknlabs_build_tools//checkstyle:rules.bzl", "checkstyle_test") java_test( - name = "rocks-db-queue-test", - test_class = "grakn.core.server.deduplicator.RocksDbQueueTest", - srcs = ["RocksDbQueueTest.java"], + name = "in-memory-queue-test", + test_class = "grakn.core.server.deduplicator.InMemoryQueueTest", + srcs = ["InMemoryQueueTest.java"], deps = [ "//concept:concept", "//server:server", @@ -29,13 +29,12 @@ java_test( "//dependencies/maven/artifacts/commons-io:commons-io", "//dependencies/maven/artifacts/org/hamcrest:hamcrest-library", ], - size = "small", - flaky = True + size = "small" ) checkstyle_test( name = "checkstyle", targets = [ - ":rocks-db-queue-test" + ":in-memory-queue-test" ], ) diff 
--git a/server/test/server/deduplicator/RocksDbQueueTest.java b/server/test/server/deduplicator/InMemoryQueueTest.java similarity index 50% rename from server/test/server/deduplicator/RocksDbQueueTest.java rename to server/test/server/deduplicator/InMemoryQueueTest.java index 80ad404b910..986c2d3d80c 100644 --- a/server/test/server/deduplicator/RocksDbQueueTest.java +++ b/server/test/server/deduplicator/InMemoryQueueTest.java @@ -20,15 +20,16 @@ import grakn.core.concept.ConceptId; import grakn.core.server.deduplicator.queue.Attribute; -import grakn.core.server.deduplicator.queue.RocksDbQueue; +import grakn.core.server.deduplicator.queue.InMemoryQueue; import grakn.core.server.keyspace.KeyspaceImpl; -import org.apache.commons.io.FileUtils; import org.junit.Test; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -39,74 +40,66 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; -public class RocksDbQueueTest { +public class InMemoryQueueTest { @Test - public void shouldBeAbleToInsertNewAttributes() throws InterruptedException, IOException { - Path queuePath = Files.createTempDirectory("rocksdb-test-dir"); - try (RocksDbQueue queue = new RocksDbQueue(queuePath)) { - List attributes = Arrays.asList( - Attribute.create(KeyspaceImpl.of("k1"), "v1", ConceptId.of("c1")), - Attribute.create(KeyspaceImpl.of("k2"), "v2", ConceptId.of("c2")), - Attribute.create(KeyspaceImpl.of("k3"), "v3", ConceptId.of("c3")), - Attribute.create(KeyspaceImpl.of("k4"), "v4", ConceptId.of("c4")), - Attribute.create(KeyspaceImpl.of("k5"), "v5", ConceptId.of("c5")) - ); - - for (Attribute attr : attributes) { - queue.insert(attr); - } - List insertedAttributes = queue.read(attributes.size()); - 
assertThat(insertedAttributes, equalTo(attributes)); + public void shouldBeAbleToInsertNewAttributes() throws InterruptedException { + InMemoryQueue queue = new InMemoryQueue(); + List attributes = Arrays.asList( + Attribute.create(KeyspaceImpl.of("k1"), "v1", ConceptId.of("c1")), + Attribute.create(KeyspaceImpl.of("k2"), "v2", ConceptId.of("c2")), + Attribute.create(KeyspaceImpl.of("k3"), "v3", ConceptId.of("c3")), + Attribute.create(KeyspaceImpl.of("k4"), "v4", ConceptId.of("c4")), + Attribute.create(KeyspaceImpl.of("k5"), "v5", ConceptId.of("c5")) + ); + + for (Attribute attr : attributes) { + queue.insert(attr); } - FileUtils.deleteDirectory(queuePath.toFile()); + List insertedAttributes = queue.read(attributes.size()); + assertThat(new HashSet<>(insertedAttributes), equalTo(new HashSet<>(attributes))); } + @Test - public void readButUnackedAttributesShouldRemainInTheQueue() throws InterruptedException, IOException { - Path queuePath = Files.createTempDirectory("rocksdb-test-dir"); - try (RocksDbQueue queue = new RocksDbQueue(queuePath)) { - List attributes = Arrays.asList( - Attribute.create(KeyspaceImpl.of("k1"), "v1", ConceptId.of("c1")), - Attribute.create(KeyspaceImpl.of("k2"), "v2", ConceptId.of("c2")), - Attribute.create(KeyspaceImpl.of("k3"), "v3", ConceptId.of("c3")), - Attribute.create(KeyspaceImpl.of("k4"), "v4", ConceptId.of("c4")), - Attribute.create(KeyspaceImpl.of("k5"), "v5", ConceptId.of("c5")) - ); - - for (Attribute attr : attributes) { - queue.insert(attr); - } - List insertedAttributes = queue.read(Integer.MAX_VALUE); - assertThat(insertedAttributes, equalTo(attributes)); - List remainingAttributes = queue.read(Integer.MAX_VALUE); - assertThat(remainingAttributes, equalTo(attributes)); + public void readButUnackedAttributesShouldRemainInTheQueue() throws InterruptedException { + InMemoryQueue queue = new InMemoryQueue(); + List attributes = Arrays.asList( + Attribute.create(KeyspaceImpl.of("k1"), "v1", ConceptId.of("c1")), + 
Attribute.create(KeyspaceImpl.of("k2"), "v2", ConceptId.of("c2")), + Attribute.create(KeyspaceImpl.of("k3"), "v3", ConceptId.of("c3")), + Attribute.create(KeyspaceImpl.of("k4"), "v4", ConceptId.of("c4")), + Attribute.create(KeyspaceImpl.of("k5"), "v5", ConceptId.of("c5")) + ); + + for (Attribute attr : attributes) { + queue.insert(attr); } - FileUtils.deleteDirectory(queuePath.toFile()); + List insertedAttributes = queue.read(Integer.MAX_VALUE); + assertThat(new HashSet<>(insertedAttributes), equalTo(new HashSet<>(attributes))); + List remainingAttributes = queue.read(Integer.MAX_VALUE); + assertThat(new HashSet<>(remainingAttributes), equalTo(new HashSet<>(attributes))); } @Test - public void shouldBeAbleToAckOnlySomeOfTheReadAttributes() throws InterruptedException, IOException { - Path queuePath = Files.createTempDirectory("rocksdb-test-dir"); - try (RocksDbQueue queue = new RocksDbQueue(queuePath)) { - List attributes1 = Arrays.asList( - Attribute.create(KeyspaceImpl.of("k1"), "v1", ConceptId.of("c1")), - Attribute.create(KeyspaceImpl.of("k2"), "v2", ConceptId.of("c2")) - ); - List attributes2 = Arrays.asList( - Attribute.create(KeyspaceImpl.of("k3"), "v3", ConceptId.of("c3")), - Attribute.create(KeyspaceImpl.of("k4"), "v4", ConceptId.of("c4")), - Attribute.create(KeyspaceImpl.of("k5"), "v5", ConceptId.of("c5")) - ); - - Stream.concat(attributes1.stream(), attributes2.stream()).forEach(attr -> queue.insert(attr)); - - List insertedAttributes1 = queue.read(attributes1.size()); - queue.ack(insertedAttributes1); - List insertedAttributes2 = queue.read(Integer.MAX_VALUE); - assertThat(insertedAttributes2, equalTo(attributes2)); - } - FileUtils.deleteDirectory(queuePath.toFile()); + public void shouldBeAbleToAckOnlySomeOfTheReadAttributes() throws InterruptedException { + InMemoryQueue queue = new InMemoryQueue(); + List attributes = new ArrayList<>(Arrays.asList( + Attribute.create(KeyspaceImpl.of("k1"), "v1", ConceptId.of("c1")), + 
Attribute.create(KeyspaceImpl.of("k2"), "v2", ConceptId.of("c2")), + Attribute.create(KeyspaceImpl.of("k3"), "v3", ConceptId.of("c3")), + Attribute.create(KeyspaceImpl.of("k4"), "v4", ConceptId.of("c4")), + Attribute.create(KeyspaceImpl.of("k5"), "v5", ConceptId.of("c5")) + )); + + attributes.forEach(queue::insert); + List insertedAttributes1 = queue.read(2); + queue.ack(insertedAttributes1); + + + List insertedAttributes2 = queue.read(Integer.MAX_VALUE); + attributes.removeAll(insertedAttributes1); + assertThat(new HashSet<>(insertedAttributes2), equalTo(new HashSet<>(attributes))); } /** @@ -119,9 +112,8 @@ public void shouldBeAbleToAckOnlySomeOfTheReadAttributes() throws InterruptedExc * @throws TimeoutException */ @Test(expected = TimeoutException.class) - public void theReadMethodMustBlockIfTheQueueIsEmpty() throws IOException, ExecutionException, InterruptedException, TimeoutException { - Path queuePath = Files.createTempDirectory("rocksdb-test-dir"); - RocksDbQueue queue = new RocksDbQueue(queuePath); + public void theReadMethodMustBlockIfTheQueueIsEmpty() throws ExecutionException, InterruptedException, TimeoutException { + InMemoryQueue queue = new InMemoryQueue(); // perform a read() on the currently empty queue asynchronously. CompletableFuture> readMustBlock = CompletableFuture.supplyAsync(() -> { @@ -137,7 +129,6 @@ public void theReadMethodMustBlockIfTheQueueIsEmpty() throws IOException, Execut } /** - * * the read() method implements the 'guarded block' pattern using wait() and notifyAll(), and we want to * test if it properly returns after blocking only once the queue is non-empty. 
* @@ -147,9 +138,8 @@ public void theReadMethodMustBlockIfTheQueueIsEmpty() throws IOException, Execut * @throws TimeoutException */ @Test - public void theReadMethodMustReturnOnceTheQueueIsNonEmpty() throws IOException, InterruptedException, ExecutionException, TimeoutException { - Path queuePath = Files.createTempDirectory("rocksdb-test-dir"); - RocksDbQueue queue = new RocksDbQueue(queuePath); + public void theReadMethodMustReturnOnceTheQueueIsNonEmpty() throws InterruptedException, ExecutionException, TimeoutException { + InMemoryQueue queue = new InMemoryQueue(); Attribute input = Attribute.create(KeyspaceImpl.of("k1"), "v1", ConceptId.of("c1")); List expectedOutput = Arrays.asList(input); diff --git a/test-end-to-end/deduplicator/AttributeDeduplicatorE2E.java b/test-end-to-end/deduplicator/AttributeDeduplicatorE2E.java index 943441435bd..04cdfcfbbf7 100644 --- a/test-end-to-end/deduplicator/AttributeDeduplicatorE2E.java +++ b/test-end-to-end/deduplicator/AttributeDeduplicatorE2E.java @@ -26,10 +26,6 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.rocksdb.Options; -import org.rocksdb.RocksDB; -import org.rocksdb.RocksDBException; -import org.rocksdb.RocksIterator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.zeroturnaround.exec.ProcessExecutor; @@ -84,7 +80,7 @@ public static void cleanup_cleanupDistribution() throws IOException, Interrupted } @Test - public void shouldDeduplicateAttributes() throws RocksDBException, InterruptedException, ExecutionException { + public void shouldDeduplicateAttributes() throws InterruptedException, ExecutionException { int numOfUniqueNames = 10; int numOfDuplicatesPerName = 673; ExecutorService executorServiceForParallelInsertion = Executors.newFixedThreadPool(8); @@ -99,9 +95,8 @@ public void shouldDeduplicateAttributes() throws RocksDBException, InterruptedEx // wait until queue is empty LOG.info("names and duplicates have been inserted. 
waiting for the deduplication to finish..."); - long timeoutMs = 60000; - long pollFrequencyMs = 2000; - waitUntilAllAttributesDeduplicated(timeoutMs, pollFrequencyMs); + long timeoutMs = 10000; + waitUntilAllAttributesDeduplicated(timeoutMs); LOG.info("deduplication has finished."); // verify deduplicated attributes @@ -152,39 +147,10 @@ private static void insertNameShuffled(GraknClient.Session session, int nameCoun CompletableFuture.allOf(asyncInsertions.toArray(new CompletableFuture[] {})).get(); } - private void waitUntilAllAttributesDeduplicated(long timeoutMs, long pollFrequencyMs) throws RocksDBException, InterruptedException { - long startMs = System.currentTimeMillis(); - int queueSize = countRemainingItemsInQueue(queuePath); - while (queueSize > 0) { - LOG.info("deduplication in progress. there are " + queueSize + " attributes left to process."); - Thread.sleep(pollFrequencyMs); - long elapsedMs = System.currentTimeMillis() - startMs; - if (elapsedMs > timeoutMs) { - String message = "waitUntilAllAttributesDeduplicated - Timeout of '" + timeoutMs + "ms has been exceeded. 
There are '" + queueSize + "' items remaining in the queue."; - throw new RuntimeException(message); - } - queueSize = countRemainingItemsInQueue(queuePath); - } + private void waitUntilAllAttributesDeduplicated(long timeoutMs) throws InterruptedException { + Thread.sleep(timeoutMs); } - /** - * Count the number of elements in the queue - * - * @param queuePath the queue to be checked - * @return the number of elements in the queue - */ - private int countRemainingItemsInQueue(Path queuePath) throws RocksDBException { - RocksDB queue = RocksDB.openReadOnly(new Options(), queuePath.toAbsolutePath().toString()); - RocksIterator it = queue.newIterator(); - it.seekToFirst(); - int count = 0; - while (it.isValid()) { - it.next(); - count++; - } - queue.close(); - return count; - } private int countTotalNames(GraknClient.Session session) { try (GraknClient.Transaction tx = session.transaction().read()) { diff --git a/test-end-to-end/deduplicator/BUILD b/test-end-to-end/deduplicator/BUILD index 92f2679738f..f8624e5b87b 100644 --- a/test-end-to-end/deduplicator/BUILD +++ b/test-end-to-end/deduplicator/BUILD @@ -16,7 +16,6 @@ java_test( # External dependencies from Maven "//dependencies/maven/artifacts/commons-io:commons-io", - "//dependencies/maven/artifacts/org/rocksdb:rocksdbjni", "//dependencies/maven/artifacts/org/zeroturnaround:zt-exec", "//dependencies/maven/artifacts/org/hamcrest:hamcrest-library", "//dependencies/maven/artifacts/org/slf4j:slf4j-api" diff --git a/test-integration/rule/GraknTestServer.java b/test-integration/rule/GraknTestServer.java index 2f59f6665c2..90801c417d0 100644 --- a/test-integration/rule/GraknTestServer.java +++ b/test-integration/rule/GraknTestServer.java @@ -207,7 +207,7 @@ private Server createServer() { // tx-factory sessionFactory = new SessionFactory(lockManager, janusGraphFactory, keyspaceStore, serverConfig); - AttributeDeduplicatorDaemon attributeDeduplicatorDaemon = new AttributeDeduplicatorDaemon(serverConfig, sessionFactory); + 
AttributeDeduplicatorDaemon attributeDeduplicatorDaemon = new AttributeDeduplicatorDaemon(sessionFactory); OpenRequest requestOpener = new ServerOpenRequest(sessionFactory); io.grpc.Server serverRPC = ServerBuilder.forPort(grpcPort)