diff --git a/benchmark/datasets.py b/benchmark/datasets.py
index 4692367e..3fa77187 100644
--- a/benchmark/datasets.py
+++ b/benchmark/datasets.py
@@ -1,6 +1,7 @@
 import math
 import numpy
 import os
+import gzip
 import random
 import sys
 import struct
@@ -550,6 +551,168 @@ def search_type(self):
         return "knn"


+def strip_gz(filename):
+    if not filename.endswith('.gz'):
+        raise RuntimeError(f"expected a filename ending with '.gz'. Received: {filename}")
+    return filename[:-3]
+
+
+def gunzip_if_needed(filename):
+    if filename.endswith('.gz'):
+        print('unzipping', filename, '...', end=" ")
+        with gzip.open(filename, 'rb') as f:
+            file_content = f.read()
+
+        with open(strip_gz(filename), 'wb') as f:
+            f.write(file_content)
+
+        os.remove(filename)
+        print('done.')
+
+
+class SparseDataset(DatasetCompetitionFormat):
+    """ Sparse vectors for sparse max inner product search (2023 competition).
+
+    The data is based on the MS MARCO passage retrieval dataset (text passages and queries),
+    embedded via the SPLADE model.
+
+    The class overrides several methods since the sparse format is different from the other datasets.
+    """
+
+    def __init__(self, version="small"):
+
+        versions = {"small": (100000, "base_small.csr.gz", "base_small.dev.gt"),
+                    "1M": (1000000, "base_1M.csr.gz", "base_1M.dev.gt"),
+                    "full": (8841823, "base_full.csr.gz", "base_full.dev.gt")}
+
+        assert version in versions, \
+            f'version="{version}" is invalid. Please choose one of {list(versions.keys())}.'
+
+        self.nb = versions[version][0]
+        self.nq = 6980
+        self.private_nq = 0  # TBD
+
+        self.ds_fn = versions[version][1]
+        self.qs_fn = "queries.dev.csr.gz"
+
+        self.qs_private_fn = ""  # TBD
+
+        self.base_url = "https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/"
+        self.basedir = os.path.join(BASEDIR, "sparse")
+
+        self.gt_fn = versions[version][2]
+        self.private_gt = ""  # TBD
+
+        self.d = np.nan  # only for compatibility with printing the name of the class
+
+    def prepare(self, skip_data=False):
+        # download the dataset files and unzip them if necessary.
+
+        if not os.path.exists(self.basedir):
+            os.makedirs(self.basedir)
+
+        # start with the small ones...
+        for fn in [self.qs_fn, self.gt_fn]:
+            if fn is None:
+                continue
+
+            sourceurl = os.path.join(self.base_url, fn)
+            outfile = os.path.join(self.basedir, fn)
+            if outfile.endswith('.gz'):
+                # check if the unzipped file already exists
+                if os.path.exists(strip_gz(outfile)):
+                    print("unzipped version of file %s already exists" % outfile)
+                    continue
+
+            if os.path.exists(outfile):
+                print("file %s already exists" % outfile)
+                gunzip_if_needed(outfile)
+                continue
+            download(sourceurl, outfile)
+            gunzip_if_needed(outfile)
+        # # private qs url: todo
+
+        if skip_data:
+            return
+
+        fn = self.ds_fn
+        sourceurl = os.path.join(self.base_url, fn)
+        outfile = os.path.join(self.basedir, fn)
+        if outfile.endswith('.gz'):
+            # check if the unzipped file already exists
+            unzipped_outfile = strip_gz(outfile)
+            if os.path.exists(unzipped_outfile):
+                print("unzipped version of file %s already exists" % outfile)
+                return
+        if os.path.exists(outfile):
+            print("file %s already exists" % outfile)
+            gunzip_if_needed(outfile)
+            return
+        download_accelerated(sourceurl, outfile)
+        gunzip_if_needed(outfile)
+
+    def get_dataset_fn(self):
+        fn = strip_gz(os.path.join(self.basedir, self.ds_fn))
+        if os.path.exists(fn):
+            return fn
+        raise RuntimeError("file not found")
+
+    def get_dataset_iterator(self, bs=512, split=(1, 0)):
+        assert split == (1, 0), 'No sharding supported yet.'  # todo
+
+        filename = self.get_dataset_fn()
+
+        x = read_sparse_matrix(filename, do_mmap=True)
+        assert x.shape[0] == self.nb
+
+        for j0 in range(0, self.nb, bs):
+            j1 = min(j0 + bs, self.nb)
+            yield x[j0:j1, :]
+
+        # i0, i1 = self.nb * rank // nsplit, self.nb * (rank + 1) // nsplit
+        # x = xbin_mmap(filename, dtype=self.dtype, maxn=self.nb)
+        # assert x.shape == (self.nb, self.d)
+
+        # for j0 in range(i0, i1, bs):
+        #     j1 = min(j0 + bs, i1)
+        #     yield sanitize(x[j0:j1])
+
+    def get_groundtruth(self, k=None):
+        assert self.gt_fn is not None
+        assert self.search_type() == "knn"
+
+        I, D = knn_result_read(os.path.join(self.basedir, self.gt_fn))
+        assert I.shape[0] == self.nq
+        if k is not None:
+            assert k <= 10
+            I = I[:, :k]
+            D = D[:, :k]
+        return I, D
+
+    def get_dataset(self):
+        assert self.nb <= 10 ** 6, "dataset too large, use iterator"
+        return next(self.get_dataset_iterator(bs=self.nb))
+
+    def get_queries(self):
+        filename = os.path.join(self.basedir, self.qs_fn)
+        # read the queries file. It is a small file, so no need to mmap
+        x = read_sparse_matrix(strip_gz(filename), do_mmap=False)
+        assert x.shape[0] == self.nq
+        return x
+
+    def get_private_queries(self):
+        raise RuntimeError("not implemented yet")
+
+    def get_private_groundtruth(self, k=None):
+        raise RuntimeError("not implemented yet")
+
+    def distance(self):
+        return "ip"
+
+    def search_type(self):
+        return "knn"
+
+
 class RandomDS(DatasetCompetitionFormat):
     def __init__(self, nb, nq, d):
         self.nb = nb
diff --git a/dataset_preparation/make_sparse_groundtruth.py b/dataset_preparation/make_sparse_groundtruth.py
new file mode 100644
index 00000000..1c3c3309
--- /dev/null
+++ b/dataset_preparation/make_sparse_groundtruth.py
@@ -0,0 +1,85 @@
+import argparse
+from tqdm import tqdm
+import time
+import numpy as np
+from scipy.sparse import csr_matrix, hstack
+from multiprocessing.pool import ThreadPool
+
+from benchmark.dataset_io import usbin_write, read_sparse_matrix
+
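+# Example invocation (a sketch; the paths below are hypothetical, and the .csr files must
+# already be unzipped, e.g. by SparseDataset.prepare()):
+#
+#   python dataset_preparation/make_sparse_groundtruth.py \
+#       --base_csr_file data/sparse/base_small.csr \
+#       --query_csr_file data/sparse/queries.dev.csr \
+#       --output_file data/sparse/base_small.dev.gt \
+#       --k 10 --nt 8
+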
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+
+    def aa(*args, **kwargs):
+        group.add_argument(*args, **kwargs)
+
+    group = parser.add_argument_group('File location')
+    aa('--base_csr_file', required=True, help="location of a .csr file representing the base data")
+    aa('--query_csr_file', required=True, help="location of a .csr file representing the query data")
+    aa('--output_file', required=True, help="location of the ground truth file to be generated")
+
+    group = parser.add_argument_group('Computation options')
+    aa('--k', default=10, type=int, help="number of nearest neighbors to compute per query")
+    # aa("--maxRAM", default=100, type=int, help="set max RSS in GB (avoid OOM crash)")
+    aa('--nt', type=int, help="number of threads in the thread pool. If omitted, a single thread is used.")
+
+    args = parser.parse_args()
+
+    print("args:", args)
+    print('k: ', args.k)
+
+    data = read_sparse_matrix(args.base_csr_file)
+    queries = read_sparse_matrix(args.query_csr_file)
+    print('data:', data.shape)
+    print('queries:', queries.shape)
+
+    # pad the queries with virtual zero columns to match the dimension of the base data
+    queries = hstack([queries, csr_matrix((queries.shape[0], data.shape[1] - queries.shape[1]))])
+    print('after padding:')
+    print('data:', data.shape)
+    print('queries:', queries.shape)
+
+    k = args.k
+    nq = queries.shape[0]
+
+    D = np.zeros((nq, k), dtype='float32')
+    I = -np.ones((nq, k), dtype='int32')
+
+    def process_single_row(i):
+        # compute all dot products of query i against the base data and keep the top k
+        res = data.dot(queries.getrow(i).transpose())
+        ra = res.toarray()
+        top_ind = np.argpartition(ra, -k, axis=0)[-k:][:, 0]
+
+        index_and_dot_prod = [(j, ra[j, 0]) for j in top_ind]
+        index_and_dot_prod.sort(key=lambda a: a[1], reverse=True)
+
+        return index_and_dot_prod
+
+    start = time.time()
+    # single thread
+    if args.nt is None:
+        print('computing ground truth for', nq, 'queries (single thread):')
+        res = []
+        for i in tqdm(range(nq)):
+            res.append(process_single_row(i))
+    else:
+        print('computing ground truth for', nq, 'queries (' + str(args.nt) + ' threads):')
+        with ThreadPool(processes=args.nt) as pool:
+            # map the function over all query indices
+            # res = pool.map(process_single_row, range(nq))
+            res = list(tqdm(pool.imap(process_single_row, range(nq)), total=nq))
+
+    end = time.time()
+    elapsed = end - start
+    print(f'Elapsed {elapsed}s for {nq} queries ({nq / elapsed} QPS)')
+
+    # rearrange to match the format of usbin_write:
+    for i in range(nq):
+        I[i, :] = [p[0] for p in res[i]]
+        D[i, :] = [p[1] for p in res[i]]
+
+    print()
+    print("Writing result to", args.output_file)
+
+    usbin_write(I, D, args.output_file)
diff --git a/dataset_preparation/sparse_dataset.md b/dataset_preparation/sparse_dataset.md
new file mode 100644
index 00000000..7aafe3f6
--- /dev/null
+++ b/dataset_preparation/sparse_dataset.md
@@ -0,0 +1,55 @@
+# Sparse dataset for the 2023 ANN challenge
+
+## Goal
+This is a dataset of sparse vectors: vectors of very high dimension (~30k), but with only a
+small number of nonzero elements. A typical example is text representation, where the dimension
+is the size of the vocabulary and the nonzero values correspond to the words appearing in each
+indexed document / passage.
+
+## Dataset details
+
+**Dataset**: sparse embedding of the MS MARCO passage retrieval dataset.
+The embeddings are produced by a deep learning model called SPLADE (specifically,
+SPLADE CoCondenser EnsembleDistil, `naver/splade-cocondenser-ensembledistil`).
+
+The base dataset contains 8.8M vectors with an average sparsity (number of nonzeros) of ~130. All nonzero values are positive.
+
+The common query set (`dev.small`) contains 6,980 queries with an average of ~49 nonzeros.
+
+Similarity is measured by maximum dot product, and the overall retrieval score is recall@10.
+To score the approximate algorithms, we will measure the maximal throughput attained subject to
+the recall@10 being at least 90%.
+
+## Dataset location and format
+
+The big-ann-benchmarks package contains convenience functions for loading the data and ground truth files.
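+
+For example, loading the `small` variant and its ground truth might look roughly like this
+(a sketch using the `SparseDataset` class in `benchmark/datasets.py`; `prepare()` downloads
+and unzips the files on first use):
+
+```python
+from benchmark.datasets import SparseDataset
+
+ds = SparseDataset(version="small")      # one of "small", "1M", "full"
+ds.prepare()                             # download the .csr and ground-truth files
+
+data = ds.get_dataset()                  # scipy CSR matrix of base vectors ("small"/"1M" only)
+queries = ds.get_queries()               # scipy CSR matrix of query vectors
+I_gt, D_gt = ds.get_groundtruth(k=10)    # ids and dot products of the true top-10 neighbors
+```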
+
+The dataset, along with smaller versions for development (and their ground truth files), is available at the following locations:
+
+| Name          | Description                | Download link                                                                                | # rows    | Ground truth                                                                              |
+|:--------------|----------------------------|----------------------------------------------------------------------------------------------|-----------|---------------------------------------------------------------------------------------------|
+| `full`        | Full base dataset          | [5.5 GB](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_full.csr.gz)   | 8,841,823 | [545K](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_full.dev.gt)  |
+| `1M`          | 1M slice of base dataset   | [636.3 MB](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_1M.csr.gz)   | 1,000,000 | [545K](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_1M.dev.gt)    |
+| `small`       | 100k slice of base dataset | [64.3 MB](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_small.csr.gz) | 100,000   | [545K](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/base_small.dev.gt) |
+| `queries.dev` | queries file               | [1.8 MB](https://storage.googleapis.com/ann-challenge-sparse-vectors/csr/queries.dev.csr.gz) | 6,980     | N/A                                                                                       |
+
+---
+
+TODO:
+
+1. add results of baseline algorithm
+2.
+
+## Baseline algorithm
+
+As a baseline, we propose a basic (but efficient) exact algorithm called linscan. It is based on an inverted index and can be made faster (and less precise) with an early stopping condition. We (Pinecone) can contribute an open-source implementation.
+
+Results of the baseline algorithm (linscan-anytime, both single-thread and multi-thread):
+
+TODO (plot throughput/recall). Extract max throughput at 90% recall.
+
+Link to the open-source package (Rust with Python bindings):
+
+TODO
diff --git a/sparse_algorithms/basic_sparse_index.py b/sparse_algorithms/basic_sparse_index.py
new file mode 100644
index 00000000..4506e8b9
--- /dev/null
+++ b/sparse_algorithms/basic_sparse_index.py
@@ -0,0 +1,52 @@
+from scipy.sparse import csr_matrix
+import numpy as np
+
+# Given a vector x, returns another vector with the minimal number of the largest elements of x,
+# s.t. their sum is at least a times the sum of the elements of x.
+#
+# The goal is to sparsify the vector further, while preserving as much of its mass as possible.
+def largest_elements(x, a):
+    # compute the sum of the elements of x
+    x_sum = np.sum(x)
+
+    # sort the nonzero values in descending order and compute their normalized cumulative sum
+    ind = np.argsort(-x.data)
+    cs = np.cumsum(x.data[ind] / x_sum)
+
+    n_elements = min(sum(cs < a) + 1, x.nnz)  # rounding errors sometimes result in n_elements > x.nnz
+
+    new_ind = x.indices[ind[:n_elements]]
+    new_data = x.data[ind[:n_elements]]
+    return csr_matrix((new_data, new_ind, [0, n_elements]), shape=x.shape)
+
+
+# A basic sparse index.
+# Methods:
+#   1. init: from a csr matrix of data.
+#   2. query a single vector, with parameters:
+#      - k (number of neighbors),
+#      - alpha (fraction of the mass of the query to retain; alpha=1 is exact search).
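+#
+# Example usage (a rough sketch, assuming `data` is a scipy.sparse CSR matrix of base vectors
+# and `q` is a single-row CSR query vector):
+#   index = BasicSparseIndex(data)
+#   neighbors = index.query(q, k=10, alpha=0.9)  # list of (row_id, score) pairs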
+class BasicSparseIndex(object):
+    def __init__(self, data_csr):
+        # store the data in CSC format so that dot products with a column vector are efficient
+        self.data_csc = data_csr.tocsc()
+
+    def query(self, q, k, alpha=1):  # single query; assumes q is a row vector
+        if alpha == 1:
+            q2 = q.transpose()
+        else:
+            q2 = largest_elements(q, alpha).transpose()
+
+        # perform a (sparse) matrix-vector multiplication
+        res = self.data_csc.dot(q2)
+
+        if res.nnz <= k:  # if there are at most k elements with a nonzero score, simply return them
+            return list(zip(res.indices, res.data))
+
+        # extract the top k from the sparse result directly
+        indices = np.argpartition(res.data, -(k + 1))[-k:]
+        results = []
+        for index in indices:
+            results.append((res.data[index], index))
+        results.sort(reverse=True)
+        return [(res.indices[b], a) for a, b in results]
diff --git a/sparse_algorithms/eval_sparse.py b/sparse_algorithms/eval_sparse.py
new file mode 100644
index 00000000..b680215e
--- /dev/null
+++ b/sparse_algorithms/eval_sparse.py
@@ -0,0 +1,86 @@
+import argparse
+
+from tqdm import tqdm
+import time
+import numpy as np
+from multiprocessing.pool import ThreadPool
+
+from benchmark.datasets import SparseDataset
+from sparse_algorithms.basic_sparse_index import BasicSparseIndex
+
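+# Example invocation (a sketch; run from the repository root so that the `benchmark` and
+# `sparse_algorithms` packages are importable, e.g.):
+#
+#   PYTHONPATH=. python sparse_algorithms/eval_sparse.py --dataset_name small --alpha 0.9 --k 10 --nt 8
+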
+if __name__ == "__main__":
+    parser = argparse.ArgumentParser()
+
+    def aa(*args, **kwargs):
+        group.add_argument(*args, **kwargs)
+
+    group = parser.add_argument_group('files')
+    aa('--dataset_name', required=True, help="Sparse dataset version, must be one of {'small', '1M', 'full'}.")
+    # aa('--output_file', required=True, help="location of the results file")
+
+    group = parser.add_argument_group('Computation options')
+    aa('--alpha', default=1, type=float, help="fraction of the weight of the query to retain before search. Default: 1.0 (exact search)")
+    aa('--k', default=10, type=int, help="number of nearest neighbors to retrieve per query")
+    aa('--nt', type=int, help="number of threads in the thread pool. If omitted, a single thread is used.")
+
+    args = parser.parse_args()
+
+    print("args:", args)
+    print('k: ', args.k)
+
+    ds = SparseDataset(version=args.dataset_name)
+
+    ds.prepare()
+
+    I_gt, D_gt = ds.get_groundtruth()
+
+    data = ds.get_dataset()
+    queries = ds.get_queries()
+
+    print('data:', data.shape)
+    print('queries:', queries.shape)
+
+    k = args.k
+    a = args.alpha
+    nq = queries.shape[0]
+
+    # build the index:
+    index = BasicSparseIndex(data)
+
+    # prepare arrays for storing the results of the algorithm
+    D = np.zeros((ds.nq, k), dtype='float32')
+    I = -np.ones((ds.nq, k), dtype='int32')
+
+    def process_single_row(i):
+        # query a single row and write the resulting ids and scores into I and D
+        res = index.query(q=queries.getrow(i), k=k, alpha=a)
+        I[i, :] = [rr[0] for rr in res]
+        D[i, :] = [rr[1] for rr in res]
+
+    start = time.time()
+    # single thread
+    if args.nt is None:
+        print('evaluating', nq, 'queries (single thread):')
+        for i in tqdm(range(nq)):
+            process_single_row(i)
+    else:
+        print('evaluating', nq, 'queries (' + str(args.nt) + ' threads):')
+        with ThreadPool(processes=args.nt) as pool:
+            # map the function over all query indices
+            # res = pool.map(process_single_row, range(nq))
+            list(tqdm(pool.imap(process_single_row, range(nq)), total=nq))
+
+    end = time.time()
+    elapsed = end - start
+    print(f'Elapsed {elapsed}s for {nq} queries ({nq / elapsed} QPS)')
+
+    # I and D now hold the ids and scores returned for every query
+
+    # compute recall:
+
+    # recall = faiss.eval_intersection(I_gt, I) / k / ds.nq
+    recall = np.mean([len(set(I_gt[i, :]).intersection(I[i, :])) / k for i in range(ds.nq)])
+
+    print('recall:', recall)
+    print()
+    print(f'Results: {a}, {recall}, {nq / elapsed}')