Skip to content

Commit 6a79e02

Browse files
pdillingermeta-codesync[bot]
authored andcommitted
Support pre-defined compression dictionaries (#14253)
Summary: ... in addition to those derived from samples. This could be useful when trade-offs favor an offline trained dictionary that's good for the whole work load, which can involve heavy-weight training, vs. on-the-fly training on samples for each file, which has limitations. This involves some breaking changes to some deeper parts of the new compression API. I'm not concerned about performance because this doesn't touch the per-block parts of the API, just the per-file parts. Bonus: change to CompressionManagerWrapper::FindCompatibleCompressionManager to implement what is likely the preferred behavior. Pull Request resolved: #14253 Test Plan: unit test included Reviewed By: hx235 Differential Revision: D91082208 Pulled By: pdillinger fbshipit-source-id: 1442db65e15c9435437204c19787c96f7a40a207
1 parent a9906f0 commit 6a79e02

File tree

9 files changed

+442
-119
lines changed

9 files changed

+442
-119
lines changed

include/rocksdb/advanced_compression.h

Lines changed: 100 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
#pragma once
1313

14+
#include <variant>
15+
1416
#include "rocksdb/cache.h"
1517
#include "rocksdb/compression_type.h"
1618
#include "rocksdb/data_structure.h"
@@ -56,7 +58,64 @@ class Decompressor;
5658
// because RocksDB is not exception-safe. This could cause undefined behavior
5759
// including data loss, unreported corruption, deadlocks, and more.
5860
class Compressor {
59-
public:
61+
public: // Auxiliary types
62+
// No dictionary should be used (for a given block type).
63+
struct DictDisabled {};
64+
65+
// A recommendation for dictionary compression by collecting samples from
66+
// blocks. The caller should collect up to `max_sample_bytes` of sample data
67+
// and pass it to MaybeCloneSpecialized() to create a specialized compressor.
68+
struct DictSampling {
69+
// Maximum total bytes of sample data to collect from blocks.
70+
// This controls how much data is buffered before dictionary training.
71+
size_t max_sample_bytes = 0;
72+
};
73+
74+
// A pre-defined dictionary that is recommended or specified for direct use
75+
// with MaybeCloneSpecialized(), without any sampling.
76+
struct DictPreDefined {
77+
// The owned raw/serialized dictionary bytes. Recommend std::move to
78+
// MaybeCloneSpecialized()
79+
std::string dict_data;
80+
};
81+
82+
// The result type for GetDictGuidance() - indicates how dictionary
83+
// compression should be configured for a given block type.
84+
using DictConfig = std::variant<DictDisabled, DictSampling, DictPreDefined>;
85+
86+
// Sample data collected from blocks for dictionary training.
87+
struct DictSamples {
88+
// All the sample input blocks stored contiguously
89+
std::string sample_data;
90+
// The lengths of each of the sample blocks in `sample_data`
91+
std::vector<size_t> sample_lens;
92+
93+
bool empty() const { return sample_data.empty(); }
94+
bool Verify() const {
95+
size_t total_len = 0;
96+
for (auto len : sample_lens) {
97+
total_len += len;
98+
}
99+
return total_len == sample_data.size();
100+
}
101+
};
102+
103+
// Arguments for MaybeCloneSpecialized() - provides either samples, a
104+
// pre-defined dictionary, or indicates no dictionary should be used.
105+
// NOTE: DictPreDefined here is the same type as above, allowing the
106+
// pre-defined dictionary from GetDictGuidance() to be passed through.
107+
using DictConfigArgs =
108+
std::variant<DictDisabled, DictSamples, DictPreDefined>;
109+
110+
// A WorkingArea is an optional structure (both for callers and
111+
// implementations) that can enable optimizing repeated compressions by
112+
// reusing working space or thread-local tracking of statistics or trends.
113+
// This enables use of ZSTD context, for example.
114+
//
115+
// EXTENSIBLE or reinterpret_cast-able by custom Compressor implementations
116+
struct WorkingArea {};
117+
118+
public: // Functions
60119
Compressor() = default;
61120
virtual ~Compressor() = default;
62121

@@ -69,15 +128,17 @@ class Compressor {
69128
return id;
70129
}
71130

72-
// Returns the max total bytes of for all sampled blocks for creating the data
73-
// dictionary, or zero indicating dictionary compression should not be
74-
// used/configured. This will typically be called after
75-
// CompressionManager::GetCompressor() to see if samples should be accumulated
76-
// and passed to MaybeCloneSpecialized().
77-
virtual size_t GetMaxSampleSizeIfWantDict(CacheEntryRole block_type) const {
131+
// Returns the recommended dictionary configuration for the given block type.
132+
// See the comments on DictConfig and variants for details.
133+
//
134+
// NOTE: This may be called on the "base" Compressor returned by
135+
// CompressionManager, which is not yet configured with a dictionary,
136+
// or it can be skipped by callers not intending to handle dictionary
137+
// compression.
138+
virtual DictConfig GetDictGuidance(CacheEntryRole block_type) const {
78139
// Default implementation: no dictionary
79140
(void)block_type;
80-
return 0;
141+
return DictDisabled{};
81142
}
82143

83144
// Returns the serialized form of the data dictionary associated with this
@@ -94,67 +155,39 @@ class Compressor {
94155
// needed to implement MaybeCloneSpecialized() in wrapper compressors.
95156
virtual std::unique_ptr<Compressor> Clone() const = 0;
96157

97-
// Utility struct for providing sample data for the compression dictionary.
98-
// Potentially extensible by callers of Compressor (but not recommended)
99-
struct DictSampleArgs {
100-
// All the sample input blocks stored contiguously
101-
std::string sample_data;
102-
// The lengths of each of the sample blocks in `sample_data`
103-
std::vector<size_t> sample_lens;
104-
105-
bool empty() { return sample_data.empty(); }
106-
bool Verify() {
107-
size_t total_len = 0;
108-
for (auto len : sample_lens) {
109-
total_len += len;
110-
}
111-
return total_len == sample_data.size();
112-
}
113-
};
114-
115158
// Create potential variants of the same Compressor that might be
116159
// (a) optimized for a particular block type (does not affect correct
117160
// decompression), and/or
118-
// (b) configured to use a compression dictionary, based on the given
119-
// samples (decompression must provide the dictionary from
120-
// GetSerializedDict())
161+
// (b) configured to use a compression dictionary based on the provided
162+
// configuration (samples or pre-defined dictionary). See the comments on
163+
// DictConfigArgs and its variants for detail.
164+
//
121165
// Return of nullptr indicates no specialization exists or was attempted
122-
// and the caller is best to use the current Compressor for the desired
123-
// scenario. Using CacheEntryRole:kMisc for block_type generally means
124-
// "unspecified", and both parameters are merely suggestions. The exact
125-
// dictionary associated with a returned compressor must be read from
126-
// GetSerializedDict().
166+
// and the caller should use the current Compressor for the desired scenario.
167+
// Using CacheEntryRole::kMisc for block_type generally means "unspecified".
168+
//
169+
// The exact dictionary associated with a returned compressor must be read
170+
// from GetSerializedDict().
127171
virtual std::unique_ptr<Compressor> MaybeCloneSpecialized(
128-
CacheEntryRole block_type, DictSampleArgs&& dict_samples) const {
172+
CacheEntryRole block_type, DictConfigArgs&& dict_config) const {
129173
// Default implementation: no specialization
130174
(void)block_type;
131-
(void)dict_samples;
132-
// Caller should have checked GetMaxSampleSizeIfWantDict before attempting
133-
// to provide dictionary samples
134-
assert(dict_samples.empty());
175+
(void)dict_config;
135176
return nullptr;
136177
}
137178

138179
// A convenience function when a clone is needed and may or may not be
139180
// specialized.
140181
std::unique_ptr<Compressor> CloneMaybeSpecialized(
141-
CacheEntryRole block_type, DictSampleArgs&& dict_samples) const {
142-
auto clone = MaybeCloneSpecialized(block_type, std::move(dict_samples));
182+
CacheEntryRole block_type, DictConfigArgs&& dict_config) const {
183+
auto clone = MaybeCloneSpecialized(block_type, std::move(dict_config));
143184
if (clone == nullptr) {
144185
clone = Clone();
145186
assert(clone != nullptr);
146187
}
147188
return clone;
148189
}
149190

150-
// A WorkingArea is an optional structure (both for callers and
151-
// implementations) that can enable optimizing repeated compressions by
152-
// reusing working space or thread-local tracking of statistics or trends.
153-
// This enables use of ZSTD context, for example.
154-
//
155-
// EXTENSIBLE or reinterpret_cast-able by custom Compressor implementations
156-
struct WorkingArea {};
157-
158191
// To allow for flexible re-use / reclaimation, we have explicit Get and
159192
// Release functions, and usually wrap in a special RAII smart pointer.
160193
// For example, a WorkingArea could be saved/recycled in thread-local or
@@ -423,6 +456,12 @@ class CompressionManager
423456
// which is valid at the discretion of the CompressionManager. Returning
424457
// nullptr should normally be the result if preferred == kNoCompression.
425458
//
459+
// Compressors returned here are configured WITHOUT a dictionary, so that
460+
// it's always possible to get correct compression->decompression results
461+
// if not opting-in to dictionary handling. The compressors may recommend
462+
// dictionary usage via GetDictGuidance() and creating a modified Compressor
463+
// for that. See Compressor::GetDictGuidance() etc. for details.
464+
//
426465
// These functions must be thread-safe.
427466

428467
// Get a compressor for an SST file.
@@ -477,8 +516,8 @@ class CompressorWrapper : public Compressor {
477516
CompressorWrapper(const CompressorWrapper&) = delete;
478517
CompressorWrapper& operator=(const CompressorWrapper&) = delete;
479518

480-
size_t GetMaxSampleSizeIfWantDict(CacheEntryRole block_type) const override {
481-
return wrapped_->GetMaxSampleSizeIfWantDict(block_type);
519+
DictConfig GetDictGuidance(CacheEntryRole block_type) const override {
520+
return wrapped_->GetDictGuidance(block_type);
482521
}
483522

484523
Slice GetSerializedDict() const override {
@@ -496,9 +535,9 @@ class CompressorWrapper : public Compressor {
496535
// when the wrapped Compressor uses the default implementation of
497536
// MaybeCloneSpecialized(). This needs to be overridden if not.
498537
std::unique_ptr<Compressor> MaybeCloneSpecialized(
499-
CacheEntryRole block_type, DictSampleArgs&& dict_samples) const override {
538+
CacheEntryRole block_type, DictConfigArgs&& dict_config) const override {
500539
auto clone =
501-
wrapped_->MaybeCloneSpecialized(block_type, std::move(dict_samples));
540+
wrapped_->MaybeCloneSpecialized(block_type, std::move(dict_config));
502541
// Assert default no-op MaybeCloneSpecialized()
503542
assert(clone == nullptr);
504543
return clone;
@@ -592,7 +631,14 @@ class CompressionManagerWrapper : public CompressionManager {
592631

593632
std::shared_ptr<CompressionManager> FindCompatibleCompressionManager(
594633
Slice compatibility_name) override {
595-
return wrapped_->FindCompatibleCompressionManager(compatibility_name);
634+
// NOTE: We expect that the wrapped CompressionManager will generally
635+
// be preferred if compatible, so the default implementation here does
636+
// not purely defer to the wrapped instance
637+
if (compatibility_name == CompatibilityName()) {
638+
return shared_from_this();
639+
} else {
640+
return wrapped_->FindCompatibleCompressionManager(compatibility_name);
641+
}
596642
}
597643

598644
bool SupportsCompressionType(CompressionType type) const override {

table/block_based/block_based_table_builder.cc

Lines changed: 35 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -113,9 +113,9 @@ FilterBlockBuilder* CreateFilterBlockBuilder(
113113
// A convenience function for populating the Compressor* fields; see ~Rep()
114114
Compressor* MaybeCloneSpecialized(
115115
Compressor* compressor, CacheEntryRole block_type,
116-
Compressor::DictSampleArgs&& dict_samples = {}) {
116+
Compressor::DictConfigArgs&& dict_config = Compressor::DictDisabled{}) {
117117
auto specialized =
118-
compressor->MaybeCloneSpecialized(block_type, std::move(dict_samples));
118+
compressor->MaybeCloneSpecialized(block_type, std::move(dict_config));
119119
if (specialized) {
120120
// Caller is responsible for freeing when distinct
121121
return specialized.release();
@@ -833,7 +833,8 @@ struct BlockBasedTableBuilder::Rep {
833833
RelaxedAtomic<uint64_t> sampled_output_fast_data_bytes{0};
834834
uint32_t compression_parallel_threads;
835835
int max_compressed_bytes_per_kb;
836-
size_t max_dict_sample_bytes = 0;
836+
// Dictionary guidance for data blocks (from GetDictGuidance())
837+
Compressor::DictConfig data_block_dict_guidance;
837838

838839
// *** Compressors & decompressors - Yes, it seems like a lot here but ***
839840
// *** these are distinct fields to minimize extra conditionals and ***
@@ -1122,9 +1123,12 @@ struct BlockBasedTableBuilder::Rep {
11221123
index_block_working_area.compress =
11231124
index_block_compressor->ObtainWorkingArea();
11241125
}
1125-
max_dict_sample_bytes = basic_compressor->GetMaxSampleSizeIfWantDict(
1126-
CacheEntryRole::kDataBlock);
1127-
if (max_dict_sample_bytes > 0) {
1126+
data_block_dict_guidance =
1127+
basic_compressor->GetDictGuidance(CacheEntryRole::kDataBlock);
1128+
if (auto* sampling =
1129+
std::get_if<Compressor::DictSampling>(&data_block_dict_guidance);
1130+
sampling != nullptr && sampling->max_sample_bytes > 0) {
1131+
// Sampling mode: collect samples up to max_sample_bytes
11281132
state = State::kBuffered;
11291133
if (tbo.target_file_size == 0) {
11301134
buffer_limit = tbo.compression_opts.max_dict_buffer_bytes;
@@ -1134,7 +1138,22 @@ struct BlockBasedTableBuilder::Rep {
11341138
buffer_limit = std::min(tbo.target_file_size,
11351139
tbo.compression_opts.max_dict_buffer_bytes);
11361140
}
1141+
} else if (auto* predef = std::get_if<Compressor::DictPreDefined>(
1142+
&data_block_dict_guidance);
1143+
predef != nullptr && !predef->dict_data.empty()) {
1144+
// Pre-defined dictionary mode: use it immediately, no buffering
1145+
data_block_compressor = MaybeCloneSpecialized(
1146+
basic_compressor.get(), CacheEntryRole::kDataBlock,
1147+
Compressor::DictPreDefined{std::string{predef->dict_data}});
1148+
data_block_working_area.compress =
1149+
data_block_compressor->ObtainWorkingArea();
11371150
} else {
1151+
assert(std::holds_alternative<Compressor::DictSampling>(
1152+
data_block_dict_guidance) ||
1153+
std::holds_alternative<Compressor::DictPreDefined>(
1154+
data_block_dict_guidance) ||
1155+
std::holds_alternative<Compressor::DictDisabled>(
1156+
data_block_dict_guidance));
11381157
// No distinct data block compressor using dictionary, but
11391158
// implementation might still want to specialize for data blocks
11401159
data_block_compressor = MaybeCloneSpecialized(
@@ -2632,14 +2651,18 @@ void BlockBasedTableBuilder::MaybeEnterUnbuffered(
26322651
kPrimeGenerator % static_cast<uint64_t>(kNumBlocksBuffered));
26332652
const size_t kInitSampleIdx = kNumBlocksBuffered / 2;
26342653

2635-
Compressor::DictSampleArgs samples;
2654+
Compressor::DictSamples samples;
26362655
size_t buffer_idx = kInitSampleIdx;
2637-
for (size_t i = 0; i < kNumBlocksBuffered &&
2638-
samples.sample_data.size() < r->max_dict_sample_bytes;
2656+
// Get max_sample_bytes from the DictSampling guidance
2657+
auto* sampling =
2658+
std::get_if<Compressor::DictSampling>(&r->data_block_dict_guidance);
2659+
assert(sampling != nullptr);
2660+
size_t max_sample_bytes = sampling->max_sample_bytes;
2661+
for (size_t i = 0;
2662+
i < kNumBlocksBuffered && samples.sample_data.size() < max_sample_bytes;
26392663
++i) {
2640-
size_t copy_len =
2641-
std::min(r->max_dict_sample_bytes - samples.sample_data.size(),
2642-
r->data_block_buffers[buffer_idx].size());
2664+
size_t copy_len = std::min(max_sample_bytes - samples.sample_data.size(),
2665+
r->data_block_buffers[buffer_idx].size());
26432666
samples.sample_data.append(r->data_block_buffers[buffer_idx], 0, copy_len);
26442667
samples.sample_lens.emplace_back(copy_len);
26452668

test_util/testutil.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -796,9 +796,9 @@ struct CompressorCustomAlg : public CompressorWrapper {
796796
}
797797

798798
std::unique_ptr<Compressor> MaybeCloneSpecialized(
799-
CacheEntryRole block_type, DictSampleArgs&& dict_samples) const override {
799+
CacheEntryRole block_type, DictConfigArgs&& dict_config) const override {
800800
auto clone =
801-
wrapped_->CloneMaybeSpecialized(block_type, std::move(dict_samples));
801+
wrapped_->CloneMaybeSpecialized(block_type, std::move(dict_config));
802802
return std::make_unique<CompressorCustomAlg>(std::move(clone));
803803
}
804804

util/auto_tune_compressor.cc

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,9 @@ std::unique_ptr<Compressor> AutoSkipCompressorWrapper::Clone() const {
6464
}
6565

6666
std::unique_ptr<Compressor> AutoSkipCompressorWrapper::MaybeCloneSpecialized(
67-
CacheEntryRole block_type, DictSampleArgs&& dict_samples) const {
67+
CacheEntryRole block_type, DictConfigArgs&& dict_config) const {
6868
auto clone =
69-
wrapped_->CloneMaybeSpecialized(block_type, std::move(dict_samples));
69+
wrapped_->CloneMaybeSpecialized(block_type, std::move(dict_config));
7070
return std::make_unique<AutoSkipCompressorWrapper>(std::move(clone), opts_);
7171
}
7272

@@ -189,11 +189,10 @@ const char* CostAwareCompressor::Name() const { return "CostAwareCompressor"; }
189189
std::unique_ptr<Compressor> CostAwareCompressor::Clone() const {
190190
return std::make_unique<CostAwareCompressor>(opts_);
191191
}
192-
size_t CostAwareCompressor::GetMaxSampleSizeIfWantDict(
192+
Compressor::DictConfig CostAwareCompressor::GetDictGuidance(
193193
CacheEntryRole block_type) const {
194194
auto idx = allcompressors_index_.back();
195-
return allcompressors_[idx.first][idx.second]->GetMaxSampleSizeIfWantDict(
196-
block_type);
195+
return allcompressors_[idx.first][idx.second]->GetDictGuidance(block_type);
197196
}
198197

199198
Slice CostAwareCompressor::GetSerializedDict() const {
@@ -205,12 +204,12 @@ CompressionType CostAwareCompressor::GetPreferredCompressionType() const {
205204
return kZSTD;
206205
}
207206
std::unique_ptr<Compressor> CostAwareCompressor::MaybeCloneSpecialized(
208-
CacheEntryRole block_type, DictSampleArgs&& dict_samples) const {
207+
CacheEntryRole block_type, DictConfigArgs&& dict_config) const {
209208
// TODO: full dictionary compression support. Currently this just falls
210209
// back on a non-multi compressor when asked to use a dictionary.
211210
auto idx = allcompressors_index_.back();
212211
return allcompressors_[idx.first][idx.second]->MaybeCloneSpecialized(
213-
block_type, std::move(dict_samples));
212+
block_type, std::move(dict_config));
214213
}
215214
Status CostAwareCompressor::CompressBlock(Slice uncompressed_data,
216215
char* compressed_output,

util/auto_tune_compressor.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ class AutoSkipCompressorWrapper : public CompressorWrapper {
6666

6767
std::unique_ptr<Compressor> Clone() const override;
6868
std::unique_ptr<Compressor> MaybeCloneSpecialized(
69-
CacheEntryRole block_type, DictSampleArgs&& dict_samples) const override;
69+
CacheEntryRole block_type, DictConfigArgs&& dict_config) const override;
7070
Status CompressBlock(Slice uncompressed_data, char* compressed_output,
7171
size_t* compressed_output_size,
7272
CompressionType* out_compression_type,
@@ -153,12 +153,12 @@ class CostAwareCompressor : public Compressor {
153153
explicit CostAwareCompressor(const CompressionOptions& opts);
154154
const char* Name() const override;
155155
std::unique_ptr<Compressor> Clone() const override;
156-
size_t GetMaxSampleSizeIfWantDict(CacheEntryRole block_type) const override;
156+
DictConfig GetDictGuidance(CacheEntryRole block_type) const override;
157157
Slice GetSerializedDict() const override;
158158
CompressionType GetPreferredCompressionType() const override;
159159
ManagedWorkingArea ObtainWorkingArea() override;
160160
std::unique_ptr<Compressor> MaybeCloneSpecialized(
161-
CacheEntryRole block_type, DictSampleArgs&& dict_samples) const override;
161+
CacheEntryRole block_type, DictConfigArgs&& dict_config) const override;
162162

163163
Status CompressBlock(Slice uncompressed_data, char* compressed_output,
164164
size_t* compressed_output_size,

0 commit comments

Comments
 (0)