Skip to content

Commit 1c0894a

Browse files
committed
initial code drop from hnsw-poc
1 parent 8742099 commit 1c0894a

35 files changed

+4288
-3
lines changed

fdb-extensions/fdb-extensions.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ dependencies {
2626
}
2727
api(libs.fdbJava)
2828
implementation(libs.guava)
29+
implementation(libs.half4j)
2930
implementation(libs.slf4j.api)
3031
compileOnly(libs.jsr305)
3132

fdb-extensions/src/main/java/com/apple/foundationdb/async/MoreAsyncUtil.java

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,14 @@
2323
import com.apple.foundationdb.annotation.API;
2424
import com.apple.foundationdb.util.LoggableException;
2525
import com.google.common.base.Suppliers;
26+
import com.google.common.collect.Lists;
2627
import com.google.common.util.concurrent.ThreadFactoryBuilder;
2728

2829
import javax.annotation.Nonnull;
2930
import javax.annotation.Nullable;
3031
import java.util.ArrayDeque;
3132
import java.util.ArrayList;
33+
import java.util.Arrays;
3234
import java.util.Collections;
3335
import java.util.Iterator;
3436
import java.util.List;
@@ -42,9 +44,13 @@
4244
import java.util.concurrent.ScheduledThreadPoolExecutor;
4345
import java.util.concurrent.ThreadFactory;
4446
import java.util.concurrent.TimeUnit;
47+
import java.util.concurrent.atomic.AtomicInteger;
48+
import java.util.concurrent.atomic.AtomicReference;
4549
import java.util.function.BiConsumer;
4650
import java.util.function.BiFunction;
4751
import java.util.function.Function;
52+
import java.util.function.IntPredicate;
53+
import java.util.function.IntUnaryOperator;
4854
import java.util.function.Predicate;
4955
import java.util.function.Supplier;
5056

@@ -1051,6 +1057,64 @@ public static CompletableFuture<Void> swallowException(@Nonnull CompletableFutur
10511057
return result;
10521058
}
10531059

1060+
@Nonnull
1061+
public static <U> CompletableFuture<U> forLoop(final int startI, @Nullable final U startU,
1062+
@Nonnull final IntPredicate conditionPredicate,
1063+
@Nonnull final IntUnaryOperator stepFunction,
1064+
@Nonnull final BiFunction<Integer, U, CompletableFuture<U>> body,
1065+
@Nonnull final Executor executor) {
1066+
final AtomicInteger loopVariableAtomic = new AtomicInteger(startI);
1067+
final AtomicReference<U> lastResultAtomic = new AtomicReference<>(startU);
1068+
return whileTrue(() -> {
1069+
final int loopVariable = loopVariableAtomic.get();
1070+
if (!conditionPredicate.test(loopVariable)) {
1071+
return AsyncUtil.READY_FALSE;
1072+
}
1073+
return body.apply(loopVariable, lastResultAtomic.get())
1074+
.thenApply(result -> {
1075+
loopVariableAtomic.set(stepFunction.applyAsInt(loopVariable));
1076+
lastResultAtomic.set(result);
1077+
return true;
1078+
});
1079+
}, executor).thenApply(ignored -> lastResultAtomic.get());
1080+
}
1081+
1082+
@SuppressWarnings("unchecked")
1083+
public static <T, U> CompletableFuture<List<U>> forEach(@Nonnull final Iterable<T> items,
1084+
@Nonnull final Function<T, CompletableFuture<U>> body,
1085+
final int parallelism,
1086+
@Nonnull final Executor executor) {
1087+
// this deque is only modified by once upon creation
1088+
final ArrayDeque<T> toBeProcessed = new ArrayDeque<>();
1089+
for (final T item : items) {
1090+
toBeProcessed.addLast(item);
1091+
}
1092+
1093+
final List<CompletableFuture<Void>> working = Lists.newArrayList();
1094+
final AtomicInteger indexAtomic = new AtomicInteger(0);
1095+
final Object[] resultArray = new Object[toBeProcessed.size()];
1096+
1097+
return whileTrue(() -> {
1098+
working.removeIf(CompletableFuture::isDone);
1099+
1100+
while (working.size() <= parallelism) {
1101+
final T currentItem = toBeProcessed.pollFirst();
1102+
if (currentItem == null) {
1103+
break;
1104+
}
1105+
1106+
final int index = indexAtomic.getAndIncrement();
1107+
working.add(body.apply(currentItem)
1108+
.thenAccept(result -> resultArray[index] = result));
1109+
}
1110+
1111+
if (working.isEmpty()) {
1112+
return AsyncUtil.READY_FALSE;
1113+
}
1114+
return whenAny(working).thenApply(ignored -> true);
1115+
}, executor).thenApply(ignored -> Arrays.asList((U[])resultArray));
1116+
}
1117+
10541118
/**
10551119
* A {@code Boolean} function that is always true.
10561120
* @param <T> the type of the (ignored) argument to the function
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*
2+
* AbstractNode.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2023 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.async.hnsw;
22+
23+
import com.apple.foundationdb.tuple.Tuple;
24+
import com.google.common.collect.ImmutableList;
25+
26+
import javax.annotation.Nonnull;
27+
import java.util.List;
28+
29+
/**
30+
* TODO.
31+
* @param <N> node type class.
32+
*/
33+
abstract class AbstractNode<N extends NodeReference> implements Node<N> {
34+
@Nonnull
35+
private final Tuple primaryKey;
36+
37+
@Nonnull
38+
private final List<N> neighbors;
39+
40+
protected AbstractNode(@Nonnull final Tuple primaryKey,
41+
@Nonnull final List<N> neighbors) {
42+
this.primaryKey = primaryKey;
43+
this.neighbors = ImmutableList.copyOf(neighbors);
44+
}
45+
46+
@Nonnull
47+
@Override
48+
public Tuple getPrimaryKey() {
49+
return primaryKey;
50+
}
51+
52+
@Nonnull
53+
@Override
54+
public List<N> getNeighbors() {
55+
return neighbors;
56+
}
57+
58+
@Nonnull
59+
@Override
60+
public N getNeighbor(final int index) {
61+
return neighbors.get(index);
62+
}
63+
}
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
/*
2+
* AbstractStorageAdapter.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2023 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.async.hnsw;
22+
23+
import com.apple.foundationdb.ReadTransaction;
24+
import com.apple.foundationdb.Transaction;
25+
import com.apple.foundationdb.subspace.Subspace;
26+
import com.apple.foundationdb.tuple.Tuple;
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import javax.annotation.Nonnull;
31+
import javax.annotation.Nullable;
32+
import java.util.concurrent.CompletableFuture;
33+
34+
/**
35+
* Implementations and attributes common to all concrete implementations of {@link StorageAdapter}.
36+
*/
37+
abstract class AbstractStorageAdapter<N extends NodeReference> implements StorageAdapter<N> {
38+
@Nonnull
39+
private static final Logger logger = LoggerFactory.getLogger(AbstractStorageAdapter.class);
40+
41+
@Nonnull
42+
private final HNSW.Config config;
43+
@Nonnull
44+
private final NodeFactory<N> nodeFactory;
45+
@Nonnull
46+
private final Subspace subspace;
47+
@Nonnull
48+
private final OnWriteListener onWriteListener;
49+
@Nonnull
50+
private final OnReadListener onReadListener;
51+
52+
private final Subspace dataSubspace;
53+
54+
protected AbstractStorageAdapter(@Nonnull final HNSW.Config config, @Nonnull final NodeFactory<N> nodeFactory,
55+
@Nonnull final Subspace subspace,
56+
@Nonnull final OnWriteListener onWriteListener,
57+
@Nonnull final OnReadListener onReadListener) {
58+
this.config = config;
59+
this.nodeFactory = nodeFactory;
60+
this.subspace = subspace;
61+
this.onWriteListener = onWriteListener;
62+
this.onReadListener = onReadListener;
63+
this.dataSubspace = subspace.subspace(Tuple.from(SUBSPACE_PREFIX_DATA));
64+
}
65+
66+
@Override
67+
@Nonnull
68+
public HNSW.Config getConfig() {
69+
return config;
70+
}
71+
72+
@Nonnull
73+
@Override
74+
public NodeFactory<N> getNodeFactory() {
75+
return nodeFactory;
76+
}
77+
78+
@Nonnull
79+
@Override
80+
public NodeKind getNodeKind() {
81+
return getNodeFactory().getNodeKind();
82+
}
83+
84+
@Override
85+
@Nonnull
86+
public Subspace getSubspace() {
87+
return subspace;
88+
}
89+
90+
@Override
91+
@Nonnull
92+
public Subspace getDataSubspace() {
93+
return dataSubspace;
94+
}
95+
96+
@Override
97+
@Nonnull
98+
public OnWriteListener getOnWriteListener() {
99+
return onWriteListener;
100+
}
101+
102+
@Override
103+
@Nonnull
104+
public OnReadListener getOnReadListener() {
105+
return onReadListener;
106+
}
107+
108+
@Nonnull
109+
@Override
110+
public CompletableFuture<Node<N>> fetchNode(@Nonnull final ReadTransaction readTransaction,
111+
int layer, @Nonnull Tuple primaryKey) {
112+
return fetchNodeInternal(readTransaction, layer, primaryKey).thenApply(this::checkNode);
113+
}
114+
115+
@Nonnull
116+
protected abstract CompletableFuture<Node<N>> fetchNodeInternal(@Nonnull ReadTransaction readTransaction,
117+
int layer, @Nonnull Tuple primaryKey);
118+
119+
/**
120+
* Method to perform basic invariant check(s) on a newly-fetched node.
121+
*
122+
* @param node the node to check
123+
* was passed in
124+
*
125+
* @return the node that was passed in
126+
*/
127+
@Nullable
128+
private Node<N> checkNode(@Nullable final Node<N> node) {
129+
return node;
130+
}
131+
132+
@Override
133+
public void writeNode(@Nonnull Transaction transaction, @Nonnull Node<N> node, int layer,
134+
@Nonnull NeighborsChangeSet<N> changeSet) {
135+
writeNodeInternal(transaction, node, layer, changeSet);
136+
if (logger.isDebugEnabled()) {
137+
logger.debug("written node with key={} at layer={}", node.getPrimaryKey(), layer);
138+
}
139+
}
140+
141+
protected abstract void writeNodeInternal(@Nonnull Transaction transaction, @Nonnull Node<N> node, int layer,
142+
@Nonnull NeighborsChangeSet<N> changeSet);
143+
144+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* InliningNode.java
3+
*
4+
* This source file is part of the FoundationDB open source project
5+
*
6+
* Copyright 2015-2023 Apple Inc. and the FoundationDB project authors
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
package com.apple.foundationdb.async.hnsw;
22+
23+
import com.apple.foundationdb.Transaction;
24+
import com.apple.foundationdb.tuple.Tuple;
25+
import com.google.common.collect.ImmutableList;
26+
27+
import javax.annotation.Nonnull;
28+
import javax.annotation.Nullable;
29+
import java.util.List;
30+
import java.util.function.Predicate;
31+
32+
/**
33+
* TODO.
34+
*/
35+
class BaseNeighborsChangeSet<N extends NodeReference> implements NeighborsChangeSet<N> {
36+
@Nonnull
37+
private final List<N> neighbors;
38+
39+
public BaseNeighborsChangeSet(@Nonnull final List<N> neighbors) {
40+
this.neighbors = ImmutableList.copyOf(neighbors);
41+
}
42+
43+
@Nullable
44+
@Override
45+
public BaseNeighborsChangeSet<N> getParent() {
46+
return null;
47+
}
48+
49+
@Nonnull
50+
@Override
51+
public List<N> merge() {
52+
return neighbors;
53+
}
54+
55+
@Override
56+
public void writeDelta(@Nonnull final InliningStorageAdapter storageAdapter, @Nonnull final Transaction transaction,
57+
final int layer, @Nonnull final Node<N> node,
58+
@Nonnull final Predicate<Tuple> primaryKeyPredicate) {
59+
// nothing to be written
60+
}
61+
}

0 commit comments

Comments
 (0)