Skip to content

Commit 597112f

Browse files
authored
Vamana index save/load to a stream using an intermediate temp directory. (#192)
1. Simple `DirectoryArchiver` to pack/unpack a directory to a stream 2. Helper class to make/remove temp directory 3. Vamana and DynamicVamana orchestrators save/assembly to/from a stream. 4. Added `DirectoryArchiver` unit test and updated `Vamana` integration search test.
1 parent 46d8105 commit 597112f

File tree

10 files changed

+565
-0
lines changed

10 files changed

+565
-0
lines changed

include/svs/index/flat/flat.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ template <
158158
typename Ownership = OwnsMembers>
159159
class FlatIndex {
160160
public:
161+
static constexpr bool supports_insertions = false;
162+
static constexpr bool supports_deletions = false;
163+
static constexpr bool supports_saving = true;
164+
static constexpr bool needs_id_translation = false;
165+
161166
using const_value_type = data::const_value_type_t<Data>;
162167

163168
/// The type of the distance functor.

include/svs/lib/file.h

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@
1818

1919
// svs
2020
#include "svs/lib/exception.h"
21+
#include "svs/lib/uuid.h"
2122

2223
// stl
2324
#include <filesystem>
2425
#include <fstream>
26+
#include <span>
2527

2628
namespace svs::lib {
2729

@@ -110,4 +112,232 @@ inline std::ifstream open_read(
110112
return std::ifstream(path, mode);
111113
}
112114

115+
inline std::filesystem::path unique_temp_directory_path(const std::string& prefix) {
116+
namespace fs = std::filesystem;
117+
auto temp_dir = fs::temp_directory_path();
118+
// Try up to 10 times to create a unique directory.
119+
for (int i = 0; i < 10; ++i) {
120+
auto dir = temp_dir / (prefix + "-" + svs::lib::UUID().str());
121+
if (!fs::exists(dir)) {
122+
return dir;
123+
}
124+
return dir;
125+
}
126+
throw ANNEXCEPTION("Could not create a unique temporary directory!");
127+
}
128+
129+
// RAII helper to create and delete a temporary directory.
130+
struct UniqueTempDirectory {
131+
std::filesystem::path path;
132+
133+
UniqueTempDirectory(const std::string& prefix)
134+
: path{unique_temp_directory_path(prefix)} {
135+
std::filesystem::create_directories(path);
136+
}
137+
138+
~UniqueTempDirectory() {
139+
try {
140+
std::filesystem::remove_all(path);
141+
} catch (...) {
142+
// Ignore errors.
143+
}
144+
}
145+
146+
std::filesystem::path get() const { return path; }
147+
operator const std::filesystem::path&() const { return path; }
148+
};
149+
150+
// Simple directory archiver to pack/unpack a directory to/from a stream.
151+
// Uses a simple custom binary format.
152+
// Not meant to be super efficient, just a simple way to serialize a directory
153+
// structure to a stream.
154+
struct DirectoryArchiver {
155+
using size_type = uint64_t;
156+
157+
// TODO: Define CACHELINE_BYTES in a common place
158+
// rather than duplicating it here and in prefetch.h
159+
static constexpr auto CACHELINE_BYTES = 64;
160+
static constexpr size_type magic_number = 0x5e2d58d9f3b4a6c1;
161+
162+
static size_type write_size(std::ostream& os, size_type size) {
163+
os.write(reinterpret_cast<const char*>(&size), sizeof(size));
164+
if (!os) {
165+
throw ANNEXCEPTION("Error writing to stream!");
166+
}
167+
return sizeof(size);
168+
}
169+
170+
static size_type read_size(std::istream& is, size_type& size) {
171+
is.read(reinterpret_cast<char*>(&size), sizeof(size));
172+
if (!is) {
173+
throw ANNEXCEPTION("Error reading from stream!");
174+
}
175+
return sizeof(size);
176+
}
177+
178+
static size_type write_name(std::ostream& os, const std::string& name) {
179+
auto bytes = write_size(os, name.size());
180+
os.write(name.data(), name.size());
181+
if (!os) {
182+
throw ANNEXCEPTION("Error writing to stream!");
183+
}
184+
return bytes + name.size();
185+
}
186+
187+
static size_type read_name(std::istream& is, std::string& name) {
188+
size_type size = 0;
189+
auto bytes = read_size(is, size);
190+
name.resize(size);
191+
is.read(name.data(), size);
192+
if (!is) {
193+
throw ANNEXCEPTION("Error reading from stream!");
194+
}
195+
return bytes + size;
196+
}
197+
198+
static size_type write_file(
199+
std::ostream& stream,
200+
const std::filesystem::path& path,
201+
const std::filesystem::path& root
202+
) {
203+
namespace fs = std::filesystem;
204+
check_file(path, std::ios_base::in | std::ios_base::binary);
205+
206+
// Write the filename as a string.
207+
std::string filename = fs::relative(path, root).string();
208+
auto header_bytes = write_name(stream, filename);
209+
if (!stream) {
210+
throw ANNEXCEPTION("Error writing to stream!");
211+
}
212+
213+
// Write the size of the file.
214+
size_type filesize = fs::file_size(path);
215+
header_bytes += write_size(stream, filesize);
216+
if (!stream) {
217+
throw ANNEXCEPTION("Error writing to stream!");
218+
}
219+
220+
// Now write the actual file contents.
221+
std::ifstream in(path, std::ios_base::in | std::ios_base::binary);
222+
if (!in) {
223+
throw ANNEXCEPTION("Error opening file {} for reading!", path);
224+
}
225+
stream << in.rdbuf();
226+
if (!stream) {
227+
throw ANNEXCEPTION("Error writing to stream!");
228+
}
229+
230+
return header_bytes + filesize;
231+
}
232+
233+
static size_type read_file(std::istream& stream, const std::filesystem::path& root) {
234+
namespace fs = std::filesystem;
235+
236+
// Read the filename as a string.
237+
std::string filename;
238+
auto header_bytes = read_name(stream, filename);
239+
if (!stream) {
240+
throw ANNEXCEPTION("Error reading from stream!");
241+
}
242+
243+
auto path = root / filename;
244+
auto parent_dir = path.parent_path();
245+
if (!fs::exists(parent_dir)) {
246+
fs::create_directories(parent_dir);
247+
} else if (!fs::is_directory(parent_dir)) {
248+
throw ANNEXCEPTION("Path {} exists and is not a directory!", root);
249+
}
250+
check_file(path, std::ios_base::out | std::ios_base::binary);
251+
252+
// Read the size of the file.
253+
std::uint64_t filesize = 0;
254+
header_bytes += read_size(stream, filesize);
255+
if (!stream) {
256+
throw ANNEXCEPTION("Error reading from stream!");
257+
}
258+
259+
// Now write the actual file contents.
260+
std::ofstream out(path, std::ios_base::out | std::ios_base::binary);
261+
if (!out) {
262+
throw ANNEXCEPTION("Error opening file {} for writing!", path);
263+
}
264+
265+
// Copy the data in chunks.
266+
constexpr size_t buffer_size = 1 << 13; // 8KB buffer
267+
alignas(CACHELINE_BYTES) char buffer[buffer_size];
268+
269+
size_t bytes_remaining = filesize;
270+
while (bytes_remaining > 0) {
271+
size_t to_read = std::min(buffer_size, bytes_remaining);
272+
stream.read(buffer, to_read);
273+
if (!stream) {
274+
throw ANNEXCEPTION("Error reading from stream!");
275+
}
276+
out.write(buffer, to_read);
277+
if (!out) {
278+
throw ANNEXCEPTION("Error writing to file {}!", path);
279+
}
280+
bytes_remaining -= to_read;
281+
}
282+
283+
return header_bytes + filesize;
284+
}
285+
286+
static size_t pack(const std::filesystem::path& dir, std::ostream& stream) {
287+
namespace fs = std::filesystem;
288+
if (!fs::is_directory(dir)) {
289+
throw ANNEXCEPTION("Path {} is not a directory!", dir);
290+
}
291+
292+
auto total_bytes = write_size(stream, magic_number);
293+
294+
// Calculate the number of files in the directory.
295+
uint64_t filesnum = std::count_if(
296+
fs::recursive_directory_iterator{dir},
297+
fs::recursive_directory_iterator{},
298+
[&](const auto& entry) { return entry.is_regular_file(); }
299+
);
300+
total_bytes += write_size(stream, filesnum);
301+
302+
// Now serialize each file in the directory recursively.
303+
for (const auto& entry : fs::recursive_directory_iterator{dir}) {
304+
if (entry.is_regular_file()) {
305+
total_bytes += write_file(stream, entry.path(), dir);
306+
}
307+
// Ignore other types of entries.
308+
}
309+
310+
return total_bytes;
311+
}
312+
313+
static size_t unpack(std::istream& stream, const std::filesystem::path& root) {
314+
namespace fs = std::filesystem;
315+
316+
// Read and verify the magic number.
317+
size_type magic = 0;
318+
auto total_bytes = read_size(stream, magic);
319+
if (magic != magic_number) {
320+
throw ANNEXCEPTION("Invalid magic number in directory unpacking!");
321+
}
322+
323+
size_type num_files = 0;
324+
total_bytes += read_size(stream, num_files);
325+
if (!stream) {
326+
throw ANNEXCEPTION("Error reading from stream!");
327+
}
328+
329+
if (!fs::exists(root)) {
330+
fs::create_directories(root);
331+
} else if (!fs::is_directory(root)) {
332+
throw ANNEXCEPTION("Path {} exists and is not a directory!", root);
333+
}
334+
335+
// Now deserialize each file in the directory.
336+
for (size_type i = 0; i < num_files; ++i) {
337+
total_bytes += read_file(stream, root);
338+
}
339+
340+
return total_bytes;
341+
}
342+
};
113343
} // namespace svs::lib

include/svs/orchestrators/dynamic_flat.h

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ class DynamicFlatInterface {
5858
const std::filesystem::path& config_directory,
5959
const std::filesystem::path& data_directory
6060
) = 0;
61+
/// Serialize the index into the given output stream.
/// Pure virtual: the concrete implementation class provides the behavior.
virtual void save(std::ostream& stream) = 0;
6162
};
6263

6364
template <lib::TypeList QueryTypes, typename Impl>
@@ -118,6 +119,21 @@ class DynamicFlatImpl
118119
) override {
119120
impl().save(config_directory, data_directory);
120121
}
122+
123+
// Stream-based save implementation
124+
void save(std::ostream& stream) override {
125+
if constexpr (Impl::supports_saving) {
126+
lib::UniqueTempDirectory tempdir{"svs_dynflat_save"};
127+
const auto config_dir = tempdir.get() / "config";
128+
const auto data_dir = tempdir.get() / "data";
129+
std::filesystem::create_directories(config_dir);
130+
std::filesystem::create_directories(data_dir);
131+
save(config_dir, data_dir);
132+
lib::DirectoryArchiver::pack(tempdir, stream);
133+
} else {
134+
throw ANNEXCEPTION("The current DynamicFlat backend doesn't support saving!");
135+
}
136+
}
121137
};
122138

123139
// Forward Declarations.
@@ -253,6 +269,44 @@ class DynamicFlat : public manager::IndexManager<DynamicFlatInterface> {
253269
);
254270
}
255271

272+
// Assembly from stream
273+
template <
274+
manager::QueryTypeDefinition QueryTypes,
275+
typename Data,
276+
typename Distance,
277+
typename ThreadPoolProto,
278+
typename... DataLoaderArgs>
279+
static DynamicFlat assemble(
280+
std::istream& stream,
281+
const Distance& distance,
282+
ThreadPoolProto threadpool_proto,
283+
DataLoaderArgs&&... data_args
284+
) {
285+
namespace fs = std::filesystem;
286+
lib::UniqueTempDirectory tempdir{"svs_dynflat_load"};
287+
lib::DirectoryArchiver::unpack(stream, tempdir);
288+
289+
const auto config_path = tempdir.get() / "config";
290+
if (!fs::is_directory(config_path)) {
291+
throw ANNEXCEPTION(
292+
"Invalid Dynamic Flat index archive: missing config directory!"
293+
);
294+
}
295+
296+
const auto data_path = tempdir.get() / "data";
297+
if (!fs::is_directory(data_path)) {
298+
throw ANNEXCEPTION("Invalid Dynamic Flat index archive: missing data directory!"
299+
);
300+
}
301+
302+
return assemble<QueryTypes>(
303+
config_path,
304+
lib::load_from_disk<Data>(data_path, SVS_FWD(data_args)...),
305+
distance,
306+
threads::as_threadpool(std::move(threadpool_proto))
307+
);
308+
}
309+
256310
///// Distance
257311
/// @brief Get the distance between a vector in the index and a query vector
258312
/// @tparam Query The query vector type

include/svs/orchestrators/dynamic_vamana.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,8 @@ class DynamicVamana : public manager::IndexManager<DynamicVamanaInterface> {
255255
impl_->save(config_dir, graph_dir, data_dir);
256256
}
257257

258+
/// Save the index into `stream` by delegating to the underlying implementation.
void save(std::ostream& stream) {
    impl_->save(stream);
}
259+
258260
/// Reconstruction
259261
void reconstruct_at(data::SimpleDataView<float> data, std::span<const uint64_t> ids) {
260262
impl_->reconstruct_at(data, ids);
@@ -340,6 +342,48 @@ class DynamicVamana : public manager::IndexManager<DynamicVamanaInterface> {
340342
);
341343
}
342344

345+
// Assembly from stream
346+
template <
347+
manager::QueryTypeDefinition QueryTypes,
348+
typename Data,
349+
typename Distance,
350+
typename ThreadPoolProto,
351+
typename... DataLoaderArgs>
352+
static DynamicVamana assemble(
353+
std::istream& stream,
354+
const Distance& distance,
355+
ThreadPoolProto threadpool_proto,
356+
DataLoaderArgs&&... data_args
357+
) {
358+
namespace fs = std::filesystem;
359+
lib::UniqueTempDirectory tempdir{"svs_vamana_load"};
360+
lib::DirectoryArchiver::unpack(stream, tempdir);
361+
362+
const auto config_path = tempdir.get() / "config";
363+
if (!fs::is_directory(config_path)) {
364+
throw ANNEXCEPTION("Invalid Vamana index archive: missing config directory!");
365+
}
366+
367+
const auto graph_path = tempdir.get() / "graph";
368+
if (!fs::is_directory(graph_path)) {
369+
throw ANNEXCEPTION("Invalid Vamana index archive: missing graph directory!");
370+
}
371+
372+
const auto data_path = tempdir.get() / "data";
373+
if (!fs::is_directory(data_path)) {
374+
throw ANNEXCEPTION("Invalid Vamana index archive: missing data directory!");
375+
}
376+
377+
return assemble<QueryTypes>(
378+
config_path,
379+
svs::GraphLoader{graph_path},
380+
lib::load_from_disk<Data>(data_path, SVS_FWD(data_args)...),
381+
distance,
382+
threads::as_threadpool(std::move(threadpool_proto)),
383+
false
384+
);
385+
}
386+
343387
/// @copydoc svs::Vamana::batch_iterator
344388
template <typename QueryType, size_t N>
345389
svs::VamanaIterator batch_iterator(

0 commit comments

Comments
 (0)