sparse dataset ++ (FALCONN-LIB#110)
* sparse_GT

* [WIP] add sparse datasets to Datasets

* sparse Dataset object

* [WIP] sparse dataset documentation

* cleanup

* fix gunzip flow

* basic sparse baseline and eval

* drop faiss-based intersection

---------

Co-authored-by: Matthijs Douze <[email protected]>
Co-authored-by: Martin Aumueller <[email protected]>
3 people authored May 16, 2023
1 parent f374b4f commit 11a2ff2
Showing 5 changed files with 441 additions and 0 deletions.
163 changes: 163 additions & 0 deletions benchmark/datasets.py
@@ -1,6 +1,7 @@
import math
import numpy
import os
import gzip
import random
import sys
import struct
@@ -550,6 +551,168 @@ def search_type(self):
return "knn"


def strip_gz(filename):
if not filename.endswith('.gz'):
raise RuntimeError(f"expected a filename ending with '.gz'. Received: {filename}")
return filename[:-3]


def gunzip_if_needed(filename):
if filename.endswith('.gz'):
print('unzipping', filename, '...', end=" ")
with gzip.open(filename, 'rb') as f:
file_content = f.read()

with open(strip_gz(filename), 'wb') as f:
f.write(file_content)

os.remove(filename)
print('done.')


class SparseDataset(DatasetCompetitionFormat):
""" the 2023 competition
Sparse vectors for sparse max inner product search
Data is based on MSMARCO passage retrieval data (text passages and queries),
embedded via the SPLADE model.
The class overrides several methods since the sparse format is different than other datasets.
"""

def __init__(self, version="small"):

versions = {"small": (100000, "base_small.csr.gz", "base_small.dev.gt"),
"1M": (1000000, "base_1M.csr.gz", "base_1M.dev.gt"),
"full": (8841823, "base_full.csr.gz", "base_full.dev.gt")}

assert version in versions, f'version="{version}" is invalid. Please choose one of {list(versions.keys())}.'

self.nb = versions[version][0]
self.nq = 6980
self.private_nq = 0 # TBD

self.ds_fn = versions[version][1]
self.qs_fn = "queries.dev.csr.gz"

self.qs_private_fn = "" # TBD

self.base_url = "https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/"
self.basedir = os.path.join(BASEDIR, "sparse")

self.gt_fn = versions[version][2]
self.private_gt = "" # TBD

self.d = np.nan # this is only for compatibility with printing the name of the class

def prepare(self, skip_data=False):
# downloads the datasets and unzips (if necessary).

if not os.path.exists(self.basedir):
os.makedirs(self.basedir)

# start with the small ones...
for fn in [self.qs_fn, self.gt_fn]:
if fn is None:
continue

sourceurl = os.path.join(self.base_url, fn)
outfile = os.path.join(self.basedir, fn)
if outfile.endswith('.gz'):
# check if the unzipped file already exists
if os.path.exists(strip_gz(outfile)):
print("unzipped version of file %s already exists" % outfile)
continue

if os.path.exists(outfile):
print("file %s already exists" % outfile)
gunzip_if_needed(outfile)
continue
download(sourceurl, outfile)
gunzip_if_needed(outfile)
# private queries url: todo

if skip_data:
return

fn = self.ds_fn
sourceurl = os.path.join(self.base_url, fn)
outfile = os.path.join(self.basedir, fn)
if outfile.endswith('.gz'):
# check if the unzipped file already exists
unzipped_outfile = strip_gz(outfile)
if os.path.exists(unzipped_outfile):
print("unzipped version of file %s already exists" % outfile)
return
if os.path.exists(outfile):
print("file %s already exists" % outfile)
gunzip_if_needed(outfile)
return
download_accelerated(sourceurl, outfile)
gunzip_if_needed(outfile)

def get_dataset_fn(self):
fn = strip_gz(os.path.join(self.basedir, self.ds_fn))
if os.path.exists(fn):
return fn
raise RuntimeError("file not found")

def get_dataset_iterator(self, bs=512, split=(1, 0)):
assert split == (1,0), 'No sharding supported yet.' # todo

filename = self.get_dataset_fn()

x = read_sparse_matrix(filename, do_mmap=True)
assert x.shape[0] == self.nb

for j0 in range(0, self.nb, bs):
j1 = min(j0 + bs, self.nb)
yield x[j0:j1, :]

# i0, i1 = self.nb * rank // nsplit, self.nb * (rank + 1) // nsplit
# x = xbin_mmap(filename, dtype=self.dtype, maxn=self.nb)
# assert x.shape == (self.nb, self.d)

# for j0 in range(i0, i1, bs):
# j1 = min(j0 + bs, i1)
# yield sanitize(x[j0:j1])


def get_groundtruth(self, k=None):
assert self.gt_fn is not None
assert self.search_type() == "knn"

I, D = knn_result_read(os.path.join(self.basedir, self.gt_fn))
assert I.shape[0] == self.nq
if k is not None:
assert k <= 10
I = I[:, :k]
D = D[:, :k]
return I, D

def get_dataset(self):
assert self.nb <= 10 ** 6, "dataset too large, use iterator"
return next(self.get_dataset_iterator(bs=self.nb))

def get_queries(self):
filename = os.path.join(self.basedir, self.qs_fn)
x = read_sparse_matrix(strip_gz(filename), do_mmap=False) # the queries file is small, so no need to mmap
assert x.shape[0] == self.nq
return x

def get_private_queries(self):
raise RuntimeError("not implemented yet")

def get_private_groundtruth(self, k=None):
raise RuntimeError("not implemented yet")

def distance(self):
return "ip"

def search_type(self):
return "knn"


class RandomDS(DatasetCompetitionFormat):
def __init__(self, nb, nq, d):
self.nb = nb
85 changes: 85 additions & 0 deletions dataset_preparation/make_sparse_groundtruth.py
@@ -0,0 +1,85 @@
import argparse
from tqdm import tqdm
import time
import numpy as np
from scipy.sparse import csr_matrix, hstack
from multiprocessing.pool import ThreadPool

from benchmark.dataset_io import usbin_write, read_sparse_matrix

if __name__ == "__main__":
parser = argparse.ArgumentParser()

def aa(*args, **kwargs):
group.add_argument(*args, **kwargs)

group = parser.add_argument_group('File location')
aa('--base_csr_file', required=True, help="location of a .csr file representing the base data")
aa('--query_csr_file', required=True, help="location of a .csr file representing the query data")
aa('--output_file', required=True, help="location of the ground truth file to be generated")

group = parser.add_argument_group('Computation options')
aa('--k', default=10, type=int, help="number of nearest neighbors (k) to compute")
# aa("--maxRAM", default=100, type=int, help="set max RSS in GB (avoid OOM crash)")
aa('--nt', type=int, help="# of processes in thread pool. If omitted, then a single thread is used.")

args = parser.parse_args()

print("args:", args)
print('k: ', args.k)

data = read_sparse_matrix(args.base_csr_file)
queries = read_sparse_matrix(args.query_csr_file)
print('data:', data.shape)
print('queries:', queries.shape)

# pad the queries with virtual zeros to match the length of the data files
queries = hstack([queries, csr_matrix((queries.shape[0], data.shape[1] - queries.shape[1]))])
print('after padding: ')
print('data:', data.shape)
print('queries:', queries.shape)

k = args.k
nq = queries.shape[0]

D = np.zeros((nq, k), dtype='float32')
I = -np.ones((nq, k), dtype='int32')


def process_single_row(i):
res = data.dot(queries.getrow(i).transpose())
ra = res.toarray()
top_ind = np.argpartition(ra, -k, axis=0)[-k:][:, 0]

index_and_dot_prod = [(j, ra[j, 0]) for j in top_ind]
index_and_dot_prod.sort(key=lambda a: a[1], reverse=True)

return index_and_dot_prod

start = time.time()
# single thread
if args.nt is None:
print('computing ground truth for', nq, 'queries (single thread):')
res = []
for i in tqdm(range(nq)):
res.append(process_single_row(i))
else:
print('computing ground truth for', nq, 'queries (' + str(args.nt) + ' threads):')
with ThreadPool(processes=args.nt) as pool:
# Map the function to the array of items
# res = pool.map(process_single_row, range(nq))
res = list(tqdm(pool.imap(process_single_row, range(nq)), total=nq))

end = time.time()
elapsed = end - start
print(f'Elapsed {elapsed}s for {nq} queries ({nq / elapsed} QPS) ')

# rearrange to match the format of usbin_write:
for i in range(nq):
I[i, :] = [p[0] for p in res[i]]
D[i, :] = [p[1] for p in res[i]]

print()
print("Writing result to", args.output_file)

usbin_write(I, D, args.output_file)
55 changes: 55 additions & 0 deletions dataset_preparation/sparse_dataset.md
@@ -0,0 +1,55 @@
# Sparse dataset for the 2023 ANN challenge

## Goal
This is a dataset of sparse vectors: vectors of very high dimension (~30k) with only a small number of nonzero elements.
A typical use case is text representation, where the dimension is the vocabulary size
and the nonzero values are the weights of the words appearing in each indexed document or paragraph.
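
As an illustration only (the vocabulary size and term weights below are made up), such a vector can be represented as a SciPy CSR matrix:

```python
import numpy as np
from scipy.sparse import csr_matrix

# a single "document" over an assumed 30,000-term vocabulary
# with only four nonzero term weights
vocab_size = 30_000
term_ids = np.array([17, 512, 4096, 20001])
weights = np.array([1.2, 0.4, 2.7, 0.9], dtype=np.float32)
indptr = np.array([0, len(term_ids)])

doc = csr_matrix((weights, term_ids, indptr), shape=(1, vocab_size))
print(doc.shape, doc.nnz)  # (1, 30000) 4
```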

## Dataset details

**Dataset**: sparse embedding of the MS-MARCO passage retrieval dataset.
The embeddings are based on a deep learning model called SPLADE (specifically, it is the
SPLADE CoCondenser EnsembleDistil (`naver/splade-cocondenser-ensembledistil`)).

The base dataset contains 8,841,823 (~8.8M) vectors with an average of ~130 nonzeros per vector. All nonzero values are positive.

The common query set (`dev.small`) contains 6980 queries, where the average number of nonzeros is ~49.

Similarity is measured by maximum dot product, and the overall retrieval metric is recall@10.
Approximate algorithms are scored by the maximal throughput they attain
while keeping recall@10 at or above 90%.
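
As a sketch only (the array names are illustrative and the official evaluation code is authoritative), recall@10 can be computed from the result ids and the ground-truth ids as follows:

```python
import numpy as np

def recall_at_k(results, gt, k=10):
    """Mean fraction of the true top-k ids recovered per query.

    results, gt: integer arrays of shape (num_queries, >= k) holding
    neighbor ids, one row per query.
    """
    nq = gt.shape[0]
    hits = sum(len(set(results[i, :k]) & set(gt[i, :k])) for i in range(nq))
    return hits / (nq * k)
```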

## Dataset location and format:

The big-ann-benchmarks package contains convenience functions for loading the data and ground truth files; see the loading sketch after the table below.

The dataset, along with smaller versions for development (and their ground truth files), is available at the following locations:

| Name | Description | download link | #rows | ground truth |
|:--------------|----------------------------|----------------------------------------------------------------------------------------------|-----------|-------------------------------------------------------------------------------------------|
| `full` | Full base dataset | [5.5 GB](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_full.csr.gz) | 8,841,823 | [545K](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_full.dev.gt) |
| `1M` | 1M slice of base dataset | [636.3 MB](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_1M.csr.gz) | 1,000,000 | [545K](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_1M.dev.gt) |
| `small` | 100k slice of base dataset | [64.3 MB](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_small.csr.gz) | 100,000 | [545K](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_small.dev.gt) |
| `queries.dev` | queries file | [1.8 MB](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/queries.dev.csr.gz) | 6,980 | N/A |
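
A minimal loading sketch using the helpers added in this commit (module paths follow the repository layout shown above and may differ in your setup):

```python
from benchmark.datasets import SparseDataset

ds = SparseDataset("small")        # "small", "1M", or "full"
ds.prepare()                       # download and gunzip base, queries, and ground truth

X = ds.get_dataset()               # CSR matrix of base vectors ("small"/"1M" only)
Q = ds.get_queries()               # CSR matrix of the 6,980 query vectors
I, D = ds.get_groundtruth(k=10)    # ids and dot products of the true top-10 per query

print(X.shape, Q.shape, I.shape)
```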

---

TODO:

1. add results of baseline algorithm
2.

## Baseline algorithm

As a baseline, we propose a basic (but efficient) exact algorithm called linscan. It is based on an inverted index and can be made faster (and less precise) with an early-stopping condition. We (Pinecone) can contribute an open source implementation.
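
The sketch below is not the linscan implementation referred to above; it is only a toy illustration of the inverted-index idea (exhaustive scoring, no early stopping), with a made-up input format:

```python
from collections import defaultdict

class ToyInvertedIndex:
    """Exact max inner product search over sparse vectors via an inverted index."""

    def __init__(self, docs):
        # docs: list of {term_id: weight} dicts, one per document
        self.postings = defaultdict(list)
        for doc_id, doc in enumerate(docs):
            for term, weight in doc.items():
                self.postings[term].append((doc_id, weight))

    def search(self, query, k=10):
        # query: {term_id: weight} dict; only postings of the query's terms are touched
        scores = defaultdict(float)
        for term, q_weight in query.items():
            for doc_id, d_weight in self.postings.get(term, []):
                scores[doc_id] += q_weight * d_weight   # accumulate the dot product
        return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:k]
```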

Results of the baseline algorithm (linscan-anytime, both single-threaded and multi-threaded):

TODO (plot throughput/recall). Extract max throughput at 90% recall.


Link to the open source package (Rust with Python bindings):

TODO

52 changes: 52 additions & 0 deletions sparse_algorithms/basic_sparse_index.py
@@ -0,0 +1,52 @@
from scipy.sparse import csr_matrix
import numpy as np

# given a sparse vector x, returns another vector keeping only the minimal number of
# largest elements of x s.t. their sum is at least a times the sum of all elements of x.
#
# The goal is to sparsify the vector further,
# while preserving as much of the original vector as possible.
def largest_elements(x, a):
# Compute the sum of elements of x
x_sum = np.sum(x)

# Compute the indices and values of the largest elements of x
ind = np.argsort(-x.data)
cs = np.cumsum(x.data[ind] / x_sum)

n_elements = min(sum(cs < a) + 1, x.nnz) # rounding errors sometimes results in n_elements > x.nnz

new_ind = x.indices[ind[:n_elements]]
new_data = x.data[ind[:n_elements]]
return csr_matrix((new_data, new_ind, [0, n_elements]), shape=x.shape)


# a basic sparse index.
# methods:
# 1. init: from a csr matrix of data.
# 2. query a single vector, with parameters:
# - k (# of neighbors),
# - alpha (fraction of the sum of the vector to maintain. alpha=1 is exact search).
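#
# Example usage (illustrative only; `data` and `queries` are assumed to be CSR matrices
# with the same number of columns, e.g. loaded via read_sparse_matrix and padded as needed):
#   index = BasicSparseIndex(data)
#   hits = index.query(queries.getrow(0), k=10, alpha=0.9)  # -> [(row_id, score), ...]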
class BasicSparseIndex(object):
def __init__(self, data_csr):
self.data_csc = data_csr.tocsc()

def query(self, q, k, alpha=1): # single query, assumes q is a row vector
if alpha == 1:
q2 = q.transpose()
else:
q2 = largest_elements(q, alpha).transpose()

# perform (sparse) matrix-vector multiplication
res = self.data_csc.dot(q2)

if res.nnz <= k: # if there are at most k elements with a nonzero score, simply return them all
return list(zip(res.indices, res.data))

# extract the top k from the res sparse array directly
indices = np.argpartition(res.data, -(k + 1))[-k:]
results = []
for index in indices:
results.append((res.data[index], index))
results.sort(reverse=True)
return [(res.indices[b], a) for a, b in results]