diff --git a/bash/blockchair.sh b/bash/blockchair.sh new file mode 100755 index 0000000..eb2a6c3 --- /dev/null +++ b/bash/blockchair.sh @@ -0,0 +1,56 @@ +#!/bin/bash -e + +if [[ "$2" == "" ]]; then + echo "$(basename $0) DIR NTHREADS [OUTPUT]" 1>&2 + echo "Reads files in DIR and processes them using NTHREADS parallel sorts." 1>&2 + echo "Files are processed as input files unless OUTPUT is specified." 1>&2 + echo "FILES MUST END WITH A NEWLINE. Fix them with \"sed -i -e '\$a\\' *\"." 1>&2 + exit 1 +fi + +DIR=$1 +NTHREADS=$2 +OUTPUT=$3 + +function file_ends_with_newline() { + [[ $(tail -c1 "$1" | wc -l) -gt 0 ]] +} + +FILES=$(mktemp) +find $DIR -type f >$FILES + +# Check that all files end with a newline + +while read FILE; do + if ! file_ends_with_newline $FILE; then + echo "File $FILE does not end with a newline" 1>&2 + exit 1 + fi +done <$FILES + +NFILES=$(cat $FILES | wc -l) + +# To avoid empty splits, there must be at least as many threads as files + +if (( NFILES < NTHREADS )); then + NTHREADS=$NFILES + echo "Not enough files: number of threads set to $NFILES" 1>&2 +fi + +SPLITBASE=$(mktemp) +split -n l/$NTHREADS $FILES $SPLITBASE +SPLITS=$(for file in ${SPLITBASE}?*; do echo $file; done) + +for SPLIT in $SPLITS; do + mkfifo $SPLIT.pipe + if [[ "$OUTPUT" != "" ]]; then + (tail -q -n+2 $(cat $SPLIT) | cut -f2,7,10 | awk '{ if ($3 == 0) print $1 "\t" $2 }' | LC_ALL=C sort -S2G >$SPLIT.pipe) & + else + (tail -q -n+2 $(cat $SPLIT) | cut -f7,13 | awk '{ print $2 "\t" $1 }' | LC_ALL=C sort -S2G >$SPLIT.pipe) & + fi +done + +LC_ALL=C sort -S2G -m $(for SPLIT in $SPLITS; do echo $SPLIT.pipe; done) + +rm -f $FILES +rm -f ${SPLITBASE}* diff --git a/src/it/unimi/dsi/webgraph/EFGraph.java b/src/it/unimi/dsi/webgraph/EFGraph.java index ca85a89..1aa9801 100644 --- a/src/it/unimi/dsi/webgraph/EFGraph.java +++ b/src/it/unimi/dsi/webgraph/EFGraph.java @@ -637,14 +637,14 @@ public static EFGraph loadOffline(final CharSequence basename) throws IOExceptio return EFGraph.loadMapped(basename, null); } - /** An iterator returning the offsets. */ - private final static class OffsetsLongIterator implements LongIterator { + /** An iterator returning offsets by reading δ-encoded gaps. */ + public final static class OffsetsLongIterator implements LongIterator { private final InputBitStream offsetIbs; private final long n; private long offset; private long i; - private OffsetsLongIterator(final InputBitStream offsetIbs, final long n) { + public OffsetsLongIterator(final InputBitStream offsetIbs, final long n) { this.offsetIbs = offsetIbs; this.n = n; } diff --git a/src/it/unimi/dsi/webgraph/ScatteredArcsASCIIGraph.java b/src/it/unimi/dsi/webgraph/ScatteredArcsASCIIGraph.java index 1208360..55b2427 100644 --- a/src/it/unimi/dsi/webgraph/ScatteredArcsASCIIGraph.java +++ b/src/it/unimi/dsi/webgraph/ScatteredArcsASCIIGraph.java @@ -42,7 +42,6 @@ import it.unimi.dsi.fastutil.BigArrays; import it.unimi.dsi.fastutil.Hash; -import it.unimi.dsi.fastutil.booleans.BooleanBigArrays; import it.unimi.dsi.fastutil.bytes.ByteArrays; import it.unimi.dsi.fastutil.ints.IntBigArrays; import it.unimi.dsi.fastutil.io.BinIO; @@ -145,61 +144,60 @@ public class ScatteredArcsASCIIGraph extends ImmutableSequentialGraph { /** The list of identifiers in order of appearance. */ public long[] ids; - private static final class Long2IntOpenHashBigMap implements java.io.Serializable, Cloneable, Hash { - public static final long serialVersionUID = 0L; - + public static class Id2NodeMap implements Hash { /** The big array of keys. */ - public transient long[][] key; + protected long[][] key; /** The big array of values. */ - public transient int[][] value; + protected int[][] value; + + /** Whether the zero key is present (the value is stored in {@link #zeroValue). */ + protected boolean containsZeroKey; - /** The big array telling whether a position is used. */ - protected transient boolean[][] used; + /** The value associated with the zero key, if {@link #containsZeroKey}. */ + protected int zeroValue; /** The acceptable load factor. */ protected final float f; /** The current table size (always a power of 2). */ - protected transient long n; + protected long n; /** Threshold after which we rehash. It must be the table size times {@link #f}. */ - protected transient long maxFill; + protected long maxFill; /** The mask for wrapping a position counter. */ - protected transient long mask; + protected long mask; /** The mask for wrapping a segment counter. */ - protected transient int segmentMask; + protected int segmentMask; /** The mask for wrapping a base counter. */ - protected transient int baseMask; + protected int baseMask; /** Number of entries in the set. */ - protected long size; + protected int size; - /** Initialises the mask values. */ private void initMasks() { mask = n - 1; - /* - * Note that either we have more than one segment, and in this case all segments are - * BigArrays.SEGMENT_SIZE long, or we have exactly one segment whose length is a power of - * two. - */ - segmentMask = key[0].length - 1; baseMask = key.length - 1; + /* Note that either we have more than one segment, and in this case all segments + * are BigArrays.SEGMENT_SIZE long, or we have exactly one segment whose length + * is a power of two. */ + segmentMask = key[0].length - 1; } /** - * Creates a new hash big set. + * Creates a new map based on a hash table. * - *

The actual table size will be the least power of two greater than + *

+ * The actual table size will be the least power of two greater than * expected/f. * - * @param expected the expected number of elements in the set. + * @param expected the expected number of elements in the map. * @param f the load factor. */ - public Long2IntOpenHashBigMap(final long expected, final float f) { + public Id2NodeMap(final long expected, final float f) { if (f <= 0 || f > 1) throw new IllegalArgumentException("Load factor must be greater than 0 and smaller than or equal to 1"); if (n < 0) throw new IllegalArgumentException("The expected number of elements must be nonnegative"); this.f = f; @@ -207,7 +205,6 @@ public Long2IntOpenHashBigMap(final long expected, final float f) { maxFill = maxFill(n, f); key = LongBigArrays.newBigArray(n); value = IntBigArrays.newBigArray(n); - used = BooleanBigArrays.newBigArray(n); initMasks(); } @@ -216,82 +213,69 @@ public Long2IntOpenHashBigMap(final long expected, final float f) { * and {@link Hash#DEFAULT_LOAD_FACTOR} as load factor. */ - public Long2IntOpenHashBigMap() { + public Id2NodeMap() { this(DEFAULT_INITIAL_SIZE, DEFAULT_LOAD_FACTOR); } - public int put(final long k, final int v) { - final long h = it.unimi.dsi.fastutil.HashCommon.murmurHash3(k); - - // The starting point. - int displ = (int)(h & segmentMask); - int base = (int)((h & mask) >>> BigArrays.SEGMENT_SHIFT); - - // There's always an unused entry. - while (used[base][displ]) { - if (k == key[base][displ]) { - final int oldValue = value[base][displ]; - value[base][displ] = v; - return oldValue; - } - base = (base + ((displ = (displ + 1) & segmentMask) == 0 ? 1 : 0)) & baseMask; - } + /** + * Returns the node associated with a given identifier, assigning a new one if necessary. + * + * @param id an identifier. + * @return the associated node. + */ + public int getNode(final long id) { + if (id == 0) { + if (containsZeroKey) return zeroValue; + zeroValue = size; + containsZeroKey = true; + } else { - used[base][displ] = true; - key[base][displ] = k; - value[base][displ] = v; + final long h = it.unimi.dsi.fastutil.HashCommon.mix(id); - if (++size >= maxFill) rehash(2 * n); - return -1; - } - - public int get(final long k) { - final long h = it.unimi.dsi.fastutil.HashCommon.murmurHash3(k); + // The starting point. + int displ = (int)(h & segmentMask); + int base = (int)((h & mask) >>> BigArrays.SEGMENT_SHIFT); - // The starting point. - int displ = (int)(h & segmentMask); - int base = (int)((h & mask) >>> BigArrays.SEGMENT_SHIFT); + // There's always an unused entry. + while (key[base][displ] != 0) { + if (id == key[base][displ]) return value[base][displ]; + base = (base + ((displ = (displ + 1) & segmentMask) == 0 ? 1 : 0)) & baseMask; + } - // There's always an unused entry. - while (used[base][displ]) { - if (k == key[base][displ]) return value[base][displ]; - base = (base + ((displ = (displ + 1) & segmentMask) == 0 ? 1 : 0)) & baseMask; + key[base][displ] = id; + value[base][displ] = size; } - return -1; + if (++size >= maxFill) rehash(2 * n); + return size - 1; } protected void rehash(final long newN) { - final boolean used[][] = this.used; final long key[][] = this.key; final int[][] value = this.value; - final boolean newUsed[][] = BooleanBigArrays.newBigArray(newN); final long newKey[][] = LongBigArrays.newBigArray(newN); final int newValue[][] = IntBigArrays.newBigArray(newN); final long newMask = newN - 1; - final int newSegmentMask = newKey[0].length - 1; final int newBaseMask = newKey.length - 1; + final int newSegmentMask = newKey[0].length - 1; + final int realSize = containsZeroKey ? size - 1 : size; int base = 0, displ = 0; - long h; - long k; - - for (long i = size; i-- != 0;) { - while (!used[base][displ]) + for (int i = realSize; i-- != 0;) { + while (key[base][displ] == 0) base = (base + ((displ = (displ + 1) & segmentMask) == 0 ? 1 : 0)); - k = key[base][displ]; - h = it.unimi.dsi.fastutil.HashCommon.murmurHash3(k); + final long k = key[base][displ]; + final long h = it.unimi.dsi.fastutil.HashCommon.mix(k); // The starting point. int d = (int)(h & newSegmentMask); int b = (int)((h & newMask) >>> BigArrays.SEGMENT_SHIFT); - while (newUsed[b][d]) + while (newKey[b][d] != 0) b = (b + ((d = (d + 1) & newSegmentMask) == 0 ? 1 : 0)) & newBaseMask; - newUsed[b][d] = true; newKey[b][d] = k; newValue[b][d] = value[base][displ]; @@ -301,20 +285,56 @@ protected void rehash(final long newN) { this.n = newN; this.key = newKey; this.value = newValue; - this.used = newUsed; initMasks(); maxFill = maxFill(n, f); } - public void compact() { + /** + * Returns the id list in order of appearance as an array. + * + *

+ * The map is not usable after this call. + * + * @param tempDir a temporary directory for storing keys and values. + * @return the id list in order of appearance. + */ + public long[] getIds(final File tempDir) throws IOException { + // Here we assume that the map is a minimal perfect hash + final int realSize = containsZeroKey ? size - 1 : size; int base = 0, displ = 0, b = 0, d = 0; - for(long i = size; i-- != 0;) { - while (! used[base][displ]) base = (base + ((displ = (displ + 1) & segmentMask) == 0 ? 1 : 0)) & baseMask; + for (int i = realSize; i-- != 0;) { + while (key[base][displ] == 0) base = (base + ((displ = (displ + 1) & segmentMask) == 0 ? 1 : 0)) & baseMask; key[b][d] = key[base][displ]; value[b][d] = value[base][displ]; base = (base + ((displ = (displ + 1) & segmentMask) == 0 ? 1 : 0)) & baseMask; b = (b + ((d = (d + 1) & segmentMask) == 0 ? 1 : 0)) & baseMask; } + + if (containsZeroKey) { + key[b][d] = 0; + value[b][d] = zeroValue; + } + + // The following weird code minimizes memory usage + final File keyFile = File.createTempFile(ScatteredArcsASCIIGraph.class.getSimpleName(), "keys", tempDir); + keyFile.deleteOnExit(); + final File valueFile = File.createTempFile(ScatteredArcsASCIIGraph.class.getSimpleName(), "values", tempDir); + valueFile.deleteOnExit(); + + BinIO.storeLongs(key, 0, size(), keyFile); + BinIO.storeInts(value, 0, size(), valueFile); + + key = null; + value = null; + + final long[][] key = BinIO.loadLongsBig(keyFile); + keyFile.delete(); + final int[][] value = BinIO.loadIntsBig(valueFile); + valueFile.delete(); + + final long[] result = new long[size]; + for (int i = (int)size(); i-- != 0;) result[BigArrays.get(value, i)] = BigArrays.get(key, i); + return result; } public long size() { @@ -465,7 +485,7 @@ public ScatteredArcsASCIIGraph(final InputStream is, final Object2LongFunction function, Charset charset, final int n, final boolean symmetrize, final boolean noLoops, final int batchSize, final File tempDir, final ProgressLogger pl) throws IOException { @SuppressWarnings("resource") final FastBufferedInputStream fbis = new FastBufferedInputStream(is); - Long2IntOpenHashBigMap map = new Long2IntOpenHashBigMap(); + final Id2NodeMap map = new Id2NodeMap(); int numNodes = -1; if (charset == null) charset = Charset.forName("ISO-8859-1"); @@ -523,8 +543,7 @@ public ScatteredArcsASCIIGraph(final InputStream is, final Object2LongFunction " + s); } @@ -566,8 +585,7 @@ public ScatteredArcsASCIIGraph(final InputStream is, final Object2LongFunction " + t); } @@ -624,34 +642,7 @@ public ScatteredArcsASCIIGraph(final InputStream is, final Object2LongFunctionnull. */ public ScatteredArcsASCIIGraph(final Iterator arcs, final boolean symmetrize, final boolean noLoops, final int batchSize, final File tempDir, final ProgressLogger pl) throws IOException { - Long2IntOpenHashBigMap map = new Long2IntOpenHashBigMap(); + final Id2NodeMap map = new Id2NodeMap(); int numNodes = -1; @@ -703,11 +694,9 @@ public ScatteredArcsASCIIGraph(final Iterator arcs, final boolean symmet while(arcs.hasNext()) { final long[] arc = arcs.next(); final long sl = arc[0]; - int s = map.get(sl); - if (s == -1) map.put(sl, s = (int)map.size()); + final int s = map.getNode(sl); final long tl = arc[1]; - int t = map.get(tl); - if (t == -1) map.put(tl, t = (int)map.size()); + final int t = map.getNode(tl); if (s != t || ! noLoops) { source[j] = s; @@ -742,32 +731,7 @@ public ScatteredArcsASCIIGraph(final Iterator arcs, final boolean symmet source = null; target = null; - map.compact(); - - final File keyFile = File.createTempFile(ScatteredArcsASCIIGraph.class.getSimpleName(), "keys", tempDir); - keyFile.deleteOnExit(); - final File valueFile = File.createTempFile(ScatteredArcsASCIIGraph.class.getSimpleName(), "values", tempDir); - valueFile.deleteOnExit(); - - BinIO.storeLongs(map.key, 0, map.size(), keyFile); - BinIO.storeInts(map.value, 0, map.size(), valueFile); - - map = null; - - long[][] key = BinIO.loadLongsBig(keyFile); - keyFile.delete(); - int[][] value = BinIO.loadIntsBig(valueFile); - valueFile.delete(); - - ids = new long[numNodes]; - - final long[] result = new long[numNodes]; - for(int i = numNodes; i--!= 0;) result[BigArrays.get(value, i)] = BigArrays.get(key, i); - ids = result; - - key = null; - value = null; - + ids = map.getIds(tempDir); batchGraph = new Transform.BatchGraph(numNodes, pairs, batches); } diff --git a/src/it/unimi/dsi/webgraph/Transform.java b/src/it/unimi/dsi/webgraph/Transform.java index 900c576..f9bdbdc 100644 --- a/src/it/unimi/dsi/webgraph/Transform.java +++ b/src/it/unimi/dsi/webgraph/Transform.java @@ -17,12 +17,7 @@ package it.unimi.dsi.webgraph; -import java.io.BufferedOutputStream; -import java.io.DataInput; -import java.io.DataOutputStream; -import java.io.File; -import java.io.FileOutputStream; -import java.io.IOException; +import java.io.*; import java.lang.reflect.Field; import java.lang.reflect.InvocationTargetException; import java.util.ArrayList; @@ -656,7 +651,7 @@ public static ImmutableGraph map(final ImmutableGraph g, final int map[], final if (! g.randomAccess()) throw new IllegalArgumentException("Graph mapping requires random access"); final int sourceNumNodes = g.numNodes(); - if (map.length != sourceNumNodes) throw new IllegalArgumentException("The graph to be mapped has " + sourceNumNodes + " whereas the map contains " + map.length + " entries"); + if (map.length != sourceNumNodes) throw new IllegalArgumentException("The graph to be mapped has " + sourceNumNodes + " nodes whereas the map contains " + map.length + " entries"); int max = -1; if (pl != null) { @@ -1199,7 +1194,7 @@ public int[] successorArray() { final int numPairs = this.numPairs; // Neither quicksort nor heaps are stable, so we reestablish order here. IntArrays.quickSort(successor, 0, numPairs); - if (numPairs!= 0) { + if (numPairs != 0) { int p = 0; for (int j = 1; j < numPairs; j++) if (successor[p] != successor[j]) successor[++p] = successor[j]; outdegree = p + 1; @@ -1271,6 +1266,290 @@ protected void finalize() throws Throwable { } + public static class ArcLabelledBatchGraph extends ArcLabelledImmutableSequentialGraph { + private final int n; + private final long numArcs; + private final ObjectArrayList batches; + private final ObjectArrayList labelBatches; + private final Label prototype; + private final LabelMergeStrategy labelMergeStrategy; + + public ArcLabelledBatchGraph(int n, long numArcs, ObjectArrayList batches, ObjectArrayList labelBatches, Label prototype, final LabelMergeStrategy labelMergeStrategy) { + this.n = n; + this.numArcs = numArcs; + this.batches = batches; + this.labelBatches = labelBatches; + this.prototype = prototype; + this.labelMergeStrategy = labelMergeStrategy; + } + + @Override + public int numNodes() { return n; } + @Override + public long numArcs() { return numArcs; } + @Override + public boolean hasCopiableIterators() { return true; } + + class InternalArcLabelledNodeIterator extends ArcLabelledNodeIterator { + /** The buffer size. We can't make it too big—there's two per batch, per thread. */ + private static final int STD_BUFFER_SIZE = 64 * 1024; + private final int[] refArray; + private final InputBitStream[] batchIbs; + private final InputBitStream[] labelInputBitStream; + private final int[] inputStreamLength; + private final int[] prevTarget; + + // The indirect queue used to merge the batches. + private final IntHeapSemiIndirectPriorityQueue queue; + /** The limit for {@link #hasNext()}. */ + private final int hasNextLimit; + + /** The last returned node (-1 if no node has been returned yet). */ + private int last; + /** The outdegree of the current node (valid if {@link #last} is not -1). */ + private int outdegree; + /** The number of pairs associated with the current node (valid if {@link #last} is not -1). */ + private int numPairs; + /** The successors of the current node (valid if {@link #last} is not -1); + * only the first {@link #outdegree} entries are meaningful. */ + private int[] successor; + /** The labels of the arcs going out of the current node (valid if {@link #last} is not -1); + * only the first {@link #outdegree} entries are meaningful. */ + private Label[] label; + + public InternalArcLabelledNodeIterator(final int upperBound) throws IOException { + this(upperBound, null, null, null, null, null, -1, -1, IntArrays.EMPTY_ARRAY, Label.EMPTY_LABEL_ARRAY); + } + + public InternalArcLabelledNodeIterator(final int upperBound, final InputBitStream[] baseIbs, final InputBitStream[] baseLabelInputBitStream, final int[] refArray, final int[] prevTarget, final int[] inputStreamLength, final int last, final int outdegree, final int successor[], final Label[] label) throws IOException { + this.hasNextLimit = Math.min(n, upperBound) - 1; + this.last = last; + this.outdegree = outdegree; + this.successor = successor; + this.label = label; + batchIbs = new InputBitStream[batches.size()]; + labelInputBitStream = new InputBitStream[batches.size()]; + + if (refArray == null) { + this.refArray = new int[batches.size()]; + this.prevTarget = new int[batches.size()]; + this.inputStreamLength = new int[batches.size()]; + Arrays.fill(this.prevTarget, -1); + queue = new IntHeapSemiIndirectPriorityQueue(this.refArray); + // We open all files and load the first element into the reference array. + for(int i = 0; i < batches.size(); i++) { + batchIbs[i] = new InputBitStream(batches.get(i), STD_BUFFER_SIZE); + labelInputBitStream[i] = new InputBitStream(labelBatches.get(i), STD_BUFFER_SIZE); + this.inputStreamLength[i] = batchIbs[i].readDelta(); + this.refArray[i] = batchIbs[i].readDelta(); + queue.enqueue(i); + } + } + else { + this.refArray = refArray; + this.prevTarget = prevTarget; + this.inputStreamLength = inputStreamLength; + queue = new IntHeapSemiIndirectPriorityQueue(refArray); + + for(int i = 0; i < refArray.length; i++) { + if (baseIbs[i] != null) { + batchIbs[i] = new InputBitStream(batches.get(i), STD_BUFFER_SIZE); + batchIbs[i].position(baseIbs[i].position()); + labelInputBitStream[i] = new InputBitStream(labelBatches.get(i), STD_BUFFER_SIZE); + labelInputBitStream[i].position(baseLabelInputBitStream[i].position()); + queue.enqueue(i); + } + } + } + } + + @Override + public ArcLabelledNodeIterator copy(final int upperBound) { + try { + if (last == -1) return new InternalArcLabelledNodeIterator(upperBound); + else return new InternalArcLabelledNodeIterator(upperBound, batchIbs, labelInputBitStream, + refArray.clone(), prevTarget.clone(), inputStreamLength.clone(), last, outdegree(), Arrays.copyOf(successor, outdegree()), Arrays.copyOf(label, outdegree())); + } + catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public int outdegree() { + if (last == -1) throw new IllegalStateException(); + sortSuccessors(); + return outdegree; + } + + @Override + public boolean hasNext() { + return last < hasNextLimit; + } + + @Override + public int nextInt() { + if (! hasNext()) throw new NoSuchElementException(); + last++; + int d = 0; + outdegree = -1; + int i; + + try { + /* We extract elements from the queue as long as their target is equal + * to last. If during the process we exhaust a batch, we close it. */ + + while(! queue.isEmpty() && refArray[i = queue.first()] == last) { + successor = IntArrays.grow(successor, d + 1); + successor[d] = (prevTarget[i] += batchIbs[i].readDelta() + 1); + label = ObjectArrays.grow(label, d + 1); + label[d] = prototype.copy(); + label[d].fromBitStream(labelInputBitStream[i], last); + + if (--inputStreamLength[i] == 0) { + queue.dequeue(); + batchIbs[i].close(); + batchIbs[i] = null; + labelInputBitStream[i].close(); + labelInputBitStream[i] = null; + } + else { + // We read a new source and update the queue. + final int sourceDelta = batchIbs[i].readDelta(); + if (sourceDelta != 0) { + refArray[i] += sourceDelta; + prevTarget[i] = -1; + queue.changed(); + } + } + d++; + } + + numPairs = d; + } + catch(final IOException e) { + e.printStackTrace(); + throw new RuntimeException(this + " " + e); + } + + return last; + } + + @Override + public int[] successorArray() { + if (last == -1) throw new IllegalStateException(); + if (outdegree == -1) sortSuccessors(); + return successor; + } + + @Override + public Label[] labelArray() { + if (last == -1) throw new IllegalStateException(); + if (outdegree == -1) sortSuccessors(); + return super.labelArray(); + } + + @Override + public LabelledArcIterator successors() { + if (last == -1) throw new IllegalStateException(); + if (outdegree == -1) sortSuccessors(); + return new LabelledArcIterator() { + int last = -1; + + @Override + public Label label() { + return label[last]; + } + + @Override + public int nextInt() { + if (last + 1 == outdegree) return -1; + return successor[++last]; + } + + @Override + public int skip(final int k) { + final int toSkip = Math.min(k, outdegree - last - 1); + last += toSkip; + return toSkip; + } + }; + } + + @SuppressWarnings("deprecation") + @Override + protected void finalize() throws Throwable { + try { + for(final InputBitStream ibs: batchIbs) if (ibs != null) ibs.close(); + for(final InputBitStream ibs: labelInputBitStream) if (ibs != null) ibs.close(); + } + finally { + super.finalize(); + } + } + + private void sortSuccessors() { + // Compute outdegree + if (outdegree == -1) { + final int numPairs = this.numPairs; + // Neither quicksort nor heaps are stable, so we reestablish order here. + it.unimi.dsi.fastutil.Arrays.quickSort(0, numPairs, (x, y) -> Integer.compare(successor[x], successor[y]), + (x, y) -> { + final int t = successor[x]; + successor[x] = successor[y]; + successor[y] = t; + final Label l = label[x]; + label[x] = label[y]; + label[y] = l; + }); + + if (numPairs != 0) { + // Avoid returning the duplicate arcs + int p = 0; + for (int j = 1; j < numPairs; j++) { + if (successor[p] != successor[j]) { + successor[++p] = successor[j]; + } else if (labelMergeStrategy != null) { + label[p] = labelMergeStrategy.merge(label[p], label[j]); + } + } + outdegree = p + 1; + } + else outdegree = 0; + } + } + + } + + @Override + public ArcLabelledNodeIterator nodeIterator() { + try { + return new InternalArcLabelledNodeIterator(Integer.MAX_VALUE); + } + catch (final IOException e) { + throw new RuntimeException(e); + } + } + + @SuppressWarnings("deprecation") + @Override + protected void finalize() throws Throwable { + try { + for(final File f : batches) f.delete(); + for(final File f : labelBatches) f.delete(); + } + finally { + super.finalize(); + } + } + + @Override + public Label prototype() { + return prototype; + } + } + + /** Sorts the given source and target arrays w.r.t. the target and stores them in a temporary file. * * @param n the index of the last element to be sorted (exclusive). @@ -1319,79 +1598,114 @@ else if (target[i] != target[i - 1]) { return u; } - /** Sorts the given source and target arrays w.r.t. the target and stores them in two temporary files. - * An additional positionable input bit stream is provided that contains labels, starting at given positions. - * Labels are also written onto the appropriate file. + /** + * Sorts the given source and target arrays w.r.t. the target and stores them in two temporary files. An additional + * positionable input bit stream is provided that contains labels, starting at given positions. Labels are also + * written onto the appropriate file. * * @param n the index of the last element to be sorted (exclusive). * @param source the source array. * @param target the target array. - * @param start the array containing the bit position (within the given input stream) where the label of the arc starts. + * @param start the array containing the bit position (within the given input stream) where the label of the arc + * starts. * @param labelBitStream the positionable bit stream containing the labels. * @param tempDir a temporary directory where to store the sorted arrays. * @param batches a list of files to which the batch file will be added. * @param labelBatches a list of files to which the label batch file will be added. + * @param labelMergeStrategy + * @return the number of pairs in the batch (might be less than n because duplicates are eliminated). */ - private static void processTransposeBatch(final int n, final int[] source, final int[] target, final long[] start, - final InputBitStream labelBitStream, final File tempDir, final List batches, final List labelBatches, - final Label prototype) throws IOException { + public static int processTransposeBatch(final int n, final int[] source, final int[] target, final long[] start, + final InputBitStream labelBitStream, final File tempDir, final List batches, final List labelBatches, + Label prototype, final LabelMergeStrategy labelMergeStrategy) throws IOException { it.unimi.dsi.fastutil.Arrays.parallelQuickSort(0, n, (x,y) -> { - final int t = Integer.compare(source[x], source[y]); - if (t != 0) return t; - return Integer.compare(target[x], target[y]); - }, - (x, y) -> { - int t = source[x]; - source[x] = source[y]; - source[y] = t; - t = target[x]; - target[x] = target[y]; - target[y] = t; - final long u = start[x]; - start[x] = start[y]; - start[y] = u; - }); + final int t = Integer.compare(source[x], source[y]); + if (t != 0) return t; + return Integer.compare(target[x], target[y]); + }, + (x, y) -> { + int t = source[x]; + source[x] = source[y]; + source[y] = t; + t = target[x]; + target[x] = target[y]; + target[y] = t; + final long u = start[x]; + start[x] = start[y]; + start[y] = u; + }); final File batchFile = File.createTempFile("batch", ".bitstream", tempDir); batchFile.deleteOnExit(); batches.add(batchFile); final OutputBitStream batch = new OutputBitStream(batchFile); + final File labelFile = File.createTempFile("label-", ".bits", tempDir); + labelFile.deleteOnExit(); + labelBatches.add(labelFile); + final OutputBitStream labelObs = new OutputBitStream(labelFile); + + // Used to handle duplicate arcs with different labels + final Label otherPrototype = prototype.copy(); + + int u = 0; + if (n != 0) { // Compute unique pairs - batch.writeDelta(n); + u = 1; + for(int i = n - 1; i-- != 0;) if (source[i] != source[i + 1] || target[i] != target[i + 1]) u++; + batch.writeDelta(u); + int prevSource = source[0]; batch.writeDelta(prevSource); batch.writeDelta(target[0]); + labelBitStream.position(start[0]); + prototype.fromBitStream(labelBitStream, source[0]); + for(int i = 1; i < n; i++) { if (source[i] != prevSource) { batch.writeDelta(source[i] - prevSource); batch.writeDelta(target[i]); prevSource = source[i]; + + prototype.toBitStream(labelObs, target[i - 1]); + labelBitStream.position(start[i]); + prototype.fromBitStream(labelBitStream, source[i]); } else if (target[i] != target[i - 1]) { // We don't write duplicate pairs batch.writeDelta(0); batch.writeDelta(target[i] - target[i - 1] - 1); + + prototype.toBitStream(labelObs, target[i - 1]); + labelBitStream.position(start[i]); + prototype.fromBitStream(labelBitStream, source[i]); + } + else { + // Duplicate arcs, overwrite the label with either the new label encountered or merging the two labels. + labelBitStream.position(start[i]); + + if (labelMergeStrategy != null) { + otherPrototype.fromBitStream(labelBitStream, source[i]); + prototype = labelMergeStrategy.merge(otherPrototype, prototype); + } + else { + prototype.fromBitStream(labelBitStream, source[i]); + } } } + + prototype.toBitStream(labelObs, target[n - 1]); } + else batch.writeDelta(0); batch.close(); - - final File labelFile = File.createTempFile("label-", ".bits", tempDir); - labelFile.deleteOnExit(); - labelBatches.add(labelFile); - final OutputBitStream labelObs = new OutputBitStream(labelFile); - for (int i = 0; i < n; i++) { - labelBitStream.position(start[i]); - prototype.fromBitStream(labelBitStream, source[i]); - prototype.toBitStream(labelObs, target[i]); - } labelObs.close(); + + return u; } /** Returns an immutable graph obtained by reversing all arcs in g, using an offline method. @@ -1701,7 +2015,7 @@ public static ArcLabelledImmutableGraph transposeOffline(final ArcLabelledImmuta if (j == batchSize) { obs.flush(); - processTransposeBatch(batchSize, source, target, start, new InputBitStream(fbos.array), tempDir, batches, labelBatches, prototype); + processTransposeBatch(batchSize, source, target, start, new InputBitStream(fbos.array), tempDir, batches, labelBatches, prototype, null); fbos = new FastByteArrayOutputStream(); obs = new OutputBitStream(fbos); //ALERT here we should re-use j = 0; @@ -1714,7 +2028,7 @@ public static ArcLabelledImmutableGraph transposeOffline(final ArcLabelledImmuta if (j != 0) { obs.flush(); - processTransposeBatch(j, source, target, start, new InputBitStream(fbos.array), tempDir, batches, labelBatches, prototype); + processTransposeBatch(j, source, target, start, new InputBitStream(fbos.array), tempDir, batches, labelBatches, prototype, null); } if (pl != null) { @@ -1725,239 +2039,10 @@ public static ArcLabelledImmutableGraph transposeOffline(final ArcLabelledImmuta final long numArcs = m; // Now we return an immutable graph whose nodeIterator() merges the batches on the fly. - return new ArcLabelledImmutableSequentialGraph() { - @Override - public int numNodes() { return n; } - @Override - public long numArcs() { return numArcs; } - @Override - public boolean hasCopiableIterators() { return true; } - - class InternalArcLabelledNodeIterator extends ArcLabelledNodeIterator { - /** The buffer size. We can't make it too big—there's two per batch, per thread. */ - private static final int STD_BUFFER_SIZE = 64 * 1024; - private final int[] refArray; - private final InputBitStream[] batchIbs; - private final InputBitStream[] labelInputBitStream; - private final int[] inputStreamLength; - private final int[] prevTarget; - - // The indirect queue used to merge the batches. - private final IntHeapSemiIndirectPriorityQueue queue; - /** The limit for {@link #hasNext()}. */ - private final int hasNextLimit; - - /** The last returned node (-1 if no node has been returned yet). */ - private int last; - /** The outdegree of the current node (valid if {@link #last} is not -1). */ - private int outdegree; - /** The successors of the current node (valid if {@link #last} is not -1); - * only the first {@link #outdegree} entries are meaningful. */ - private int[] successor; - /** The labels of the arcs going out of the current node (valid if {@link #last} is not -1); - * only the first {@link #outdegree} entries are meaningful. */ - private Label[] label; - - public InternalArcLabelledNodeIterator(final int upperBound) throws IOException { - this(upperBound, null, null, null, null, null, -1, 0, IntArrays.EMPTY_ARRAY, Label.EMPTY_LABEL_ARRAY); - } - - public InternalArcLabelledNodeIterator(final int upperBound, final InputBitStream[] baseIbs, final InputBitStream[] baseLabelInputBitStream, final int[] refArray, final int[] prevTarget, final int[] inputStreamLength, final int last, final int outdegree, final int successor[], final Label[] label) throws IOException { - this.hasNextLimit = Math.min(n, upperBound) - 1; - this.last = last; - this.outdegree = outdegree; - this.successor = successor; - this.label = label; - batchIbs = new InputBitStream[batches.size()]; - labelInputBitStream = new InputBitStream[batches.size()]; - - if (refArray == null) { - this.refArray = new int[batches.size()]; - this.prevTarget = new int[batches.size()]; - this.inputStreamLength = new int[batches.size()]; - Arrays.fill(this.prevTarget, -1); - queue = new IntHeapSemiIndirectPriorityQueue(this.refArray); - // We open all files and load the first element into the reference array. - for(int i = 0; i < batches.size(); i++) { - batchIbs[i] = new InputBitStream(batches.get(i), STD_BUFFER_SIZE); - labelInputBitStream[i] = new InputBitStream(labelBatches.get(i), STD_BUFFER_SIZE); - this.inputStreamLength[i] = batchIbs[i].readDelta(); - this.refArray[i] = batchIbs[i].readDelta(); - queue.enqueue(i); - } - } - else { - this.refArray = refArray; - this.prevTarget = prevTarget; - this.inputStreamLength = inputStreamLength; - queue = new IntHeapSemiIndirectPriorityQueue(refArray); - - for(int i = 0; i < refArray.length; i++) { - if (baseIbs[i] != null) { - batchIbs[i] = new InputBitStream(batches.get(i), STD_BUFFER_SIZE); - batchIbs[i].position(baseIbs[i].position()); - labelInputBitStream[i] = new InputBitStream(labelBatches.get(i), STD_BUFFER_SIZE); - labelInputBitStream[i].position(baseLabelInputBitStream[i].position()); - queue.enqueue(i); - } - } - } - } - - @Override - public int outdegree() { - if (last == -1) throw new IllegalStateException(); - return outdegree; - } - - @Override - public boolean hasNext() { - return last < hasNextLimit; - } - - @Override - public int nextInt() { - last++; - int d = 0; - int i; - - try { - /* We extract elements from the queue as long as their target is equal - * to last. If during the process we exhaust a batch, we close it. */ - - while(! queue.isEmpty() && refArray[i = queue.first()] == last) { - successor = IntArrays.grow(successor, d + 1); - successor[d] = (prevTarget[i] += batchIbs[i].readDelta() + 1); - label = ObjectArrays.grow(label, d + 1); - label[d] = prototype.copy(); - label[d].fromBitStream(labelInputBitStream[i], last); - - if (--inputStreamLength[i] == 0) { - queue.dequeue(); - batchIbs[i].close(); - labelInputBitStream[i].close(); - batchIbs[i] = null; - labelInputBitStream[i] = null; - } - else { - // We read a new source and update the queue. - final int sourceDelta = batchIbs[i].readDelta(); - if (sourceDelta != 0) { - refArray[i] += sourceDelta; - prevTarget[i] = -1; - queue.changed(); - } - } - d++; - } - // Neither quicksort nor heaps are stable, so we reestablish order here. - it.unimi.dsi.fastutil.Arrays.quickSort(0, d, (x, y) -> Integer.compare(successor[x], successor[y]), - (x, y) -> { - final int t = successor[x]; - successor[x] = successor[y]; - successor[y] = t; - final Label l = label[x]; - label[x] = label[y]; - label[y] = l; - }); - } - catch(final IOException e) { - throw new RuntimeException(e); - } - - outdegree = d; - return last; - } - - @Override - public int[] successorArray() { - if (last == -1) throw new IllegalStateException(); - return successor; - } - - @SuppressWarnings("deprecation") - @Override - protected void finalize() throws Throwable { - try { - for(final InputBitStream ibs: batchIbs) if (ibs != null) ibs.close(); - for(final InputBitStream ibs: labelInputBitStream) if (ibs != null) ibs.close(); - } - finally { - super.finalize(); - } - } - - @Override - public LabelledArcIterator successors() { - if (last == -1) throw new IllegalStateException(); - return new LabelledArcIterator() { - int last = -1; - - @Override - public Label label() { - return label[last]; - } - - @Override - public int nextInt() { - if (last + 1 == outdegree) return -1; - return successor[++last]; - } - - @Override - public int skip(final int k) { - final int toSkip = Math.min(k, outdegree - last - 1); - last += toSkip; - return toSkip; - } - }; - } - - - @Override - public ArcLabelledNodeIterator copy(final int upperBound) { - try { - if (last == -1) return new InternalArcLabelledNodeIterator(upperBound); - else return new InternalArcLabelledNodeIterator(upperBound, batchIbs, labelInputBitStream, - refArray.clone(), prevTarget.clone(), inputStreamLength.clone(), last, outdegree, Arrays.copyOf(successor, outdegree), Arrays.copyOf(label, outdegree)); - } - catch (final IOException e) { - throw new RuntimeException(e); - } - } - } - - - @Override - public ArcLabelledNodeIterator nodeIterator() { - try { - return new InternalArcLabelledNodeIterator(Integer.MAX_VALUE); - } - catch (final IOException e) { - throw new RuntimeException(e); - } - } - - @SuppressWarnings("deprecation") - @Override - protected void finalize() throws Throwable { - try { - for(final File f : batches) f.delete(); - for(final File f : labelBatches) f.delete(); - } - finally { - super.finalize(); - } - } - @Override - public Label prototype() { - return prototype; - } - - }; + // We don't need a merge strategy because a transposition never introduces duplicates + return new ArcLabelledBatchGraph(n, numArcs, batches, labelBatches, prototype, null); } - /** Returns an immutable graph obtained by reversing all arcs in g. * *

This method can process {@linkplain ImmutableGraph#loadOffline(CharSequence) offline graphs}. @@ -2602,8 +2687,8 @@ public static void main(final String args[]) throws IOException, IllegalArgument "transposeOffline sourceBasename destBasename [batchSize] [tempDir]\n" + "symmetrize sourceBasename [transposeBasename] destBasename\n" + "symmetrizeOffline sourceBasename destBasename [batchSize] [tempDir]\n" + - "simplifyOffline sourceBasename destBasename [batchSize] [tempDir]\n" + - "simplify sourceBasename transposeBasename destBasename\n" + + "simplifyOffline sourceBasename destBasename [batchSize] [tempDir]\n" + + "simplify sourceBasename transposeBasename destBasename\n" + "union source1Basename source2Basename destBasename [strategy]\n" + "compose source1Basename source2Basename destBasename [semiring]\n" + "gray sourceBasename destBasename\n" + @@ -2630,8 +2715,8 @@ public static void main(final String args[]) throws IOException, IllegalArgument new Switch("ascii", 'a', "ascii", "Maps are in ASCII form (one integer per line)."), new UnflaggedOption("transform", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.REQUIRED, JSAP.NOT_GREEDY, "The transformation to be applied."), new UnflaggedOption("param", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.REQUIRED, JSAP.GREEDY, "The remaining parameters."), - } - ); + } + ); final JSAPResult jsapResult = jsap.parse(args); if (jsap.messagePrinted()) System.exit(1); diff --git a/src/it/unimi/dsi/webgraph/labelling/BitStreamArcLabelledImmutableGraph.java b/src/it/unimi/dsi/webgraph/labelling/BitStreamArcLabelledImmutableGraph.java index 1dd548a..6261ff6 100644 --- a/src/it/unimi/dsi/webgraph/labelling/BitStreamArcLabelledImmutableGraph.java +++ b/src/it/unimi/dsi/webgraph/labelling/BitStreamArcLabelledImmutableGraph.java @@ -25,9 +25,23 @@ import java.io.PrintWriter; import java.nio.channels.FileChannel; import java.util.Properties; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.martiansoftware.jsap.FlaggedOption; +import com.martiansoftware.jsap.JSAP; +import com.martiansoftware.jsap.JSAPException; +import com.martiansoftware.jsap.JSAPResult; +import com.martiansoftware.jsap.Parameter; +import com.martiansoftware.jsap.SimpleJSAP; +import com.martiansoftware.jsap.Switch; +import com.martiansoftware.jsap.UnflaggedOption; import it.unimi.dsi.fastutil.io.BinIO; import it.unimi.dsi.fastutil.io.FastMultiByteArrayInputStream; +import it.unimi.dsi.fastutil.longs.LongBigList; import it.unimi.dsi.fastutil.longs.LongIterator; import it.unimi.dsi.fastutil.objects.ObjectArrays; import it.unimi.dsi.io.ByteBufferInputStream; @@ -35,6 +49,7 @@ import it.unimi.dsi.io.OutputBitStream; import it.unimi.dsi.lang.ObjectParser; import it.unimi.dsi.logging.ProgressLogger; +import it.unimi.dsi.sux4j.util.EliasFanoMonotoneBigLongBigList; import it.unimi.dsi.sux4j.util.EliasFanoMonotoneLongBigList; import it.unimi.dsi.webgraph.AbstractLazyIntIterator; import it.unimi.dsi.webgraph.BVGraph; @@ -119,10 +134,13 @@ */ public class BitStreamArcLabelledImmutableGraph extends ArcLabelledImmutableGraph { + private static final Logger LOGGER = LoggerFactory.getLogger(BitStreamArcLabelledImmutableGraph.class); /** The standard extension for the labels bit stream. */ public static final String LABELS_EXTENSION = ".labels"; /** The standard extension for the label offsets bit stream. */ public static final String LABEL_OFFSETS_EXTENSION = ".labeloffsets"; + /** The standard extension for the cached {@link LongBigList} containing the label offsets. */ + public static final String LABEL_OFFSETS_BIG_LIST_EXTENSION = ".labelobl"; /** The standard property key for a label specification. */ public static final String LABELSPEC_PROPERTY_KEY = "labelspec"; @@ -145,7 +163,7 @@ public class BitStreamArcLabelledImmutableGraph extends ArcLabelledImmutableGrap /** The basename of this graph (required for offline access). */ protected final CharSequence basename; /** The offset array, or null for sequential access. */ - protected final EliasFanoMonotoneLongBigList offset; + protected final LongBigList offset; /** * Builds a new labelled graph using a bit stream of labels. @@ -161,7 +179,7 @@ public class BitStreamArcLabelledImmutableGraph extends ArcLabelledImmutableGrap * null, this memory-mapped stream is used as the bit stream of labels. * @param offset the offset array for random access, or null. */ - protected BitStreamArcLabelledImmutableGraph(final CharSequence basename, final ImmutableGraph g, final Label prototype, final byte[] byteArray, final FastMultiByteArrayInputStream labelStream, final ByteBufferInputStream mappedLabelStream, final EliasFanoMonotoneLongBigList offset) { + protected BitStreamArcLabelledImmutableGraph(final CharSequence basename, final ImmutableGraph g, final Label prototype, final byte[] byteArray, final FastMultiByteArrayInputStream labelStream, final ByteBufferInputStream mappedLabelStream, final LongBigList offset) { this.g = g; this.byteArray = byteArray; this.labelStream = labelStream; @@ -311,6 +329,50 @@ public static BitStreamArcLabelledImmutableGraph load(final CharSequence basenam return load(LoadMethod.STANDARD, basename, pl); } + /** An iterator returning the label offsets by reading γ-encoded gaps. */ + public final static class LabelOffsetsLongIterator implements LongIterator { + private final InputBitStream offsetStream; + private final long n; + private long off; + private long i; + + public LabelOffsetsLongIterator(final long n, final InputBitStream offsetIbs) { + this.offsetStream = offsetIbs; + this.n = n; + } + + @Override + public boolean hasNext() { + return i <= n; + } + + @Override + public long nextLong() { + i++; + try { + return off = offsetStream.readLongGamma() + off; + } catch (final IOException e) { + throw new RuntimeException(e); + } + } + } + + /** + * Returns an appropriate Elias–Fano {@link LongBigList} for the given number of nodes and + * label file size. + * + * @param numNodes the number of nodes of the graph. + * @param size the size of the label file in bytes. + * @param labelOffsetsIbs the label offsets input stream. + * @return an appropriate Elias–Fano {@link LongBigList} (either an + * {@link EliasFanoMonotoneLongBigList} or an {@link EliasFanoMonotoneBigLongBigList}). + */ + public static LongBigList fitEliasFano(final long numNodes, final long size, final InputBitStream labelOffsetsIbs) { + return EliasFanoMonotoneLongBigList.fits(numNodes + 1, size * Byte.SIZE + 1) + ? new EliasFanoMonotoneLongBigList(numNodes + 1, size * Byte.SIZE + 1, new LabelOffsetsLongIterator(numNodes, labelOffsetsIbs)) + : new EliasFanoMonotoneBigLongBigList(numNodes + 1, size * Byte.SIZE + 1, new LabelOffsetsLongIterator(numNodes, labelOffsetsIbs)); + } + /** Loads a labelled graph using the given method. * * @param method a load method. @@ -364,7 +426,7 @@ protected static BitStreamArcLabelledImmutableGraph load(final LoadMethod method byte[] byteArray = null; FastMultiByteArrayInputStream labelStream = null; ByteBufferInputStream mappedLabelStream = null; - EliasFanoMonotoneLongBigList offsets = null; + LongBigList offsets = null; if (method != LoadMethod.OFFLINE) { if (pl != null) { @@ -392,33 +454,27 @@ protected static BitStreamArcLabelledImmutableGraph load(final LoadMethod method pl.expectedUpdates = g.numNodes() + 1; pl.start("Loading label offsets..."); } - final InputBitStream offsetStream = new InputBitStream(basename + LABEL_OFFSETS_EXTENSION); - - offsets = new EliasFanoMonotoneLongBigList(g.numNodes() + 1, size * Byte.SIZE + 1, new LongIterator() { - private long off; - private int i; - - @Override - public boolean hasNext() { - return i <= g.numNodes(); + final File offsetsBigListFile = new File(basename + LABEL_OFFSETS_BIG_LIST_EXTENSION); + if (offsetsBigListFile.exists()) { + try { + offsets = (LongBigList)BinIO.loadObject(offsetsBigListFile); } - @Override - public long nextLong() { - i++; - try { - return off = offsetStream.readLongGamma() + off; - } - catch (final IOException e) { - throw new RuntimeException(e); + catch (final ClassNotFoundException e) { + if (pl != null) { + LOGGER.warn("A cached long big list of offsets was found, but its class is unknown", e); } } - }); - - offsetStream.close(); + } + if (offsets == null) { + final InputBitStream offsetStream = new InputBitStream(basename + LABEL_OFFSETS_EXTENSION); + offsets = fitEliasFano(g.numNodes(), size, offsetStream); + offsetStream.close(); + } if (pl != null) { pl.count = g.numNodes() + 1; pl.done(); - pl.logger().info("Label pointer bits per node: " + offsets.numBits() / (g.numNodes() + 1.0)); + final long offsetsNumBits = (offsets instanceof EliasFanoMonotoneLongBigList) ? ((EliasFanoMonotoneLongBigList)offsets).numBits() : ((EliasFanoMonotoneBigLongBigList)offsets).numBits(); + pl.logger().info("Label pointer bits per node: " + offsetsNumBits / (g.numNodes() + 1.0)); } } @@ -637,4 +693,46 @@ public static void saveProperties(final Label prototype, final CharSequence base properties.println(BitStreamArcLabelledImmutableGraph.LABELSPEC_PROPERTY_KEY + " = " + prototype.toSpec()); properties.close(); } + + /** + * Reads an arc-labelled immutable graph and stores it as a + * {@link BitStreamArcLabelledImmutableGraph}. + */ + public static void main(final String[] args) throws JSAPException, IOException { + final SimpleJSAP jsap = new SimpleJSAP(BVGraph.class.getName(), "Write an ArcLabelledGraph as a BitStreamArcLabelledImmutableGraph. Source and destination are basenames from which suitable filenames will be stemmed.", new Parameter[] { + new Switch("list", 'L', "list", "Precomputes an Elias-Fano list of offsets for the source labels."), + new FlaggedOption("underlyingBasename", JSAP.STRING_PARSER, null, JSAP.NOT_REQUIRED, 'u', "underlying", "The basename of the underlying graph"), + new UnflaggedOption("sourceBasename", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.REQUIRED, JSAP.NOT_GREEDY, "The basename of the source graph, or a source spec if --spec was given; it is immaterial when --once is specified."), + new UnflaggedOption("destBasename", JSAP.STRING_PARSER, JSAP.NO_DEFAULT, JSAP.NOT_REQUIRED, JSAP.NOT_GREEDY, "The basename of the destination graph; if omitted, no recompression is performed. This is useful in conjunction with --offsets and --list."), }); + + final JSAPResult jsapResult = jsap.parse(args); + if (jsap.messagePrinted()) System.exit(1); + + final boolean list = jsapResult.getBoolean("list"); + final String source = jsapResult.getString("sourceBasename"); + final String dest = jsapResult.getString("destBasename"); + final String underlying = jsapResult.getString("underlyingBasename"); + + final ProgressLogger pl = new ProgressLogger(LOGGER, 10, TimeUnit.SECONDS); + final ArcLabelledImmutableGraph graph = ArcLabelledImmutableGraph.loadOffline(source, pl); + + if (dest != null) { + if (list) throw new IllegalArgumentException("You cannot specify a destination graph with these options"); + if (underlying == null) throw new IllegalArgumentException("You must specify an underlying graph with --underlying if you want to store a BitStreamArcLabelledImmutableGraph"); + BitStreamArcLabelledImmutableGraph.store(graph, dest, underlying, pl); + } else { + if (list) { + final FileInputStream fis = new FileInputStream(source + LABELS_EXTENSION); + final long size = fis.getChannel().size(); + final ImmutableGraph g = ImmutableGraph.loadOffline(source, pl); + final InputBitStream offsetStream = new InputBitStream(source + LABEL_OFFSETS_EXTENSION); + final LongBigList offsets = fitEliasFano(g.numNodes(), size, offsetStream); + offsetStream.close(); + fis.close(); + BinIO.storeObject(offsets, g.basename() + LABEL_OFFSETS_BIG_LIST_EXTENSION); + } else { + throw new IllegalArgumentException("You must specify a destination graph."); + } + } + } } diff --git a/src/it/unimi/dsi/webgraph/labelling/ScatteredLabelledArcsASCIIGraph.java b/src/it/unimi/dsi/webgraph/labelling/ScatteredLabelledArcsASCIIGraph.java new file mode 100644 index 0000000..80a1a29 --- /dev/null +++ b/src/it/unimi/dsi/webgraph/labelling/ScatteredLabelledArcsASCIIGraph.java @@ -0,0 +1,941 @@ +/* + * Copyright (C) 2011-2023 Sebastiano Vigna + * + * This program and the accompanying materials are made available under the + * terms of the GNU Lesser General Public License v2.1 or later, + * which is available at + * http://www.gnu.org/licenses/old-licenses/lgpl-2.1-standalone.html, + * or the Apache Software License 2.0, which is available at + * https://www.apache.org/licenses/LICENSE-2.0. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. + * + * SPDX-License-Identifier: LGPL-2.1-or-later OR Apache-2.0 + */ + +package it.unimi.dsi.webgraph.labelling; + +import com.martiansoftware.jsap.*; +import it.unimi.dsi.Util; +import it.unimi.dsi.fastutil.bytes.ByteArrays; +import it.unimi.dsi.fastutil.io.BinIO; +import it.unimi.dsi.fastutil.io.FastBufferedInputStream; +import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream; +import it.unimi.dsi.fastutil.longs.Long2IntFunction; +import it.unimi.dsi.fastutil.objects.Object2IntFunction; +import it.unimi.dsi.fastutil.objects.Object2LongFunction; +import it.unimi.dsi.fastutil.objects.ObjectArrayList; +import it.unimi.dsi.io.InputBitStream; +import it.unimi.dsi.io.OutputBitStream; +import it.unimi.dsi.lang.MutableString; +import it.unimi.dsi.logging.ProgressLogger; +import it.unimi.dsi.sux4j.mph.GOV3Function; +import it.unimi.dsi.webgraph.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Iterator; +import java.util.concurrent.TimeUnit; +import java.util.zip.GZIPInputStream; + +import static it.unimi.dsi.webgraph.Transform.processTransposeBatch; +import static it.unimi.dsi.webgraph.labelling.ArcLabelledImmutableGraph.UNDERLYINGGRAPH_SUFFIX; + +/** + * An {@link ArcLabelledImmutableGraph} that corresponds to a labelled graph stored as a scattered list of arcs. + * + *

+ * A scattered list of arcs describes a graph in a fairly loose way. Each line contains a + * labelled arc specified as two node identifiers and a label separated by whitespace (but we suggest exactly one TAB + * character). + * + *

+ * In the standard description, node identifiers can be in the range + * [-263..263): they will be remapped in a compact identifier space by + * assigning to each newly seen identifier a new node number. The list of identifiers in order of + * appearance is available in {@link #ids}. Lines can be empty, or comments starting with + * #. Characters following the target will be discarded with a warning. + * Similarly, the labels can be in the range [-263..263) and will be saved + * as-is in gamma coding, in case of duplicates only the last new label will be considered, + * this behaviour can be changed by providing more parameters. + * + *

+ * Warning: Lines not conforming the above specification will cause an error to be + * logged, but will be otherwise ignored. + * + *

+ * Alternatively, you can + * {@linkplain #ScatteredLabelledArcsASCIIGraph(InputStream, Object2LongFunction, Charset, int, boolean) + * provide} an {@link Object2LongFunction Object2LongFunction<String>} with default return value + * -1 that will be used to map identifiers to node numbers, along with a {@link Charset} to parse + * lines and the number of nodes of the graph (which must be a strict upper bound for the largest + * value returned by the function). Note that in principle an {@link Object2IntFunction} would be + * sufficient, but we want to make easier using functions from Sux4J such as {@link GOV3Function}. + * + *

+ * Additionally, the resulting graph can be symmetrized, and its loops be removed, using + * {@linkplain #ScatteredLabelledArcsASCIIGraph(InputStream, boolean, boolean, int, File, ProgressLogger) + * suitable constructor options}. + * + *

+ * You can provide {@linkplain #ScatteredLabelledArcsASCIIGraph(InputStream, labelPrototype, labelMapping, labelMergeStrategy) + * suitable constructor options} a {@link Label} as prototype, a {@link LabelMapping} as a way to + * convert the written labels to object of the prototype's type and a {@link LabelMergeStrategy} + * to handle the case of identical arcs with different labels. + * + *

+ * This class has no load method, and its main method converts a scattered-arcs representation + * directly into a {@link BVGraph}. + * + *

Using {@link ScatteredLabelledArcsASCIIGraph} to convert your data

+ * + *

+ * A simple (albeit rather inefficient) way to import data into WebGraph is using ASCII graphs + * specified by scattered arcs. Suppose you create the following file, named + * example.arcs: + * + *

+ *  # My graph
+ *  -1 15 100
+ *  15 2 200
+ *  2 -1 300 This will cause a warning to be logged
+ *  OOPS! (This will cause an error to be logged)
+ *  -1 2 400
+ * 
+ * + * Then, the command + * + *
+ *  java it.unimi.dsi.webgraph.ScatteredLabelledArcsASCIIGraph example < example.arcs
+ * 
+ * + * will produce a compressed labelled graph in {@link it.unimi.dsi.webgraph.BVGraph} format. + * The underlying graph will be saved with basename example-underlying. + * The file example.ids will contain the list of longs -1, 15, 2. + * The node with identifer -1 will be the node 0 in the output graph, the node with identifier + * 15 will be node 1, and the node with identifier 2 will be node 2. The graph example + * will thus have three nodes and four arcs (viz., <0,1>, <0,2>, <1,2> and + * <2,0>). The labels will be saved as example.labels in the order of visit + * of the arcs, the offset example.labeloffsets relay the offset of each specific label, + * because in general labels are not written in a fixed number of bits. + * + *

Memory requirements

+ * + *

+ * To convert node identifiers to node numbers, instances of this class use a custom map that in the + * worst case will require + * 19.5×2⌈log(4n/3)⌉ ≤ 52n bytes, + * where n is the number of distinct identifiers. Storing batches of arcs in memory + * requires 8 bytes per arc. + */ + +public class ScatteredLabelledArcsASCIIGraph extends ImmutableSequentialGraph { + /** + * The default batch size. + */ + public static final int DEFAULT_BATCH_SIZE = 1000000; + /** + * The default label prototype. + */ + public static final Label DEFAULT_LABEL_PROTOTYPE = new GammaCodedIntLabel("FOO"); + /** + * The default label mapping function. + */ + public static final LabelMapping DEFAULT_LABEL_MAPPING = (label, st) -> ((GammaCodedIntLabel) label).value = Integer.parseInt((String) st); + + private static final Logger LOGGER = LoggerFactory.getLogger(ScatteredLabelledArcsASCIIGraph.class); + private final static boolean DEBUG = false; + + /** + * The extension of the identifier file (a binary list of longs). + */ + private static final String IDS_EXTENSION = ".ids"; + /** + * The labelled batch graph used to return node iterators. + */ + private final Transform.ArcLabelledBatchGraph arcLabelledBatchGraph; + /** + * The list of identifiers in order of appearance. + */ + public long[] ids; + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a standard scattered list of arcs. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is) throws IOException { + this(is, DEFAULT_LABEL_PROTOTYPE, DEFAULT_LABEL_MAPPING, null, false); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a standard scattered list of arcs. + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Label labelPrototype, final LabelMapping labelMapping) throws IOException { + this(is, labelPrototype, labelMapping, null, false); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a standard scattered list of arcs. + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy) throws IOException { + this(is, labelPrototype, labelMapping, labelMergeStrategy, false); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a standard scattered list of arcs. + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize) throws IOException { + this(is, labelPrototype, labelMapping, labelMergeStrategy, symmetrize, false); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a standard scattered list of arcs. + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize, final boolean noLoops) throws IOException { + this(is, labelPrototype, labelMapping, labelMergeStrategy, symmetrize, noLoops, DEFAULT_BATCH_SIZE); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a standard scattered list of arcs. + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + * @param batchSize the number of integers in a batch; two arrays of integers of this size will be allocated by + * this method. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize, final boolean noLoops, final int batchSize) throws IOException { + this(is, labelPrototype, labelMapping, labelMergeStrategy, symmetrize, noLoops, batchSize, null); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a standard scattered list of arcs. + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + * @param batchSize the number of integers in a batch; two arrays of integers of this size will be allocated by + * this method. + * @param tempDir a temporary directory for the batches, or null for + * {@link File#createTempFile(java.lang.String, java.lang.String)}'s choice. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize, final boolean noLoops, final int batchSize, final File tempDir) throws IOException { + this(is, labelPrototype, labelMapping, labelMergeStrategy, symmetrize, noLoops, batchSize, tempDir, null); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a standard scattered list of arcs. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + * @param batchSize the number of integers in a batch; two arrays of integers of this size will be allocated by + * this method. + * @param tempDir a temporary directory for the batches, or null for + * {@link File#createTempFile(String, String)}'s choice. + * @param pl a progress logger, or null. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final boolean symmetrize, final boolean noLoops, final int batchSize, final File tempDir, final ProgressLogger pl) throws IOException { + this(is, null, null, -1, DEFAULT_LABEL_PROTOTYPE, DEFAULT_LABEL_MAPPING, null, symmetrize, noLoops, batchSize, tempDir, pl); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a standard scattered list of arcs. + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + * @param batchSize the number of integers in a batch; two arrays of integers of this size will be allocated by + * this method. + * @param tempDir a temporary directory for the batches, or null for + * {@link File#createTempFile(java.lang.String, java.lang.String)}'s choice. + * @param pl a progress logger, or null. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize, final boolean noLoops, final int batchSize, final File tempDir, final ProgressLogger pl) throws IOException { + this(is, null, null, -1, labelPrototype, labelMapping, labelMergeStrategy, symmetrize, noLoops, batchSize, tempDir, pl); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a scattered list of arcs. + * @param function an explicitly provided function from string representing nodes to node numbers, or + * null for the standard behaviour. + * @param charset a character set that will be used to read the identifiers passed to function, or + * null for ISO-8859-1 (used only if function is not null). + * @param n the number of nodes of the graph (used only if function is not null). + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Object2LongFunction function, final Charset charset, final int n, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy) throws IOException { + this(is, function, charset, n, labelPrototype, labelMapping, labelMergeStrategy, false); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a scattered list of arcs. + * @param function an explicitly provided function from string representing nodes to node numbers, or + * null for the standard behaviour. + * @param charset a character set that will be used to read the identifiers passed to function, or + * null for ISO-8859-1 (used only if function is not null). + * @param n the number of nodes of the graph (used only if function is not null). + * @param symmetrize the new graph will be forced to be symmetric. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Object2LongFunction function, final Charset charset, final int n, final boolean symmetrize) throws IOException { + this(is, function, charset, n, DEFAULT_LABEL_PROTOTYPE, DEFAULT_LABEL_MAPPING, null, symmetrize, false); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a scattered list of arcs. + * @param function an explicitly provided function from string representing nodes to node numbers, or + * null for the standard behaviour. + * @param charset a character set that will be used to read the identifiers passed to function, or + * null for ISO-8859-1 (used only if function is not null). + * @param n the number of nodes of the graph (used only if function is not null). + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Object2LongFunction function, final Charset charset, final int n, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize) throws IOException { + this(is, function, charset, n, labelPrototype, labelMapping, labelMergeStrategy, symmetrize, false); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a scattered list of arcs. + * @param function an explicitly provided function from string representing nodes to node numbers, or + * null for the standard behaviour. + * @param charset a character set that will be used to read the identifiers passed to function, or + * null for ISO-8859-1 (used only if function is not null). + * @param n the number of nodes of the graph (used only if function is not null). + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Object2LongFunction function, final Charset charset, final int n, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize, final boolean noLoops) throws IOException { + this(is, function, charset, n, labelPrototype, labelMapping, labelMergeStrategy, symmetrize, noLoops, DEFAULT_BATCH_SIZE); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a scattered list of arcs. + * @param function an explicitly provided function from string representing nodes to node numbers, or + * null for the standard behaviour. + * @param charset a character set that will be used to read the identifiers passed to function, or + * null for ISO-8859-1 (used only if function is not null). + * @param n the number of nodes of the graph (used only if function is not null). + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + * @param batchSize the number of integers in a batch; two arrays of integers of this size will be allocated by + * this method. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Object2LongFunction function, final Charset charset, final int n, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize, final boolean noLoops, final int batchSize) throws IOException { + this(is, function, charset, n, labelPrototype, labelMapping, labelMergeStrategy, symmetrize, noLoops, batchSize, null); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a scattered list of arcs. + * @param function an explicitly provided function from string representing nodes to node numbers, or + * null for the standard behaviour. + * @param charset a character set that will be used to read the identifiers passed to function, or + * null for ISO-8859-1 (used only if function is not null). + * @param n the number of nodes of the graph (used only if function is not null). + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + * @param batchSize the number of integers in a batch; two arrays of integers of this size will be allocated by + * this method. + * @param tempDir a temporary directory for the batches, or null for + * {@link File#createTempFile(String, String)}'s choice. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Object2LongFunction function, final Charset charset, final int n, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize, final boolean noLoops, final int batchSize, final File tempDir) throws IOException { + this(is, function, charset, n, labelPrototype, labelMapping, labelMergeStrategy, symmetrize, noLoops, batchSize, tempDir, null); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param is an input stream containing a scattered list of arcs. + * @param function an explicitly provided function from string representing nodes to node numbers, or + * null for the standard behaviour. + * @param charset a character set that will be used to read the identifiers passed to function, or + * null for ISO-8859-1 (used only if function is not null). + * @param n the number of nodes of the graph (used only if function is not null). + * @param labelPrototype an example of the labels contained in the graph. + * @param labelMapping a function mapping string into the label defined by the prototype. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + * @param batchSize the number of integers in a batch; two arrays of integers of this size will be allocated by + * this method. + * @param tempDir a temporary directory for the batches, or null for + * {@link File#createTempFile(String, String)}'s choice. + * @param pl a progress logger, or null. + */ + public ScatteredLabelledArcsASCIIGraph(final InputStream is, final Object2LongFunction function, Charset charset, final int n, final Label labelPrototype, final LabelMapping labelMapping, final LabelMergeStrategy labelMergeStrategy, final boolean symmetrize, final boolean noLoops, final int batchSize, final File tempDir, final ProgressLogger pl) throws IOException { + @SuppressWarnings("resource") + final FastBufferedInputStream fbis = new FastBufferedInputStream(is); + ScatteredArcsASCIIGraph.Id2NodeMap map = new ScatteredArcsASCIIGraph.Id2NodeMap(); + + int numNodes = -1; + if (charset == null) charset = StandardCharsets.ISO_8859_1; + + int j; + int[] source = new int[batchSize], target = new int[batchSize]; + long[] labelStart = new long[batchSize]; + FastByteArrayOutputStream fbos = new FastByteArrayOutputStream(); + OutputBitStream obs = new OutputBitStream(fbos); + final ObjectArrayList batches = new ObjectArrayList<>(), labelBatches = new ObjectArrayList<>(); + final Label prototype = labelPrototype.copy(); + + if (pl != null) { + pl.itemsName = "labelled arcs"; + pl.start("Creating sorted batches..."); + } + + j = 0; + long pairs = 0; // Number of pairs + byte[] array = new byte[1024]; + for (long line = 1; ; line++) { + int start = 0, len; + while ((len = fbis.readLine(array, start, array.length - start, FastBufferedInputStream.ALL_TERMINATORS)) == array.length - start) { + start += len; + array = ByteArrays.grow(array, array.length + 1); + } + + if (len == -1) break; // EOF + + final int lineLength = start + len; + + if (DEBUG) + System.err.println("Reading line " + line + "... (" + new String(array, 0, lineLength, charset) + ")"); + + // Skip whitespace at the start of the line. + int offset = 0; + while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ') offset++; + + if (offset == lineLength) { + if (DEBUG) System.err.println("Skipping line " + line + "..."); + continue; // Whitespace line + } + + if (array[0] == '#') continue; + + // Scan source id. + start = offset; + while (offset < lineLength && (array[offset] < 0 || array[offset] > ' ')) offset++; + + int s; + + if (function == null) { + final long sl; + try { + sl = getLong(array, start, offset - start); + } catch (final RuntimeException e) { + // Discard up to the end of line + LOGGER.error("Error at line " + line + ": " + e.getMessage()); + continue; + } + + s = map.getNode(sl); + + if (DEBUG) System.err.println("Parsed source at line " + line + ": " + sl + " => " + s); + } else { + final String ss = new String(array, start, offset - start, charset); + final long sl = function.getLong(ss); + if (sl == -1) { + LOGGER.warn("Unknown source identifier " + ss + " at line " + line); + continue; + } + if (sl < 0 || sl >= n) + throw new IllegalArgumentException("Source node number out of range for node " + ss + ": " + sl); + s = (int)sl; + if (DEBUG) System.err.println("Parsed target at line " + line + ": " + ss + " => " + s); + } + + // Skip whitespace between identifiers. + while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ') offset++; + + if (offset == lineLength) { + LOGGER.error("Error at line " + line + ": no target"); + continue; + } + + // Scan target id. + start = offset; + while (offset < lineLength && (array[offset] < 0 || array[offset] > ' ')) offset++; + + int t; + + if (function == null) { + final long tl; + try { + tl = getLong(array, start, offset - start); + } catch (final RuntimeException e) { + // Discard up to the end of line + LOGGER.error("Error at line " + line + ": " + e.getMessage()); + continue; + } + + t = map.getNode(tl); + + if (DEBUG) System.err.println("Parsed target at line " + line + ": " + tl + " => " + t); + } else { + final String ts = new String(array, start, offset - start, charset); + final long tl = function.getLong(ts); + if (tl == -1) { + LOGGER.warn("Unknown target identifier " + ts + " at line " + line); + continue; + } + + if (tl < 0 || tl >= n) + throw new IllegalArgumentException("Target node number out of range for node " + ts + ": " + tl); + t = (int)tl; + if (DEBUG) System.err.println("Parsed target at line " + line + ": " + ts + " => " + t); + } + + // Skip whitespace between identifiers. + while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ') offset++; + + if (offset == lineLength) { + LOGGER.error("Error at line " + line + ": no label"); + continue; + } + + // Scan label. + start = offset; + while (offset < lineLength && (array[offset] < 0 || array[offset] > ' ')) offset++; + + final String ls = new String(array, start, offset - start, charset); + + // Insert current value into the prototype label. + labelMapping.apply(prototype, ls); + if (DEBUG) System.err.println("Parsed label at line " + line + ": " + ls + " => " + prototype.get()); + + // Skip whitespace after label. + while (offset < lineLength && array[offset] >= 0 && array[offset] <= ' ') offset++; + + if (offset < lineLength) LOGGER.warn("Trailing characters ignored at line " + line); + + if (DEBUG) + System.err.println("Parsed labelled arc at line " + line + ": " + s + " -> " + t + " (" + prototype.get() + ")"); + + if (s != t || !noLoops) { + source[j] = s; + target[j] = t; + labelStart[j] = obs.writtenBits(); + prototype.toBitStream(obs, s); + j++; + + if (j == batchSize) { + obs.flush(); + pairs += processTransposeBatch(batchSize, source, target, labelStart, new InputBitStream(fbos.array), tempDir, batches, labelBatches, prototype, labelMergeStrategy); + fbos = new FastByteArrayOutputStream(); + obs = new OutputBitStream(fbos); + j = 0; + } + + if (symmetrize && s != t) { + source[j] = t; + target[j] = s; + labelStart[j] = obs.writtenBits(); + prototype.toBitStream(obs, t); + j++; + + if (j == batchSize) { + obs.flush(); + pairs += processTransposeBatch(batchSize, source, target, labelStart, new InputBitStream(fbos.array), tempDir, batches, labelBatches, prototype, labelMergeStrategy); + fbos = new FastByteArrayOutputStream(); + obs = new OutputBitStream(fbos); + j = 0; + } + } + + if (pl != null) pl.lightUpdate(); + } + } + + if (j != 0) { + obs.flush(); + pairs += processTransposeBatch(j, source, target, labelStart, new InputBitStream(fbos.array), tempDir, batches, labelBatches, prototype, labelMergeStrategy); + } + + if (pl != null) { + pl.done(); + logBatches(batches, pairs, pl); + } + + numNodes = function == null ? (int)map.size() : function.size(); + source = null; + target = null; + labelStart = null; + + if (function == null) { + ids = map.getIds(tempDir); + } + + this.arcLabelledBatchGraph = new Transform.ArcLabelledBatchGraph(function == null ? numNodes : n, pairs, batches, labelBatches, prototype, labelMergeStrategy); + } + + /** + * Creates a scattered-arcs ASCII graph. + * + * @param arcs an iterator returning the arcs as two-element arrays. + * @param function a function to map the long ids passed in arcs to int nodes. + * @param n the number of nodes of the graph (used only if function is not null). + * @param arcLabels a homogeneous iterator returning the labels in the same order as the arcs. + * @param labelMergeStrategy a merge strategy to apply when encountering duplicate arcs with different labels. + * @param symmetrize the new graph will be forced to be symmetric. + * @param noLoops the new graph will have no loops. + * @param batchSize the number of integers in a batch; two arrays of integers of this size will be allocated by + * this method. + * @param tempDir a temporary directory for the batches, or null for + * {@link File#createTempFile(String, String)}'s choice. + * @param pl a progress logger, or null. + */ + public ScatteredLabelledArcsASCIIGraph(final Iterator arcs, final Long2IntFunction function, final int n, final Iterator