Skip to content

Commit edaa6c3

Browse files
committed
chore: rebase main
1 parent 31de401 commit edaa6c3

File tree

4 files changed

+10
-26
lines changed

4 files changed

+10
-26
lines changed

src/mito2/src/access_layer.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ impl FilePathProvider for WriteCachePathProvider {
278278
/// Path provider that builds paths in region storage path.
279279
#[derive(Clone, Debug)]
280280
pub(crate) struct RegionFilePathFactory {
281-
region_dir: String,
281+
pub(crate) region_dir: String,
282282
}
283283

284284
impl RegionFilePathFactory {

src/mito2/src/compaction/twcs.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -323,8 +323,7 @@ mod tests {
323323

324324
use super::*;
325325
use crate::compaction::test_util::{new_file_handle, new_file_handles};
326-
use crate::sst::file::{FileId, FileMeta, Level};
327-
use crate::test_util::NoopFilePurger;
326+
use crate::sst::file::{FileId, Level};
328327

329328
#[test]
330329
fn test_get_latest_window_in_seconds() {

src/mito2/src/sst/parquet.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ mod tests {
102102
use tokio_util::compat::FuturesAsyncWriteCompatExt;
103103

104104
use super::*;
105-
use crate::access_layer::FilePathProvider;
105+
use crate::access_layer::{FilePathProvider, RegionFilePathFactory};
106106
use crate::cache::{CacheManager, CacheStrategy, PageKey};
107107
use crate::read::BatchReader;
108108
use crate::sst::index::{Indexer, IndexerBuilder};
@@ -567,7 +567,7 @@ mod tests {
567567
..Default::default()
568568
};
569569

570-
let path_provider = RegionFilePathProvider {
570+
let path_provider = RegionFilePathFactory {
571571
region_dir: "test".to_string(),
572572
};
573573
let mut writer = ParquetWriter::new_with_object_store(

src/mito2/src/sst/parquet/writer.rs

+6-21
Original file line numberDiff line numberDiff line change
@@ -185,19 +185,6 @@ where
185185
Ok(())
186186
}
187187

188-
async fn get_or_create_indexer(&mut self) -> &mut Indexer {
189-
match self.current_indexer {
190-
None => {
191-
self.current_file = FileId::random();
192-
let indexer = self.indexer_builder.build(self.current_file).await;
193-
self.current_indexer = Some(indexer);
194-
// safety: self.current_indexer already set above.
195-
self.current_indexer.as_mut().unwrap()
196-
}
197-
Some(ref mut indexer) => indexer,
198-
}
199-
}
200-
201188
/// Iterates source and writes all rows to Parquet file.
202189
///
203190
/// Returns the [SstInfo] if the SST is written.
@@ -221,7 +208,11 @@ where
221208
Ok(mut batch) => {
222209
stats.update(&batch);
223210
// safety: self.current_indexer must be set when first batch has been written.
224-
self.current_indexer.as_mut().unwrap().update(&batch).await;
211+
self.current_indexer
212+
.as_mut()
213+
.unwrap()
214+
.update(&mut batch)
215+
.await;
225216
if let Some(max_file_size) = opts.max_file_size
226217
&& self.bytes_written.load(Ordering::Relaxed) > max_file_size
227218
{
@@ -239,8 +230,6 @@ where
239230

240231
self.finish_current_file(&mut results, &mut stats).await?;
241232

242-
let file_id = self.current_file;
243-
244233
// object_store.write will make sure all bytes are written or an error is raised.
245234
Ok(results)
246235
}
@@ -314,11 +303,7 @@ where
314303
.context(WriteParquetSnafu)?;
315304
self.writer = Some(arrow_writer);
316305

317-
let index_file_path = self.path_provider.build_index_file_path(self.current_file);
318-
let indexer = self
319-
.indexer_builder
320-
.build(self.current_file, index_file_path)
321-
.await;
306+
let indexer = self.indexer_builder.build(self.current_file).await;
322307
self.current_indexer = Some(indexer);
323308

324309
// safety: self.writer is assigned above

0 commit comments

Comments
 (0)