Skip to content

Commit c2364bb

Browse files
committed
chore: after rebase fix
Signed-off-by: discord9 <[email protected]>
1 parent 799ff60 commit c2364bb

File tree

4 files changed

+32
-28
lines changed

4 files changed

+32
-28
lines changed

src/meta-srv/src/gc/ctx.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,6 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
16-
1715
use std::collections::{HashMap, HashSet};
1816
use std::time::Duration;
1917

@@ -34,11 +32,6 @@ use table::metadata::TableId;
3432
use crate::cluster::MetaPeerClientRef;
3533
use crate::error::{self, Result, TableMetadataManagerSnafu, UnexpectedSnafu};
3634
use crate::gc::Region2Peers;
37-
38-
use crate::cluster::MetaPeerClientRef;
39-
use crate::error;
40-
use crate::error::{Result, TableMetadataManagerSnafu};
41-
use crate::gc::handler::Region2Peers;
4235
use crate::gc::procedure::GcRegionProcedure;
4336
use crate::handler::HeartbeatMailbox;
4437
use crate::service::mailbox::{Channel, MailboxRef};

src/meta-srv/src/gc/handler.rs

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use common_meta::key::table_route::PhysicalTableRouteValue;
1919
use common_meta::peer::Peer;
2020
use common_telemetry::{debug, error, info, warn};
2121
use futures::StreamExt;
22+
use itertools::Itertools;
2223
use store_api::storage::{FileRefsManifest, GcReport, RegionId};
2324
use table::metadata::TableId;
2425
use tokio::time::sleep;
@@ -88,8 +89,8 @@ impl GcScheduler {
8889
pub(crate) async fn find_related_regions(
8990
&self,
9091
candidate_region_ids: &[RegionId],
91-
) -> Result<Vec<RegionId>> {
92-
Ok(candidate_region_ids.to_vec())
92+
) -> Result<HashMap<RegionId, Vec<RegionId>>> {
93+
Ok(candidate_region_ids.iter().map(|&r| (r, vec![r])).collect())
9394
}
9495

9596
/// Aggregate GC candidates by their corresponding datanode peer.
@@ -235,15 +236,18 @@ impl GcScheduler {
235236

236237
let all_region_ids: Vec<RegionId> = candidates.iter().map(|(_, c)| c.region_id).collect();
237238

239+
let all_related_regions = self.find_related_regions(&all_region_ids).await?;
240+
238241
let (region_to_peer, _) = self
239-
.discover_datanodes_for_regions(&all_region_ids, 0)
242+
.discover_datanodes_for_regions(&all_related_regions.keys().cloned().collect_vec(), 0)
240243
.await?;
241244

242245
// Step 1: Get file references for all regions on this datanode
243246
let file_refs_manifest = self
244247
.ctx
245248
.get_file_references(
246249
&all_region_ids,
250+
all_related_regions,
247251
&region_to_peer,
248252
self.config.mailbox_timeout,
249253
)
@@ -407,7 +411,7 @@ impl GcScheduler {
407411
// Step 2: Process retry regions by calling gc_regions on rediscovered datanodes (batched)
408412
let retry_result = self
409413
.process_retry_regions_by_peers(region_to_peer, peer_to_regions, retry_round)
410-
.await;
414+
.await?;
411415

412416
// Merge the retry results into the final report
413417
final_report.merge(retry_result);
@@ -429,19 +433,29 @@ impl GcScheduler {
429433
Ok(final_report)
430434
}
431435

432-
/// Discover datanodes for the given regions by fetching table routes in batches.
436+
/// Discover datanodes for the given regions(and it's related regions) by fetching table routes in batches.
433437
/// Returns mappings from region to peer(leader, Vec<followers>) and peer to regions.
434438
async fn discover_datanodes_for_regions(
435439
&self,
436440
regions: &[RegionId],
437441
retry_round: usize,
438442
) -> Result<(Region2Peers, Peer2Regions)> {
443+
let all_related_regions = self
444+
.find_related_regions(&regions)
445+
.await?
446+
.into_iter()
447+
.map(|(k, mut v)| {
448+
v.push(k);
449+
v
450+
})
451+
.flatten()
452+
.collect_vec();
439453
let mut region_to_peer = HashMap::new();
440454
let mut peer_to_regions = HashMap::new();
441455

442456
// Group regions by table ID for batch processing
443457
let mut table_to_regions: HashMap<TableId, Vec<RegionId>> = HashMap::new();
444-
for &region_id in regions {
458+
for region_id in all_related_regions {
445459
let table_id = region_id.table_id();
446460
table_to_regions
447461
.entry(table_id)
@@ -520,7 +534,7 @@ impl GcScheduler {
520534
region_to_peer: Region2Peers,
521535
peer_to_regions: Peer2Regions,
522536
retry_round: usize,
523-
) -> GcJobReport {
537+
) -> Result<GcJobReport> {
524538
let mut round_report = GcJobReport::default();
525539

526540
for (peer, regions) in peer_to_regions {
@@ -531,17 +545,17 @@ impl GcScheduler {
531545
retry_round
532546
);
533547

534-
let all_related_regions = self
535-
.find_related_regions(&regions.iter().cloned().collect::<Vec<_>>())
536-
.await
537-
.unwrap_or_default();
548+
let query_regions = regions.clone().into_iter().collect::<Vec<_>>();
549+
550+
let all_related_regions = self.find_related_regions(&query_regions).await?;
538551

539552
// Get fresh file references for these regions
540553
let file_refs_manifest = match self
541554
.ctx
542555
.get_file_references(
543-
&all_related_regions,
544-
&region_to_peer,
556+
&query_regions,
557+
all_related_regions,
558+
&region_to_peer, // FIXME: update region_to_peer?
545559
self.config.mailbox_timeout,
546560
)
547561
.await
@@ -576,7 +590,7 @@ impl GcScheduler {
576590
}
577591
}
578592

579-
round_report
593+
Ok(round_report)
580594
}
581595

582596
/// Process a batch of regions on a single peer.

src/meta-srv/src/gc/mock.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,8 @@ impl SchedulerCtx for MockSchedulerCtx {
212212

213213
async fn get_file_references(
214214
&self,
215-
region_ids: &[RegionId],
215+
query_regions: &[RegionId],
216+
related_regions: HashMap<RegionId, Vec<RegionId>>,
216217
region_to_peer: &Region2Peers,
217218
_timeout: Duration,
218219
) -> Result<FileRefsManifest> {
@@ -222,14 +223,14 @@ impl SchedulerCtx for MockSchedulerCtx {
222223
if let Some(error) = self.get_file_references_error.lock().unwrap().take() {
223224
return Err(error);
224225
}
225-
if region_ids
226+
if query_regions
226227
.iter()
227228
.any(|region_id| !region_to_peer.contains_key(region_id))
228229
{
229230
UnexpectedSnafu {
230231
violated: format!(
231232
"region_to_peer{region_to_peer:?} does not contain all region_ids requested: {:?}",
232-
region_ids
233+
query_regions
233234
),
234235
}.fail()?;
235236
}

src/mito2/src/gc.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,6 @@ impl LocalGcWorker {
322322
let region_id = manifest.metadata.region_id;
323323
let current_files = &manifest.files;
324324

325-
let in_manifest_file_cnt = current_files.len();
326-
327325
let recently_removed_files = self.get_removed_files_expel_times(&manifest).await?;
328326

329327
if recently_removed_files.is_empty() {
@@ -346,8 +344,6 @@ impl LocalGcWorker {
346344
.chain(tmp_ref_files.clone().into_iter())
347345
.collect();
348346

349-
let in_used_file_cnt = in_used.len();
350-
351347
let unused_files = self
352348
.list_to_be_deleted_files(region_id, &in_used, recently_removed_files, concurrency)
353349
.await?;

0 commit comments

Comments
 (0)