Commit 09e1b1e — test: add tests for manifest change notifications
Parent: e5cd860
9 files changed: +324 −32 lines

src/mito2/src/engine/alter_test.rs

+57-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,12 @@ use common_error::status_code::StatusCode;
2323
use common_recordbatch::RecordBatches;
2424
use datatypes::prelude::ConcreteDataType;
2525
use datatypes::schema::ColumnSchema;
26+
use futures::TryStreamExt;
27+
use log_store::kafka::log_store::KafkaLogStore;
28+
use rstest::rstest;
29+
use rstest_reuse::{self, apply};
30+
use store_api::logstore::provider::Provider;
31+
use store_api::logstore::LogStore;
2632
use store_api::metadata::ColumnMetadata;
2733
use store_api::region_engine::RegionEngine;
2834
use store_api::region_request::{
@@ -34,9 +40,12 @@ use crate::config::MitoConfig;
3440
use crate::engine::listener::AlterFlushListener;
3541
use crate::engine::MitoEngine;
3642
use crate::test_util::{
37-
build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
38-
TestEnv,
43+
build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
44+
prepare_test_for_kafka_log_store, put_rows, rows_schema, single_kafka_log_store_factory,
45+
CreateRequestBuilder, LogStoreFactory, TestEnv,
3946
};
47+
use crate::wal::entry_reader::decode_stream;
48+
use crate::wal::raw_entry_reader::flatten_stream;
4049

4150
async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
4251
let request = ScanRequest::default();
@@ -68,6 +77,52 @@ fn add_tag1() -> RegionAlterRequest {
6877
}
6978
}
7079

80+
#[apply(single_kafka_log_store_factory)]
81+
async fn test_alter_region_notification(factory: Option<LogStoreFactory>) {
82+
common_telemetry::init_default_ut_logging();
83+
let Some(factory) = factory else {
84+
return;
85+
};
86+
87+
let mut env =
88+
TestEnv::with_prefix("alter-notification").with_log_store_factory(factory.clone());
89+
let engine = env.create_engine(MitoConfig::default()).await;
90+
let region_id = RegionId::new(1, 1);
91+
let topic = prepare_test_for_kafka_log_store(&factory).await;
92+
let request = CreateRequestBuilder::new()
93+
.kafka_topic(topic.clone())
94+
.build();
95+
let column_schemas = rows_schema(&request);
96+
engine
97+
.handle_request(region_id, RegionRequest::Create(request))
98+
.await
99+
.unwrap();
100+
let rows = Rows {
101+
schema: column_schemas.clone(),
102+
rows: build_rows_for_key("a", 0, 3, 0),
103+
};
104+
put_rows(&engine, region_id, rows).await;
105+
let request = add_tag1();
106+
engine
107+
.handle_request(region_id, RegionRequest::Alter(request))
108+
.await
109+
.unwrap();
110+
111+
let topic = topic.unwrap();
112+
let log_store = env.log_store().unwrap().into_kafka_log_store();
113+
let provider = Provider::kafka_provider(topic);
114+
let stream = log_store.read(&provider, 0, None).await.unwrap();
115+
let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
116+
.try_collect::<Vec<_>>()
117+
.await
118+
.unwrap();
119+
120+
// Flush sst notification
121+
assert_eq!(entries[1].1.mutations[0].op_type(), api::v1::OpType::Notify);
122+
// Modify table metadata notification
123+
assert_eq!(entries[2].1.mutations[0].op_type(), api::v1::OpType::Notify);
124+
}
125+
71126
#[tokio::test]
72127
async fn test_alter_region() {
73128
common_telemetry::init_default_ut_logging();

src/mito2/src/engine/compaction_test.rs

+71-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ use api::v1::{ColumnSchema, Rows};
1919
use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
2020
use datatypes::prelude::ScalarVector;
2121
use datatypes::vectors::TimestampMillisecondVector;
22+
use futures::TryStreamExt;
23+
use log_store::kafka::log_store::KafkaLogStore;
24+
use rstest::rstest;
25+
use rstest_reuse::{self, apply};
26+
use store_api::logstore::provider::Provider;
27+
use store_api::logstore::LogStore;
2228
use store_api::region_engine::RegionEngine;
2329
use store_api::region_request::{
2430
RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
@@ -30,8 +36,12 @@ use crate::config::MitoConfig;
3036
use crate::engine::listener::CompactionListener;
3137
use crate::engine::MitoEngine;
3238
use crate::test_util::{
33-
build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
39+
build_rows_for_key, column_metadata_to_column_schema, kafka_log_store_factory,
40+
prepare_test_for_kafka_log_store, put_rows, single_kafka_log_store_factory,
41+
CreateRequestBuilder, LogStoreFactory, TestEnv,
3442
};
43+
use crate::wal::entry_reader::decode_stream;
44+
use crate::wal::raw_entry_reader::flatten_stream;
3545

3646
async fn put_and_flush(
3747
engine: &MitoEngine,
@@ -105,6 +115,66 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
105115
res
106116
}
107117

118+
#[apply(single_kafka_log_store_factory)]
119+
async fn test_compaction_region_notification(factory: Option<LogStoreFactory>) {
120+
common_telemetry::init_default_ut_logging();
121+
let Some(factory) = factory else {
122+
return;
123+
};
124+
125+
let mut env =
126+
TestEnv::with_prefix("compaction_notification").with_log_store_factory(factory.clone());
127+
let engine = env.create_engine(MitoConfig::default()).await;
128+
let topic = prepare_test_for_kafka_log_store(&factory).await;
129+
let region_id = RegionId::new(1, 1);
130+
let request = CreateRequestBuilder::new()
131+
.kafka_topic(topic.clone())
132+
.insert_option("compaction.type", "twcs")
133+
.insert_option("compaction.twcs.max_active_window_runs", "1")
134+
.insert_option("compaction.twcs.max_inactive_window_runs", "1")
135+
.build();
136+
137+
let column_schemas = request
138+
.column_metadatas
139+
.iter()
140+
.map(column_metadata_to_column_schema)
141+
.collect::<Vec<_>>();
142+
engine
143+
.handle_request(region_id, RegionRequest::Create(request))
144+
.await
145+
.unwrap();
146+
// Flush 5 SSTs for compaction.
147+
put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
148+
put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
149+
put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
150+
delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
151+
put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
152+
153+
let result = engine
154+
.handle_request(
155+
region_id,
156+
RegionRequest::Compact(RegionCompactRequest::default()),
157+
)
158+
.await
159+
.unwrap();
160+
assert_eq!(result.affected_rows, 0);
161+
162+
let topic = topic.unwrap();
163+
let log_store = env.log_store().unwrap().into_kafka_log_store();
164+
let provider = Provider::kafka_provider(topic);
165+
let stream = log_store.read(&provider, 0, None).await.unwrap();
166+
let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
167+
.try_collect::<Vec<_>>()
168+
.await
169+
.unwrap();
170+
171+
let notifications = entries
172+
.into_iter()
173+
.filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
174+
.count();
175+
assert_eq!(notifications, 6);
176+
}
177+
108178
#[tokio::test]
109179
async fn test_compaction_region() {
110180
common_telemetry::init_default_ut_logging();

src/mito2/src/engine/edit_region_test.rs

+69-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,13 @@ use std::sync::{Arc, Mutex};
1616
use std::time::Duration;
1717

1818
use common_time::util::current_time_millis;
19+
use futures::TryStreamExt;
20+
use log_store::kafka::log_store::KafkaLogStore;
1921
use object_store::ObjectStore;
22+
use rstest::rstest;
23+
use rstest_reuse::{self, apply};
24+
use store_api::logstore::provider::Provider;
25+
use store_api::logstore::LogStore;
2026
use store_api::region_engine::RegionEngine;
2127
use store_api::region_request::RegionRequest;
2228
use store_api::storage::RegionId;
@@ -29,7 +35,69 @@ use crate::engine::MitoEngine;
2935
use crate::manifest::action::RegionEdit;
3036
use crate::region::MitoRegionRef;
3137
use crate::sst::file::{FileId, FileMeta};
32-
use crate::test_util::{CreateRequestBuilder, TestEnv};
38+
use crate::test_util::{
39+
kafka_log_store_factory, prepare_test_for_kafka_log_store, single_kafka_log_store_factory,
40+
CreateRequestBuilder, LogStoreFactory, TestEnv,
41+
};
42+
use crate::wal::entry_reader::decode_stream;
43+
use crate::wal::raw_entry_reader::flatten_stream;
44+
45+
#[apply(single_kafka_log_store_factory)]
46+
async fn test_edit_region_notification(factory: Option<LogStoreFactory>) {
47+
common_telemetry::init_default_ut_logging();
48+
let Some(factory) = factory else {
49+
return;
50+
};
51+
52+
let mut env = TestEnv::with_prefix("edit-notification").with_log_store_factory(factory.clone());
53+
let engine = env.create_engine(MitoConfig::default()).await;
54+
let topic = prepare_test_for_kafka_log_store(&factory).await;
55+
let region_id = RegionId::new(1, 1);
56+
let request = CreateRequestBuilder::new()
57+
.kafka_topic(topic.clone())
58+
.build();
59+
engine
60+
.handle_request(region_id, RegionRequest::Create(request))
61+
.await
62+
.unwrap();
63+
let region = engine.get_region(region_id).unwrap();
64+
let file_id = FileId::random();
65+
// Simulating the ingestion of an SST file.
66+
env.get_object_store()
67+
.unwrap()
68+
.write(
69+
&format!("{}/{}.parquet", region.region_dir(), file_id),
70+
b"x".as_slice(),
71+
)
72+
.await
73+
.unwrap();
74+
let edit = RegionEdit {
75+
files_to_add: vec![FileMeta {
76+
region_id: region.region_id,
77+
file_id,
78+
level: 0,
79+
..Default::default()
80+
}],
81+
files_to_remove: vec![],
82+
compaction_time_window: None,
83+
flushed_entry_id: None,
84+
flushed_sequence: None,
85+
};
86+
engine.edit_region(region.region_id, edit).await.unwrap();
87+
let topic = topic.unwrap();
88+
let log_store = env.log_store().unwrap().into_kafka_log_store();
89+
let provider = Provider::kafka_provider(topic);
90+
let stream = log_store.read(&provider, 0, None).await.unwrap();
91+
let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
92+
.try_collect::<Vec<_>>()
93+
.await
94+
.unwrap();
95+
let notifications = entries
96+
.into_iter()
97+
.filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
98+
.count();
99+
assert_eq!(notifications, 1);
100+
}
33101

34102
#[tokio::test]
35103
async fn test_edit_region_schedule_compaction() {

src/mito2/src/engine/flush_test.rs

+53-3
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use common_time::util::current_time_millis;
2424
use common_wal::options::WAL_OPTIONS_KEY;
2525
use rstest::rstest;
2626
use rstest_reuse::{self, apply};
27+
use store_api::logstore::LogStore;
2728
use store_api::region_engine::RegionEngine;
2829
use store_api::region_request::RegionRequest;
2930
use store_api::storage::{RegionId, ScanRequest};
@@ -33,8 +34,8 @@ use crate::engine::listener::{FlushListener, StallListener};
3334
use crate::test_util::{
3435
build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
3536
multiple_log_store_factories, prepare_test_for_kafka_log_store, put_rows,
36-
raft_engine_log_store_factory, reopen_region, rows_schema, CreateRequestBuilder,
37-
LogStoreFactory, MockWriteBufferManager, TestEnv,
37+
raft_engine_log_store_factory, reopen_region, rows_schema, single_kafka_log_store_factory,
38+
CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv,
3839
};
3940
use crate::time_provider::TimeProvider;
4041
use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
@@ -247,7 +248,7 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
247248
return;
248249
};
249250

250-
let mut env = TestEnv::new().with_log_store_factory(factory.clone());
251+
let mut env = TestEnv::with_prefix("flush-reopen").with_log_store_factory(factory.clone());
251252
let engine = env.create_engine(MitoConfig::default()).await;
252253

253254
let region_id = RegionId::new(1, 1);
@@ -305,6 +306,55 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
305306
assert_eq!(5, version_data.committed_sequence);
306307
}
307308

309+
#[apply(single_kafka_log_store_factory)]
310+
async fn test_flush_notification(factory: Option<LogStoreFactory>) {
311+
use futures::TryStreamExt;
312+
use log_store::kafka::log_store::KafkaLogStore;
313+
use store_api::logstore::provider::Provider;
314+
315+
use crate::wal::entry_reader::decode_stream;
316+
use crate::wal::raw_entry_reader::flatten_stream;
317+
318+
common_telemetry::init_default_ut_logging();
319+
let Some(factory) = factory else {
320+
return;
321+
};
322+
323+
let mut env =
324+
TestEnv::with_prefix("flush-notification").with_log_store_factory(factory.clone());
325+
let engine = env.create_engine(MitoConfig::default()).await;
326+
let region_id = RegionId::new(1, 1);
327+
let topic = prepare_test_for_kafka_log_store(&factory).await;
328+
let request = CreateRequestBuilder::new()
329+
.kafka_topic(topic.clone())
330+
.build();
331+
let column_schemas = rows_schema(&request);
332+
engine
333+
.handle_request(region_id, RegionRequest::Create(request))
334+
.await
335+
.unwrap();
336+
let rows = Rows {
337+
schema: column_schemas.clone(),
338+
rows: build_rows_for_key("a", 0, 3, 0),
339+
};
340+
put_rows(&engine, region_id, rows).await;
341+
flush_region(&engine, region_id, None).await;
342+
343+
let topic = topic.unwrap();
344+
let log_store = env.log_store().unwrap().into_kafka_log_store();
345+
let provider = Provider::kafka_provider(topic);
346+
let stream = log_store.read(&provider, 0, None).await.unwrap();
347+
let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
348+
.try_collect::<Vec<_>>()
349+
.await
350+
.unwrap();
351+
let notifications = entries
352+
.into_iter()
353+
.filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
354+
.count();
355+
assert_eq!(notifications, 1);
356+
}
357+
308358
#[derive(Debug)]
309359
pub(crate) struct MockTimeProvider {
310360
now: AtomicI64,

src/mito2/src/manifest_notifier.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl ManifestNotifier {
7070
pub(crate) fn push_notification(&mut self, version: ManifestVersion) {
7171
self.wal_entry.mutations.push(Mutation {
7272
op_type: OpType::Notify.into(),
73-
sequence: self.next_entry_id,
73+
sequence: self.next_sequence,
7474
rows: None,
7575
manifest_notification: Some(api::v1::ManifestNotification { version }),
7676
});
@@ -99,6 +99,6 @@ impl ManifestNotifier {
9999

100100
pub(crate) fn finish(&mut self) {
101101
self.version_control
102-
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_sequence - 1);
102+
.set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
103103
}
104104
}

Comments (0)