diff --git a/Cargo.lock b/Cargo.lock
index 47b22a44d7d3..82d4569601b4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4271,7 +4271,7 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b"
 [[package]]
 name = "greptime-proto"
 version = "0.1.0"
-source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=c437b55725b7f5224fe9d46db21072b4a682ee4b#c437b55725b7f5224fe9d46db21072b4a682ee4b"
+source = "git+https://github.com/GreptimeTeam/greptime-proto.git?rev=b27d5d0f6b88b0416ee622623f25b99b81a742ac#b27d5d0f6b88b0416ee622623f25b99b81a742ac"
 dependencies = [
  "prost 0.12.6",
  "serde",
diff --git a/Cargo.toml b/Cargo.toml
index b596558713bc..3e28dbed25c7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -120,7 +120,7 @@ etcd-client = { version = "0.13" }
 fst = "0.4.7"
 futures = "0.3"
 futures-util = "0.3"
-greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "c437b55725b7f5224fe9d46db21072b4a682ee4b" }
+greptime-proto = { git = "https://github.com/GreptimeTeam/greptime-proto.git", rev = "b27d5d0f6b88b0416ee622623f25b99b81a742ac" }
 humantime = "2.1"
 humantime-serde = "1.1"
 itertools = "0.10"
diff --git a/src/mito2/benches/memtable_bench.rs b/src/mito2/benches/memtable_bench.rs
index 4309520cdd0c..1252c11633d0 100644
--- a/src/mito2/benches/memtable_bench.rs
+++ b/src/mito2/benches/memtable_bench.rs
@@ -266,6 +266,7 @@ impl CpuDataGenerator {
                 schema: self.column_schemas.clone(),
                 rows,
             }),
+            manifest_notification: None,
         };
 
         KeyValues::new(&self.metadata, mutation).unwrap()
diff --git a/src/mito2/src/compaction/compactor.rs b/src/mito2/src/compaction/compactor.rs
index 9c8c0e02bd1e..1a90efc305a1 100644
--- a/src/mito2/src/compaction/compactor.rs
+++ b/src/mito2/src/compaction/compactor.rs
@@ -21,6 +21,7 @@ use object_store::manager::ObjectStoreManagerRef;
 use serde::{Deserialize, Serialize};
 use smallvec::SmallVec;
 use snafu::{OptionExt, ResultExt};
+use store_api::manifest::ManifestVersion;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::RegionId;
 
@@ -217,7 +218,7 @@ pub trait Compactor: Send + Sync + 'static {
         &self,
         compaction_region: &CompactionRegion,
         merge_output: MergeOutput,
-    ) -> Result<RegionEdit>;
+    ) -> Result<(ManifestVersion, RegionEdit)>;
 
     /// Execute compaction for a region.
     async fn compact(
@@ -364,7 +365,7 @@ impl Compactor for DefaultCompactor {
         &self,
         compaction_region: &CompactionRegion,
         merge_output: MergeOutput,
-    ) -> Result<RegionEdit> {
+    ) -> Result<(ManifestVersion, RegionEdit)> {
         // Write region edit to manifest.
         let edit = RegionEdit {
             files_to_add: merge_output.files_to_add,
@@ -378,12 +379,12 @@ impl Compactor for DefaultCompactor {
 
         let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
         // TODO: We might leak files if we fail to update manifest. We can add a cleanup task to remove them later.
-        compaction_region
+        let version = compaction_region
             .manifest_ctx
             .update_manifest(Writable, action_list)
             .await?;
 
-        Ok(edit)
+        Ok((version, edit))
     }
 
     // The default implementation of compact combines the merge_ssts and update_manifest functions.
diff --git a/src/mito2/src/compaction/task.rs b/src/mito2/src/compaction/task.rs
index c76595097753..b0cb29a2d57e 100644
--- a/src/mito2/src/compaction/task.rs
+++ b/src/mito2/src/compaction/task.rs
@@ -18,6 +18,7 @@ use std::time::Instant;
 
 use common_telemetry::{error, info};
 use snafu::ResultExt;
+use store_api::manifest::ManifestVersion;
 use tokio::sync::mpsc;
 
 use crate::compaction::compactor::{CompactionRegion, Compactor};
@@ -77,7 +78,7 @@ impl CompactionTaskImpl {
             .for_each(|o| o.inputs.iter().for_each(|f| f.set_compacting(compacting)));
     }
 
-    async fn handle_compaction(&mut self) -> error::Result<RegionEdit> {
+    async fn handle_compaction(&mut self) -> error::Result<(ManifestVersion, RegionEdit)> {
         self.mark_files_compacting(true);
 
         let merge_timer = COMPACTION_STAGE_ELAPSED
@@ -146,12 +147,15 @@ impl CompactionTaskImpl {
 impl CompactionTask for CompactionTaskImpl {
     async fn run(&mut self) {
         let notify = match self.handle_compaction().await {
-            Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
-                region_id: self.compaction_region.region_id,
-                senders: std::mem::take(&mut self.waiters),
-                start_time: self.start_time,
-                edit,
-            }),
+            Ok((manifest_version, edit)) => {
+                BackgroundNotify::CompactionFinished(CompactionFinished {
+                    region_id: self.compaction_region.region_id,
+                    senders: std::mem::take(&mut self.waiters),
+                    start_time: self.start_time,
+                    edit,
+                    manifest_version,
+                })
+            }
             Err(e) => {
                 error!(e; "Failed to compact region, region id: {}", self.compaction_region.region_id);
                 let err = Arc::new(e);
diff --git a/src/mito2/src/engine.rs b/src/mito2/src/engine.rs
index 62912b8ffb10..4c3e48f03dd9 100644
--- a/src/mito2/src/engine.rs
+++ b/src/mito2/src/engine.rs
@@ -74,6 +74,7 @@ use object_store::manager::ObjectStoreManagerRef;
 use snafu::{ensure, OptionExt, ResultExt};
 use store_api::logstore::provider::Provider;
 use store_api::logstore::LogStore;
+use store_api::manifest::ManifestVersion;
 use store_api::metadata::RegionMetadataRef;
 use store_api::region_engine::{
     BatchResponses, RegionEngine, RegionRole, RegionScannerRef, SetReadonlyResponse,
@@ -184,7 +185,11 @@ impl MitoEngine {
     /// Now we only allow adding files to region (the [RegionEdit] struct can only contain a non-empty "files_to_add" field).
     /// Other region editing intention will result in an "invalid request" error.
     /// Also note that if a region is to be edited directly, we MUST not write data to it thereafter.
-    pub async fn edit_region(&self, region_id: RegionId, edit: RegionEdit) -> Result<()> {
+    pub async fn edit_region(
+        &self,
+        region_id: RegionId,
+        edit: RegionEdit,
+    ) -> Result<ManifestVersion> {
         let _timer = HANDLE_REQUEST_ELAPSED
             .with_label_values(&["edit_region"])
             .start_timer();
diff --git a/src/mito2/src/engine/alter_test.rs b/src/mito2/src/engine/alter_test.rs
index b48dc2ccfb08..146667ee7453 100644
--- a/src/mito2/src/engine/alter_test.rs
+++ b/src/mito2/src/engine/alter_test.rs
@@ -23,6 +23,12 @@ use common_error::status_code::StatusCode;
 use common_recordbatch::RecordBatches;
 use datatypes::prelude::ConcreteDataType;
 use datatypes::schema::ColumnSchema;
+use futures::TryStreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
 use store_api::metadata::ColumnMetadata;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{
@@ -34,9 +40,12 @@ use crate::config::MitoConfig;
 use crate::engine::listener::AlterFlushListener;
 use crate::engine::MitoEngine;
 use crate::test_util::{
-    build_rows, build_rows_for_key, flush_region, put_rows, rows_schema, CreateRequestBuilder,
-    TestEnv,
+    build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
+    prepare_test_for_kafka_log_store, put_rows, rows_schema, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, TestEnv,
 };
+use crate::wal::entry_reader::decode_stream;
+use crate::wal::raw_entry_reader::flatten_stream;
 
 async fn scan_check_after_alter(engine: &MitoEngine, region_id: RegionId, expected: &str) {
     let request = ScanRequest::default();
@@ -68,6 +77,52 @@ fn add_tag1() -> RegionAlterRequest {
     }
 }
 
+#[apply(single_kafka_log_store_factory)]
+async fn test_alter_region_notification(factory: Option<LogStoreFactory>) {
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env =
+        TestEnv::with_prefix("alter-notification").with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let region_id = RegionId::new(1, 1);
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows_for_key("a", 0, 3, 0),
+    };
+    put_rows(&engine, region_id, rows).await;
+    let request = add_tag1();
+    engine
+        .handle_request(region_id, RegionRequest::Alter(request))
+        .await
+        .unwrap();
+
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+    // Flush SST notification
+    assert_eq!(entries[1].1.mutations[0].op_type(), api::v1::OpType::Notify);
+    // Modify table metadata notification
+    assert_eq!(entries[2].1.mutations[0].op_type(), api::v1::OpType::Notify);
+}
+
 #[tokio::test]
 async fn test_alter_region() {
     common_telemetry::init_default_ut_logging();
diff --git a/src/mito2/src/engine/compaction_test.rs b/src/mito2/src/engine/compaction_test.rs
index e19f95088c46..7f376af52c29 100644
--- a/src/mito2/src/engine/compaction_test.rs
+++ b/src/mito2/src/engine/compaction_test.rs
@@ -19,6 +19,12 @@ use api::v1::{ColumnSchema, Rows};
 use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
 use datatypes::prelude::ScalarVector;
 use datatypes::vectors::TimestampMillisecondVector;
+use futures::TryStreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::{
     RegionCompactRequest, RegionDeleteRequest, RegionFlushRequest, RegionRequest,
@@ -30,8 +36,12 @@ use crate::config::MitoConfig;
 use crate::engine::listener::CompactionListener;
 use crate::engine::MitoEngine;
 use crate::test_util::{
-    build_rows_for_key, column_metadata_to_column_schema, put_rows, CreateRequestBuilder, TestEnv,
+    build_rows_for_key, column_metadata_to_column_schema, kafka_log_store_factory,
+    prepare_test_for_kafka_log_store, put_rows, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, TestEnv,
 };
+use crate::wal::entry_reader::decode_stream;
+use crate::wal::raw_entry_reader::flatten_stream;
 
 async fn put_and_flush(
     engine: &MitoEngine,
@@ -105,6 +115,66 @@ async fn collect_stream_ts(stream: SendableRecordBatchStream) -> Vec<i64> {
     res
 }
 
+#[apply(single_kafka_log_store_factory)]
+async fn test_compaction_region_notification(factory: Option<LogStoreFactory>) {
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env =
+        TestEnv::with_prefix("compaction_notification").with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .insert_option("compaction.type", "twcs")
+        .insert_option("compaction.twcs.max_active_window_runs", "1")
+        .insert_option("compaction.twcs.max_inactive_window_runs", "1")
+        .build();
+
+    let column_schemas = request
+        .column_metadatas
+        .iter()
+        .map(column_metadata_to_column_schema)
+        .collect::<Vec<_>>();
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    // Flush 5 SSTs for compaction.
+    put_and_flush(&engine, region_id, &column_schemas, 0..10).await;
+    put_and_flush(&engine, region_id, &column_schemas, 10..20).await;
+    put_and_flush(&engine, region_id, &column_schemas, 20..30).await;
+    delete_and_flush(&engine, region_id, &column_schemas, 15..30).await;
+    put_and_flush(&engine, region_id, &column_schemas, 15..25).await;
+
+    let result = engine
+        .handle_request(
+            region_id,
+            RegionRequest::Compact(RegionCompactRequest::default()),
+        )
+        .await
+        .unwrap();
+    assert_eq!(result.affected_rows, 0);
+
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+
+    let notifications = entries
+        .into_iter()
+        .filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
+        .count();
+    assert_eq!(notifications, 6);
+}
+
 #[tokio::test]
 async fn test_compaction_region() {
     common_telemetry::init_default_ut_logging();
diff --git a/src/mito2/src/engine/edit_region_test.rs b/src/mito2/src/engine/edit_region_test.rs
index 51f2a976b343..b792183e2f31 100644
--- a/src/mito2/src/engine/edit_region_test.rs
+++ b/src/mito2/src/engine/edit_region_test.rs
@@ -16,7 +16,13 @@ use std::sync::{Arc, Mutex};
 use std::time::Duration;
 
 use common_time::util::current_time_millis;
+use futures::TryStreamExt;
+use log_store::kafka::log_store::KafkaLogStore;
 use object_store::ObjectStore;
+use rstest::rstest;
+use rstest_reuse::{self, apply};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::RegionId;
@@ -29,7 +35,69 @@ use crate::engine::MitoEngine;
 use crate::manifest::action::RegionEdit;
 use crate::region::MitoRegionRef;
 use crate::sst::file::{FileId, FileMeta};
-use crate::test_util::{CreateRequestBuilder, TestEnv};
+use crate::test_util::{
+    kafka_log_store_factory, prepare_test_for_kafka_log_store, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, TestEnv,
+};
+use crate::wal::entry_reader::decode_stream;
+use crate::wal::raw_entry_reader::flatten_stream;
+
+#[apply(single_kafka_log_store_factory)]
+async fn test_edit_region_notification(factory: Option<LogStoreFactory>) {
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env = TestEnv::with_prefix("edit-notification").with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let region_id = RegionId::new(1, 1);
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    let region = engine.get_region(region_id).unwrap();
+    let file_id = FileId::random();
+    // Simulating the ingestion of an SST file.
+    env.get_object_store()
+        .unwrap()
+        .write(
+            &format!("{}/{}.parquet", region.region_dir(), file_id),
+            b"x".as_slice(),
+        )
+        .await
+        .unwrap();
+    let edit = RegionEdit {
+        files_to_add: vec![FileMeta {
+            region_id: region.region_id,
+            file_id,
+            level: 0,
+            ..Default::default()
+        }],
+        files_to_remove: vec![],
+        compaction_time_window: None,
+        flushed_entry_id: None,
+        flushed_sequence: None,
+    };
+    engine.edit_region(region.region_id, edit).await.unwrap();
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+    let notifications = entries
+        .into_iter()
+        .filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
+        .count();
+    assert_eq!(notifications, 1);
+}
 
 #[tokio::test]
 async fn test_edit_region_schedule_compaction() {
diff --git a/src/mito2/src/engine/flush_test.rs b/src/mito2/src/engine/flush_test.rs
index aac02db91ef2..5bd78a655f8c 100644
--- a/src/mito2/src/engine/flush_test.rs
+++ b/src/mito2/src/engine/flush_test.rs
@@ -24,6 +24,7 @@ use common_time::util::current_time_millis;
 use common_wal::options::WAL_OPTIONS_KEY;
 use rstest::rstest;
 use rstest_reuse::{self, apply};
+use store_api::logstore::LogStore;
 use store_api::region_engine::RegionEngine;
 use store_api::region_request::RegionRequest;
 use store_api::storage::{RegionId, ScanRequest};
@@ -33,8 +34,8 @@ use crate::engine::listener::{FlushListener, StallListener};
 use crate::test_util::{
     build_rows, build_rows_for_key, flush_region, kafka_log_store_factory,
     multiple_log_store_factories, prepare_test_for_kafka_log_store, put_rows,
-    raft_engine_log_store_factory, reopen_region, rows_schema, CreateRequestBuilder,
-    LogStoreFactory, MockWriteBufferManager, TestEnv,
+    raft_engine_log_store_factory, reopen_region, rows_schema, single_kafka_log_store_factory,
+    CreateRequestBuilder, LogStoreFactory, MockWriteBufferManager, TestEnv,
 };
 use crate::time_provider::TimeProvider;
 use crate::worker::MAX_INITIAL_CHECK_DELAY_SECS;
@@ -247,7 +248,7 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
         return;
     };
 
-    let mut env = TestEnv::new().with_log_store_factory(factory.clone());
+    let mut env = TestEnv::with_prefix("flush-reopen").with_log_store_factory(factory.clone());
     let engine = env.create_engine(MitoConfig::default()).await;
 
     let region_id = RegionId::new(1, 1);
@@ -273,8 +274,13 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
     let check_region = || {
         let region = engine.get_region(region_id).unwrap();
         let version_data = region.version_control.current();
-        assert_eq!(1, version_data.last_entry_id);
-        assert_eq!(3, version_data.committed_sequence);
+        if factory.is_kafka() {
+            assert_eq!(2, version_data.last_entry_id);
+            assert_eq!(4, version_data.committed_sequence);
+        } else {
+            assert_eq!(1, version_data.last_entry_id);
+            assert_eq!(3, version_data.committed_sequence);
+        }
         assert_eq!(1, version_data.version.flushed_entry_id);
         assert_eq!(3, version_data.version.flushed_sequence);
     };
@@ -301,8 +307,62 @@ async fn test_flush_reopen_region(factory: Option<LogStoreFactory>) {
     put_rows(&engine, region_id, rows).await;
     let region = engine.get_region(region_id).unwrap();
     let version_data = region.version_control.current();
-    assert_eq!(2, version_data.last_entry_id);
-    assert_eq!(5, version_data.committed_sequence);
+    if factory.is_kafka() {
+        assert_eq!(3, version_data.last_entry_id);
+        assert_eq!(6, version_data.committed_sequence);
+    } else {
+        assert_eq!(2, version_data.last_entry_id);
+        assert_eq!(5, version_data.committed_sequence);
+    }
+}
+
+#[apply(single_kafka_log_store_factory)]
+async fn test_flush_notification(factory: Option<LogStoreFactory>) {
+    use futures::TryStreamExt;
+    use log_store::kafka::log_store::KafkaLogStore;
+    use store_api::logstore::provider::Provider;
+
+    use crate::wal::entry_reader::decode_stream;
+    use crate::wal::raw_entry_reader::flatten_stream;
+
+    common_telemetry::init_default_ut_logging();
+    let Some(factory) = factory else {
+        return;
+    };
+
+    let mut env =
+        TestEnv::with_prefix("flush-notification").with_log_store_factory(factory.clone());
+    let engine = env.create_engine(MitoConfig::default()).await;
+    let region_id = RegionId::new(1, 1);
+    let topic = prepare_test_for_kafka_log_store(&factory).await;
+    let request = CreateRequestBuilder::new()
+        .kafka_topic(topic.clone())
+        .build();
+    let column_schemas = rows_schema(&request);
+    engine
+        .handle_request(region_id, RegionRequest::Create(request))
+        .await
+        .unwrap();
+    let rows = Rows {
+        schema: column_schemas.clone(),
+        rows: build_rows_for_key("a", 0, 3, 0),
+    };
+    put_rows(&engine, region_id, rows).await;
+    flush_region(&engine, region_id, None).await;
+
+    let topic = topic.unwrap();
+    let log_store = env.log_store().unwrap().into_kafka_log_store();
+    let provider = Provider::kafka_provider(topic);
+    let stream = log_store.read(&provider, 0, None).await.unwrap();
+    let entries = decode_stream(flatten_stream::<KafkaLogStore>(stream, provider.clone()))
+        .try_collect::<Vec<_>>()
+        .await
+        .unwrap();
+    let notifications = entries
+        .into_iter()
+        .filter(|(_, entry)| matches!(entry.mutations[0].op_type(), api::v1::OpType::Notify))
+        .count();
+    assert_eq!(notifications, 1);
 }
 
 #[derive(Debug)]
diff --git a/src/mito2/src/error.rs b/src/mito2/src/error.rs
index 2d045e8c579a..842801f56d00 100644
--- a/src/mito2/src/error.rs
+++ b/src/mito2/src/error.rs
@@ -491,6 +491,13 @@ pub enum Error {
         location: Location,
     },
 
+    #[snafu(display("Region {} is readonly", region_id))]
+    ReadonlyRegion {
+        region_id: RegionId,
+        #[snafu(implicit)]
+        location: Location,
+    },
+
     #[snafu(display("Invalid options"))]
     JsonOptions {
         #[snafu(source)]
@@ -955,6 +962,7 @@ impl ErrorExt for Error {
             CompatReader { .. } => StatusCode::Unexpected,
             InvalidRegionRequest { source, .. } => source.status_code(),
             RegionState { .. } => StatusCode::RegionNotReady,
+            ReadonlyRegion { .. } => StatusCode::Unexpected,
             JsonOptions { .. } => StatusCode::InvalidArguments,
             EmptyRegionDir { .. } | EmptyManifestDir { .. } => StatusCode::RegionNotFound,
             ArrowReader { .. } => StatusCode::StorageUnavailable,
diff --git a/src/mito2/src/flush.rs b/src/mito2/src/flush.rs
index 05561b6080ff..16ca20844eb4 100644
--- a/src/mito2/src/flush.rs
+++ b/src/mito2/src/flush.rs
@@ -21,6 +21,7 @@ use std::sync::Arc;
 use common_telemetry::{debug, error, info};
 use smallvec::SmallVec;
 use snafu::ResultExt;
+use store_api::manifest::ManifestVersion;
 use store_api::storage::RegionId;
 use strum::IntoStaticStr;
 use tokio::sync::{mpsc, watch};
@@ -269,7 +270,7 @@ impl RegionFlushTask {
         self.listener.on_flush_begin(self.region_id).await;
 
         let worker_request = match self.flush_memtables(&version_data).await {
-            Ok(edit) => {
+            Ok((manifest_version, edit)) => {
                 let memtables_to_remove = version_data
                     .version
                     .memtables
@@ -284,6 +285,7 @@ impl RegionFlushTask {
                     senders: std::mem::take(&mut self.senders),
                     _timer: timer,
                     edit,
+                    manifest_version,
                     memtables_to_remove,
                 };
                 WorkerRequest::Background {
@@ -309,7 +311,10 @@ impl RegionFlushTask {
 
     /// Flushes memtables to level 0 SSTs and updates the manifest.
-    /// Returns the [RegionEdit] to apply.
-    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
+    /// Returns the latest [ManifestVersion] and the [RegionEdit] to apply.
+    async fn flush_memtables(
+        &self,
+        version_data: &VersionControlData,
+    ) -> Result<(ManifestVersion, RegionEdit)> {
         // We must use the immutable memtables list and entry ids from the `version_data`
         // for consistency as others might already modify the version in the `version_control`.
         let version = &version_data.version;
@@ -409,11 +414,12 @@ impl RegionFlushTask {
         let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
         // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
         // add a cleanup job to remove them later.
-        self.manifest_ctx
+        let version = self
+            .manifest_ctx
             .update_manifest(RegionState::Writable, action_list)
             .await?;
 
-        Ok(edit)
+        Ok((version, edit))
     }
 
     /// Notify flush job status.
diff --git a/src/mito2/src/lib.rs b/src/mito2/src/lib.rs
index cd0cb3710223..655ccc0dbca6 100644
--- a/src/mito2/src/lib.rs
+++ b/src/mito2/src/lib.rs
@@ -31,6 +31,7 @@ pub mod engine;
 pub mod error;
 pub mod flush;
 pub mod manifest;
+mod manifest_notifier;
 pub mod memtable;
 mod metrics;
 pub mod read;
diff --git a/src/mito2/src/manifest_notifier.rs b/src/mito2/src/manifest_notifier.rs
new file mode 100644
index 000000000000..89d016c07c37
--- /dev/null
+++ b/src/mito2/src/manifest_notifier.rs
@@ -0,0 +1,104 @@
+// Copyright 2023 Greptime Team
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use api::v1::{Mutation, OpType, WalEntry};
+use store_api::logstore::provider::Provider;
+use store_api::logstore::LogStore;
+use store_api::manifest::ManifestVersion;
+use store_api::storage::{RegionId, SequenceNumber};
+
+use crate::error::Result;
+use crate::region::version::{VersionControlData, VersionControlRef};
+use crate::wal::{EntryId, WalWriter};
+
+/// Used to notify follower regions to apply manifest changes.
+pub(crate) struct ManifestNotifier {
+    /// Id of region to write.
+    region_id: RegionId,
+    /// VersionControl of the region.
+    version_control: VersionControlRef,
+    /// Next sequence number to write.
+    ///
+    /// The context assigns a unique sequence number for each row.
+    next_sequence: SequenceNumber,
+    /// Next entry id of WAL to write.
+    next_entry_id: EntryId,
+    /// Valid WAL entry to write.
+    ///
+    /// We keep [WalEntry] instead of mutations to avoid taking mutations
+    /// out of the context to construct the wal entry when we write to the wal.
+    wal_entry: WalEntry,
+
+    /// WAL provider of the region to write the notification to.
+    provider: Provider,
+}
+
+impl ManifestNotifier {
+    pub(crate) fn new(
+        region_id: RegionId,
+        version_control: &VersionControlRef,
+        provider: Provider,
+    ) -> ManifestNotifier {
+        let VersionControlData {
+            committed_sequence,
+            last_entry_id,
+            ..
+        } = version_control.current();
+
+        ManifestNotifier {
+            region_id,
+            version_control: version_control.clone(),
+            next_sequence: committed_sequence + 1,
+            next_entry_id: last_entry_id + 1,
+            wal_entry: WalEntry::default(),
+            provider,
+        }
+    }
+
+    /// Pushes a manifest-change notification carrying the given manifest version.
+    pub(crate) fn push_notification(&mut self, version: ManifestVersion) {
+        self.wal_entry.mutations.push(Mutation {
+            op_type: OpType::Notify.into(),
+            sequence: self.next_sequence,
+            rows: None,
+            manifest_notification: Some(api::v1::ManifestNotification { version }),
+        });
+        self.next_sequence += 1;
+    }
+
+    /// Encode and add WAL entry to the writer.
+    pub(crate) fn add_wal_entry<S: LogStore>(
+        &mut self,
+        wal_writer: &mut WalWriter<S>,
+    ) -> Result<()> {
+        wal_writer.add_entry(
+            self.region_id,
+            self.next_entry_id,
+            &self.wal_entry,
+            &self.provider,
+        )?;
+        self.next_entry_id += 1;
+        Ok(())
+    }
+
+    /// Updates next entry id.
+    pub(crate) fn set_next_entry_id(&mut self, next_entry_id: EntryId) {
+        self.next_entry_id = next_entry_id
+    }
+
+    pub(crate) fn finish(&mut self) {
+        self.version_control
+            .set_sequence_and_entry_id(self.next_sequence - 1, self.next_entry_id - 1);
+    }
+}
diff --git a/src/mito2/src/memtable/key_values.rs b/src/mito2/src/memtable/key_values.rs
index 59812b46191b..990043b9ceea 100644
--- a/src/mito2/src/memtable/key_values.rs
+++ b/src/mito2/src/memtable/key_values.rs
@@ -304,6 +304,7 @@ mod tests {
             op_type: OpType::Put as i32,
             sequence: START_SEQ,
             rows: Some(rows),
+            manifest_notification: None,
         }
     }
 
@@ -341,6 +342,7 @@ mod tests {
             op_type: OpType::Put as i32,
             sequence: 100,
             rows: None,
+            manifest_notification: None,
         };
         let kvs = KeyValues::new(&meta, mutation);
         assert!(kvs.is_none());
diff --git a/src/mito2/src/memtable/partition_tree.rs b/src/mito2/src/memtable/partition_tree.rs
index e320503886be..f2dcb8ab0ee7 100644
--- a/src/mito2/src/memtable/partition_tree.rs
+++ b/src/mito2/src/memtable/partition_tree.rs
@@ -718,6 +718,7 @@ mod tests {
                 schema: column_schema,
                 rows,
             }),
+            manifest_notification: None,
         };
         KeyValues::new(metadata.as_ref(), mutation).unwrap()
     }
diff --git a/src/mito2/src/memtable/time_series.rs b/src/mito2/src/memtable/time_series.rs
index 6d5fbb33a079..70ee217a5bd7 100644
--- a/src/mito2/src/memtable/time_series.rs
+++ b/src/mito2/src/memtable/time_series.rs
@@ -1182,6 +1182,7 @@ mod tests {
                 schema: column_schema,
                 rows,
             }),
+            manifest_notification: None,
         };
         KeyValues::new(schema.as_ref(), mutation).unwrap()
     }
diff --git a/src/mito2/src/region.rs b/src/mito2/src/region.rs
index 086fbef7d02f..cac6432901fd 100644
--- a/src/mito2/src/region.rs
+++ b/src/mito2/src/region.rs
@@ -27,6 +27,7 @@ use common_telemetry::{error, info, warn};
 use crossbeam_utils::atomic::AtomicCell;
 use snafu::{ensure, OptionExt};
 use store_api::logstore::provider::Provider;
+use store_api::manifest::ManifestVersion;
 use store_api::metadata::RegionMetadataRef;
 use store_api::storage::RegionId;
 
@@ -174,6 +175,11 @@ impl MitoRegion {
         self.manifest_ctx.state.load() == RegionState::Writable
     }
 
+    /// Returns whether the region is readonly.
+    pub(crate) fn is_readonly(&self) -> bool {
+        self.manifest_ctx.state.load() == RegionState::ReadOnly
+    }
+
     /// Returns the state of the region.
     pub(crate) fn state(&self) -> RegionState {
         self.manifest_ctx.state.load()
@@ -315,11 +321,12 @@ impl ManifestContext {
     }
 
     /// Updates the manifest if current state is `expect_state`.
+    /// Returns the latest manifest version.
     pub(crate) async fn update_manifest(
         &self,
         expect_state: RegionState,
         action_list: RegionMetaActionList,
-    ) -> Result<()> {
+    ) -> Result<ManifestVersion> {
         // Acquires the write lock of the manifest manager.
         let mut manager = self.manifest_manager.write().await;
         // Gets current manifest.
@@ -372,7 +379,7 @@ impl ManifestContext {
         }
 
         // Now we can update the manifest.
-        manager.update(action_list).await.inspect_err(
+        let version = manager.update(action_list).await.inspect_err(
             |e| error!(e; "Failed to update manifest, region_id: {}", manifest.metadata.region_id),
         )?;
 
@@ -383,7 +390,7 @@ impl ManifestContext {
             );
         }
 
-        Ok(())
+        Ok(version)
     }
 }
 
diff --git a/src/mito2/src/region_write_ctx.rs b/src/mito2/src/region_write_ctx.rs
index e86ff77ca2f1..84e120a91544 100644
--- a/src/mito2/src/region_write_ctx.rs
+++ b/src/mito2/src/region_write_ctx.rs
@@ -137,6 +137,7 @@ impl RegionWriteCtx {
             op_type,
             sequence: self.next_sequence,
             rows,
+            manifest_notification: None,
         });
 
         let notify = WriteNotify::new(tx, num_rows);
@@ -150,6 +151,9 @@ impl RegionWriteCtx {
         match OpType::try_from(op_type) {
             Ok(OpType::Delete) => self.delete_num += num_rows,
             Ok(OpType::Put) => self.put_num += num_rows,
+            Ok(OpType::Notify) => {
+                self.next_sequence += 1;
+            }
             Err(_) => (),
         }
     }
diff --git a/src/mito2/src/request.rs b/src/mito2/src/request.rs
index d88bc994e97e..3029257bb1a1 100644
--- a/src/mito2/src/request.rs
+++ b/src/mito2/src/request.rs
@@ -30,6 +30,7 @@ use prometheus::HistogramTimer;
 use prost::Message;
 use smallvec::SmallVec;
 use snafu::{ensure, OptionExt, ResultExt};
+use store_api::manifest::ManifestVersion;
 use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
 use store_api::region_engine::SetReadonlyResponse;
 use store_api::region_request::{
@@ -349,6 +350,10 @@ impl WriteRequest {
                         ),
                     })?
             }
+            OpType::Notify => {
+                // Safety: checked above.
+                unreachable!()
+            }
         };
 
         // Convert default value into proto's value.
@@ -654,6 +659,8 @@ pub(crate) struct FlushFinished {
     pub(crate) _timer: HistogramTimer,
     /// Region edit to apply.
     pub(crate) edit: RegionEdit,
+    /// The manifest version.
+    pub(crate) manifest_version: ManifestVersion,
     /// Memtables to remove.
     pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
 }
@@ -696,6 +703,8 @@ pub(crate) struct CompactionFinished {
     pub(crate) start_time: Instant,
     /// Region edit to apply.
     pub(crate) edit: RegionEdit,
+    /// The manifest version.
+    pub(crate) manifest_version: ManifestVersion,
 }
 
 impl CompactionFinished {
@@ -738,7 +747,7 @@ pub(crate) struct TruncateResult {
     /// Result sender.
     pub(crate) sender: OptionOutputTx,
     /// Truncate result.
-    pub(crate) result: Result<()>,
+    pub(crate) result: Result<ManifestVersion>,
     /// Truncated entry id.
     pub(crate) truncated_entry_id: EntryId,
     /// Truncated sequence.
@@ -755,7 +764,7 @@ pub(crate) struct RegionChangeResult {
     /// Result sender.
     pub(crate) sender: OptionOutputTx,
     /// Result from the manifest manager.
-    pub(crate) result: Result<()>,
+    pub(crate) result: Result<ManifestVersion>,
 }
 
 /// Request to edit a region directly.
@@ -764,7 +773,7 @@ pub(crate) struct RegionEditRequest {
     pub(crate) region_id: RegionId,
     pub(crate) edit: RegionEdit,
     /// The sender to notify the result to the region engine.
-    pub(crate) tx: Sender<Result<()>>,
+    pub(crate) tx: Sender<Result<ManifestVersion>>,
 }
 
 /// Notifies the regin the result of editing region.
@@ -773,11 +782,11 @@ pub(crate) struct RegionEditResult {
     /// Region id.
     pub(crate) region_id: RegionId,
     /// Result sender.
-    pub(crate) sender: Sender<Result<()>>,
+    pub(crate) sender: Sender<Result<ManifestVersion>>,
     /// Region edit to apply.
     pub(crate) edit: RegionEdit,
     /// Result from the manifest manager.
-    pub(crate) result: Result<()>,
+    pub(crate) result: Result<ManifestVersion>,
 }
 
 #[cfg(test)]
diff --git a/src/mito2/src/schedule/remote_job_scheduler.rs b/src/mito2/src/schedule/remote_job_scheduler.rs
index 8f51a774d50d..a73354e873db 100644
--- a/src/mito2/src/schedule/remote_job_scheduler.rs
+++ b/src/mito2/src/schedule/remote_job_scheduler.rs
@@ -19,6 +19,7 @@ use std::time::Instant;
 use common_telemetry::error;
 use serde::{Deserialize, Serialize};
 use snafu::{Location, ResultExt, Snafu};
+use store_api::manifest::ManifestVersion;
 use store_api::storage::RegionId;
 use tokio::sync::mpsc::Sender;
 use uuid::Uuid;
@@ -124,7 +125,7 @@ pub struct CompactionJobResult {
     pub job_id: JobId,
     pub region_id: RegionId,
     pub start_time: Instant,
-    pub region_edit: Result<RegionEdit>,
+    pub region_edit: Result<(ManifestVersion, RegionEdit)>,
 }
 
 /// DefaultNotifier is a default implementation of Notifier that sends WorkerRequest to the mito engine.
@@ -149,12 +150,15 @@ impl Notifier for DefaultNotifier {
             RemoteJobResult::CompactionJobResult(result) => {
                 let notify = {
                     match result.region_edit {
-                        Ok(edit) => BackgroundNotify::CompactionFinished(CompactionFinished {
-                            region_id: result.region_id,
-                            senders: waiters,
-                            start_time: result.start_time,
-                            edit,
-                        }),
+                        Ok((manifest_version, edit)) => {
+                            BackgroundNotify::CompactionFinished(CompactionFinished {
+                                region_id: result.region_id,
+                                senders: waiters,
+                                start_time: result.start_time,
+                                edit,
+                                manifest_version,
+                            })
+                        }
                         Err(err) => {
                             error!(
                                 "Compaction failed for region {}: {:?}",
diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs
index da0451bc1aa8..42ca1773315d 100644
--- a/src/mito2/src/test_util.rs
+++ b/src/mito2/src/test_util.rs
@@ -112,6 +112,12 @@ pub(crate) fn kafka_log_store_factory() -> Option<LogStoreFactory> {
 #[tokio::test]
 pub(crate) fn multiple_log_store_factories(#[case] factory: Option<LogStoreFactory>) {}
 
+#[template]
+#[rstest]
+#[case::with_kafka(kafka_log_store_factory())]
+#[tokio::test]
+pub(crate) fn single_kafka_log_store_factory(#[case] factory: Option<LogStoreFactory>) {}
+
 #[derive(Clone)]
 pub(crate) struct RaftEngineLogStoreFactory;
 
@@ -176,12 +182,27 @@ pub(crate) enum LogStoreFactory {
     Kafka(KafkaLogStoreFactory),
 }
 
+impl LogStoreFactory {
+    pub fn is_kafka(&self) -> bool {
+        matches!(self, LogStoreFactory::Kafka(_))
+    }
+}
+
 #[derive(Clone)]
 pub(crate) enum LogStoreImpl {
     RaftEngine(Arc<RaftEngineLogStore>),
     Kafka(Arc<KafkaLogStore>),
 }
 
+impl LogStoreImpl {
+    pub(crate) fn into_kafka_log_store(self) -> Arc<KafkaLogStore> {
+        match self {
+            LogStoreImpl::RaftEngine(_) => unreachable!(),
+            LogStoreImpl::Kafka(log_store) => log_store,
+        }
+    }
+}
+
 /// Env to test mito engine.
 pub struct TestEnv {
     /// Path to store data.
@@ -248,6 +269,10 @@ impl TestEnv {
         self.object_store_manager.clone()
     }
 
+    pub(crate) fn log_store(&self) -> Option<LogStoreImpl> {
+        self.log_store.clone()
+    }
+
     /// Creates a new engine with specific config under this env.
     pub async fn create_engine(&mut self, config: MitoConfig) -> MitoEngine {
         let (log_store, object_store_manager) = self.create_log_and_object_store_manager().await;
diff --git a/src/mito2/src/test_util/memtable_util.rs b/src/mito2/src/test_util/memtable_util.rs
index 235e9694c712..2998eaa9382f 100644
--- a/src/mito2/src/test_util/memtable_util.rs
+++ b/src/mito2/src/test_util/memtable_util.rs
@@ -297,6 +297,7 @@ pub(crate) fn build_key_values_with_ts_seq_values(
             schema: column_schema,
             rows,
         }),
+        manifest_notification: None,
     };
     KeyValues::new(metadata.as_ref(), mutation).unwrap()
 }
diff --git a/src/mito2/src/test_util/version_util.rs b/src/mito2/src/test_util/version_util.rs
index d4a17ffe47c7..8a97e18d7168 100644
--- a/src/mito2/src/test_util/version_util.rs
+++ b/src/mito2/src/test_util/version_util.rs
@@ -164,6 +164,7 @@ pub(crate) fn write_rows_to_version(
         op_type: OpType::Put as i32,
         sequence: start_ts as u64, // The sequence may be incorrect, but it's fine in test.
         rows: Some(rows),
+        manifest_notification: None,
     };
     let key_values = KeyValues::new(&version.metadata, mutation).unwrap();
     version.memtables.mutable.write(&key_values).unwrap();
diff --git a/src/mito2/src/wal.rs b/src/mito2/src/wal.rs
index 710c0ee3c807..2bf7f951aba9 100644
--- a/src/mito2/src/wal.rs
+++ b/src/mito2/src/wal.rs
@@ -287,6 +287,7 @@ mod tests {
             op_type: op_type as i32,
             sequence,
             rows: Some(Rows { schema, rows }),
+            manifest_notification: None,
         }
     }
 
diff --git a/src/mito2/src/wal/entry_distributor.rs b/src/mito2/src/wal/entry_distributor.rs
index e869e5ee1a5c..bb60da71b9ab 100644
--- a/src/mito2/src/wal/entry_distributor.rs
+++ b/src/mito2/src/wal/entry_distributor.rs
@@ -279,6 +279,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 1u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
                 .encode_to_vec(),
@@ -292,6 +293,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 2u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
                 .encode_to_vec(),
@@ -305,6 +307,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 3u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
                 .encode_to_vec(),
@@ -348,6 +351,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 1u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
             )]
@@ -367,6 +371,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 2u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
             )]
@@ -382,6 +387,7 @@ mod tests {
                 op_type: OpType::Put as i32,
                 sequence: 1u64,
                 rows: None,
+                manifest_notification: None,
             }],
         };
         let region2 = RegionId::new(1, 2);
@@ -390,6 +396,7 @@ mod tests {
                 op_type: OpType::Put as i32,
                 sequence: 3u64,
                 rows: None,
+                manifest_notification: None,
             }],
         };
         let region3 = RegionId::new(1, 3);
@@ -398,6 +405,7 @@ mod tests {
                 op_type: OpType::Put as i32,
                 sequence: 3u64,
                 rows: None,
+                manifest_notification: None,
             }],
         };
         let provider = Provider::kafka_provider("my_topic".to_string());
@@ -475,6 +483,7 @@ mod tests {
                 op_type: OpType::Put as i32,
                 sequence: 1u64,
                 rows: None,
+                manifest_notification: None,
             }],
         };
         let region2 = RegionId::new(1, 2);
@@ -551,6 +560,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 1u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
                 .encode_to_vec(),
@@ -564,6 +574,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 2u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
                 .encode_to_vec(),
@@ -577,6 +588,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 3u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
                 .encode_to_vec(),
@@ -590,6 +602,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 4u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
                 .encode_to_vec(),
@@ -624,6 +637,7 @@ mod tests {
                         op_type: OpType::Put as i32,
                         sequence: 4u64,
                         rows: None,
+                        manifest_notification: None,
                     }],
                 }
             )]
diff --git a/src/mito2/src/wal/entry_reader.rs b/src/mito2/src/wal/entry_reader.rs
index 27525155fdb6..14dfab47be36 100644
--- a/src/mito2/src/wal/entry_reader.rs
+++ b/src/mito2/src/wal/entry_reader.rs
@@ -21,6 +21,7 @@ use snafu::{ensure, ResultExt};
 use store_api::logstore::entry::Entry;
 use store_api::logstore::provider::Provider;
 
+use super::raw_entry_reader::EntryStream;
 use crate::error::{CorruptedEntrySnafu, DecodeWalSnafu, Result};
 use crate::wal::raw_entry_reader::RawEntryReader;
 use crate::wal::{EntryId, WalEntryStream};
@@ -64,31 +65,34 @@ impl<R> LogStoreEntryReader<R> {
 impl<R: RawEntryReader> WalEntryReader for LogStoreEntryReader<R> {
     fn read(&mut self, ns: &'_ Provider, start_id: EntryId) -> Result<WalEntryStream<'static>> {
         let LogStoreEntryReader { reader } = self;
-        let mut stream = reader.read(ns, start_id)?;
-
-        let stream = stream! {
-            let mut buffered_entry = None;
-            while let Some(next_entry) = stream.next().await {
-                match buffered_entry.take() {
-                    Some(entry) => {
-                        yield decode_raw_entry(entry);
-                        buffered_entry = Some(next_entry?);
-                    },
-                    None => {
-                        buffered_entry = Some(next_entry?);
-                    }
-                };
-            }
-            if let Some(entry) = buffered_entry {
-                // Ignores tail corrupted data.
-                if entry.is_complete() {
+        let stream = reader.read(ns, start_id)?;
+
+        Ok(decode_stream(stream))
+    }
+}
+
+pub(crate) fn decode_stream(mut stream: EntryStream<'static>) -> WalEntryStream<'static> {
+    stream! {
+        let mut buffered_entry = None;
+        while let Some(next_entry) = stream.next().await {
+            match buffered_entry.take() {
+                Some(entry) => {
                     yield decode_raw_entry(entry);
+                    buffered_entry = Some(next_entry?);
+                },
+                None => {
+                    buffered_entry = Some(next_entry?);
                 }
+            };
+        }
+        if let Some(entry) = buffered_entry {
+            // Ignores tail corrupted data.
+            if entry.is_complete() {
+                yield decode_raw_entry(entry);
             }
-        };
-
-        Ok(Box::pin(stream))
+        }
     }
+    .boxed()
 }
 
 #[cfg(test)]
@@ -115,6 +119,7 @@ mod tests {
                 op_type: OpType::Put as i32,
                 sequence: 1u64,
                 rows: None,
+                manifest_notification: None,
             }],
         };
         let encoded_entry = wal_entry.encode_to_vec();
diff --git a/src/mito2/src/wal/raw_entry_reader.rs b/src/mito2/src/wal/raw_entry_reader.rs
index 85a0c945b9fd..2258ef99410d 100644
--- a/src/mito2/src/wal/raw_entry_reader.rs
+++ b/src/mito2/src/wal/raw_entry_reader.rs
@@ -17,12 +17,14 @@ use std::sync::Arc;
 use async_stream::try_stream;
 use common_error::ext::BoxedError;
 use futures::stream::BoxStream;
+use futures::StreamExt;
 use snafu::ResultExt;
 use store_api::logstore::entry::Entry;
 use store_api::logstore::provider::Provider;
+#[cfg(test)]
+use store_api::logstore::SendableEntryStream;
 use store_api::logstore::{LogStore, WalIndex};
 use store_api::storage::RegionId;
-use tokio_stream::StreamExt;
 
 use crate::error::{self, Result};
 use crate::wal::EntryId;
@@ -123,6 +125,29 @@ where
     }
 }
 
+#[cfg(test)]
+pub(crate) fn flatten_stream<S: LogStore>(
+    mut stream: SendableEntryStream<'static, Entry, S::Error>,
+    provider: Provider,
+) -> EntryStream<'static> {
+    let stream = try_stream!({
+        while let Some(entries) = stream.next().await {
+            let entries =
+                entries
+                    .map_err(BoxedError::new)
+                    .with_context(|_| error::ReadWalSnafu {
+                        provider: provider.clone(),
+                    })?;
+
+            for entry in entries {
+                yield entry
+            }
+        }
+    });
+
+    stream.boxed()
+}
+
 #[cfg(test)]
 mod tests {
     use std::sync::Arc;
diff --git a/src/mito2/src/worker.rs b/src/mito2/src/worker.rs
index 0f872e24e4e7..49cdfeefaf7f 100644
--- a/src/mito2/src/worker.rs
+++ b/src/mito2/src/worker.rs
@@ -39,6 +39,7 @@ use prometheus::IntGauge;
 use rand::{thread_rng, Rng};
 use snafu::{ensure, ResultExt};
 use store_api::logstore::LogStore;
+use store_api::manifest::ManifestVersion;
 use store_api::region_engine::SetReadonlyResponse;
 use store_api::storage::RegionId;
 use tokio::sync::mpsc::{Receiver, Sender};
@@ -48,8 +49,9 @@ use crate::cache::write_cache::{WriteCache, WriteCacheRef};
 use crate::cache::{CacheManager, CacheManagerRef};
 use crate::compaction::CompactionScheduler;
 use crate::config::MitoConfig;
-use crate::error::{JoinSnafu, Result, WorkerStoppedSnafu};
+use crate::error::{JoinSnafu, ReadonlyRegionSnafu, Result, WorkerStoppedSnafu};
 use crate::flush::{FlushScheduler, WriteBufferManagerImpl, WriteBufferManagerRef};
+use crate::manifest_notifier::ManifestNotifier;
 use crate::memtable::MemtableBuilderProvider;
 use crate::metrics::{REGION_COUNT, WRITE_STALL_TOTAL};
 use crate::region::{MitoRegionRef, OpeningRegions, OpeningRegionsRef, RegionMap, RegionMapRef};
@@ -824,11 +826,54 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             }
             BackgroundNotify::CompactionFailed(req) => self.handle_compaction_failure(req).await,
             BackgroundNotify::Truncate(req) => self.handle_truncate_result(req).await,
-            BackgroundNotify::RegionChange(req) => self.handle_manifest_region_change_result(req),
+            BackgroundNotify::RegionChange(req) => {
+                self.handle_manifest_region_change_result(req).await
+            }
             BackgroundNotify::RegionEdit(req) => self.handle_region_edit_result(req).await,
         }
     }
 
+    /// Notifies follower regions to apply manifest change.
+    pub(crate) async fn notify_manifest_change(
+        &mut self,
+        region: &MitoRegionRef,
+        version: ManifestVersion,
+    ) -> Result<()> {
+        if !region.provider.is_remote_wal() {
+            return Ok(());
+        }
+
+        ensure!(
+            !region.is_readonly(),
+            ReadonlyRegionSnafu {
+                region_id: region.region_id
+            }
+        );
+
+        let mut notifier = ManifestNotifier::new(
+            region.region_id,
+            &region.version_control,
+            region.provider.clone(),
+        );
+        notifier.push_notification(version);
+        let mut writer = self.wal.writer();
+        notifier.add_wal_entry(&mut writer)?;
+        let append_response = writer.write_to_wal().await?;
+        // Safety: the write above targets this region, so `last_entry_ids` must contain its entry id.
+        let last_entry_id = append_response
+            .last_entry_ids
+            .get(&region.region_id)
+            .unwrap();
+        notifier.set_next_entry_id(last_entry_id + 1);
+        notifier.finish();
+
+        info!(
+            "Successfully notify region manifest change, region: {}, version: {}",
+            region.region_id, version
+        );
+        Ok(())
+    }
+
     /// Handles `set_readonly_gracefully`.
     async fn set_readonly_gracefully(
         &mut self,
diff --git a/src/mito2/src/worker/handle_alter.rs b/src/mito2/src/worker/handle_alter.rs
index 27aadbfc0d02..d7a7c7a40bdc 100644
--- a/src/mito2/src/worker/handle_alter.rs
+++ b/src/mito2/src/worker/handle_alter.rs
@@ -18,6 +18,7 @@ use std::sync::Arc;
 
 use common_telemetry::{debug, info};
 use snafu::ResultExt;
+use store_api::logstore::LogStore;
 use store_api::metadata::{RegionMetadata, RegionMetadataBuilder, RegionMetadataRef};
 use store_api::region_request::RegionAlterRequest;
 use store_api::storage::RegionId;
@@ -30,7 +31,7 @@ use crate::manifest::action::RegionChange;
 use crate::request::{DdlRequest, OptionOutputTx, SenderDdlRequest};
 use crate::worker::RegionWorkerLoop;
 
-impl<S> RegionWorkerLoop<S> {
+impl<S: LogStore> RegionWorkerLoop<S> {
     pub(crate) async fn handle_alter_request(
         &mut self,
         region_id: RegionId,
diff --git a/src/mito2/src/worker/handle_compaction.rs b/src/mito2/src/worker/handle_compaction.rs
index e0889320350c..be9290bf1ddd 100644
--- a/src/mito2/src/worker/handle_compaction.rs
+++ b/src/mito2/src/worker/handle_compaction.rs
@@ -14,6 +14,7 @@
 
 use api::v1::region::compact_request;
 use common_telemetry::{error, info, warn};
+use store_api::logstore::LogStore;
 use store_api::region_request::RegionCompactRequest;
 use store_api::storage::RegionId;
 
@@ -23,7 +24,7 @@ use crate::region::MitoRegionRef;
 use crate::request::{CompactionFailed, CompactionFinished, OnFailure, OptionOutputTx};
 use crate::worker::RegionWorkerLoop;
 
-impl<S> RegionWorkerLoop<S> {
+impl<S: LogStore> RegionWorkerLoop<S> {
     /// Handles compaction request submitted to region worker.
     pub(crate) async fn handle_compaction_request(
         &mut self,
@@ -71,6 +72,13 @@ impl<S> RegionWorkerLoop<S> {
         };
         region.update_compaction_millis();
 
+        if let Err(e) = self
+            .notify_manifest_change(&region, request.manifest_version)
+            .await
+        {
+            error!(e; "Failed to notify manifest change, region: {}, version: {}", region_id, request.manifest_version);
+        }
+
         region
             .version_control
             .apply_edit(request.edit.clone(), &[], region.file_purger.clone());
diff --git a/src/mito2/src/worker/handle_flush.rs b/src/mito2/src/worker/handle_flush.rs
index 14a70225bbe1..6d0b7234864f 100644
--- a/src/mito2/src/worker/handle_flush.rs
+++ b/src/mito2/src/worker/handle_flush.rs
@@ -201,6 +201,13 @@ impl<S: LogStore> RegionWorkerLoop<S> {
             }
         };
 
+        if let Err(e) = self
+            .notify_manifest_change(&region, request.manifest_version)
+            .await
+        {
+            error!(e; "Failed to notify manifest change, region: {}, version: {}", region_id, request.manifest_version);
+        }
+
         region.version_control.apply_edit(
             request.edit.clone(),
             &request.memtables_to_remove,
diff --git a/src/mito2/src/worker/handle_manifest.rs b/src/mito2/src/worker/handle_manifest.rs
index de5f4e563d43..0bc46cd3f8f0 100644
--- a/src/mito2/src/worker/handle_manifest.rs
+++ b/src/mito2/src/worker/handle_manifest.rs
@@ -18,7 +18,9 @@
 
 use std::collections::{HashMap, VecDeque};
 
-use common_telemetry::{info, warn};
+use common_telemetry::{error, info, warn};
+use store_api::logstore::LogStore;
+use store_api::manifest::ManifestVersion;
 use store_api::storage::RegionId;
 
 use crate::cache::file_cache::{FileType, IndexKey};
@@ -74,7 +76,7 @@ impl RegionEditQueue {
     }
 }
 
-impl<S> RegionWorkerLoop<S> {
+impl<S: LogStore> RegionWorkerLoop<S> {
     /// Handles region edit request.
     pub(crate) async fn handle_region_edit(&mut self, request: RegionEditRequest) {
         let region_id = request.region_id;
@@ -151,11 +153,15 @@ impl<S> RegionWorkerLoop<S> {
         let need_compaction =
             edit_result.result.is_ok() && !edit_result.edit.files_to_add.is_empty();
 
-        if edit_result.result.is_ok() {
+        if let Ok(version) = edit_result.result {
             // Applies the edit to the region.
             region
                 .version_control
                 .apply_edit(edit_result.edit, &[], region.file_purger.clone());
+
+            if let Err(e) = self.notify_manifest_change(&region, version).await {
+                error!(e; "Failed to notify manifest change, region: {}, version: {}", region.region_id, version);
+            }
         }
 
         // Sets the region as writable.
@@ -263,7 +269,10 @@ impl<S> RegionWorkerLoop<S> {
     }
 
     /// Handles region change result.
-    pub(crate) fn handle_manifest_region_change_result(&self, change_result: RegionChangeResult) {
+    pub(crate) async fn handle_manifest_region_change_result(
+        &mut self,
+        change_result: RegionChangeResult,
+    ) {
         let region = match self.regions.get_region(change_result.region_id) {
             Some(region) => region,
             None => {
@@ -277,7 +286,10 @@ impl<S> RegionWorkerLoop<S> {
             }
         };
 
-        if change_result.result.is_ok() {
+        if let Ok(version) = change_result.result {
+            if let Err(e) = self.notify_manifest_change(&region, version).await {
+                error!(e; "Failed to notify manifest change, region: {}, version: {}", region.region_id, version);
+            }
             // Apply the metadata to region's version.
             region
                 .version_control
@@ -303,7 +315,7 @@ async fn edit_region(
     edit: RegionEdit,
     cache_manager: CacheManagerRef,
     listener: WorkerListener,
-) -> Result<()> {
+) -> Result<ManifestVersion> {
     let region_id = region.region_id;
     if let Some(write_cache) = cache_manager.write_cache() {
         for file_meta in &edit.files_to_add {
diff --git a/src/mito2/src/worker/handle_truncate.rs b/src/mito2/src/worker/handle_truncate.rs
index da5b74e511f3..434bbbcdb37e 100644
--- a/src/mito2/src/worker/handle_truncate.rs
+++ b/src/mito2/src/worker/handle_truncate.rs
@@ -14,7 +14,7 @@
 
 //! Handling truncate related requests.
 
-use common_telemetry::info;
+use common_telemetry::{error, info};
 use store_api::logstore::LogStore;
 use store_api::storage::RegionId;
 
@@ -66,7 +66,10 @@ impl<S: LogStore> RegionWorkerLoop<S> {
         region.switch_state_to_writable(RegionState::Truncating);
 
         match truncate_result.result {
-            Ok(()) => {
+            Ok(version) => {
+                if let Err(e) = self.notify_manifest_change(&region, version).await {
+                    error!(e; "Failed to notify manifest change, region: {}, version: {}", region_id, version);
+                }
                 // Applies the truncate action to the region.
                 region.version_control.truncate(
                     truncate_result.truncated_entry_id,
diff --git a/src/store-api/src/logstore/provider.rs b/src/store-api/src/logstore/provider.rs
index 16f907f3b439..e90f5c2aedc1 100644
--- a/src/store-api/src/logstore/provider.rs
+++ b/src/store-api/src/logstore/provider.rs
@@ -107,4 +107,9 @@ impl Provider {
         }
         None
     }
+
+    /// Returns true if it's remote WAL.
+    pub fn is_remote_wal(&self) -> bool {
+        matches!(self, Provider::Kafka(_))
+    }
 }