Skip to content
This repository was archived by the owner on Jan 22, 2025. It is now read-only.

Commit 27eff84

Browse files
committed
Revert "Allow configuration of replay thread pools from CLI (#236)"
This reverts commit 973d05c.
1 parent 973d05c commit 27eff84

File tree

7 files changed

+24
-179
lines changed

7 files changed

+24
-179
lines changed

core/src/replay_stage.rs

+11-11
Original file line number | Diff line number | Diff line change
@@ -51,6 +51,7 @@ use {
5151
solana_measure::measure::Measure,
5252
solana_poh::poh_recorder::{PohLeaderStatus, PohRecorder, GRACE_TICKS_FACTOR, MAX_GRACE_SLOTS},
5353
solana_program_runtime::timings::ExecuteTimings,
54+
solana_rayon_threadlimit::get_max_thread_count,
5455
solana_rpc::{
5556
optimistically_confirmed_bank_tracker::{BankNotification, BankNotificationSenderConfig},
5657
rpc_subscriptions::RpcSubscriptions,
@@ -79,7 +80,6 @@ use {
7980
solana_vote_program::vote_state::VoteTransaction,
8081
std::{
8182
collections::{HashMap, HashSet},
82-
num::NonZeroUsize,
8383
result,
8484
sync::{
8585
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -95,9 +95,11 @@ pub const SUPERMINORITY_THRESHOLD: f64 = 1f64 / 3f64;
9595
pub const MAX_UNCONFIRMED_SLOTS: usize = 5;
9696
pub const DUPLICATE_LIVENESS_THRESHOLD: f64 = 0.1;
9797
pub const DUPLICATE_THRESHOLD: f64 = 1.0 - SWITCH_FORK_THRESHOLD - DUPLICATE_LIVENESS_THRESHOLD;
98-
9998
const MAX_VOTE_SIGNATURES: usize = 200;
10099
const MAX_VOTE_REFRESH_INTERVAL_MILLIS: usize = 5000;
100+
// Expect this number to be small enough to minimize thread pool overhead while large enough
101+
// to be able to replay all active forks at the same time in most cases.
102+
const MAX_CONCURRENT_FORKS_TO_REPLAY: usize = 4;
101103
const MAX_REPAIR_RETRY_LOOP_ATTEMPTS: usize = 10;
102104

103105
#[derive(PartialEq, Eq, Debug)]
@@ -289,8 +291,7 @@ pub struct ReplayStageConfig {
289291
// Stops voting until this slot has been reached. Should be used to avoid
290292
// duplicate voting which can lead to slashing.
291293
pub wait_to_vote_slot: Option<Slot>,
292-
pub replay_forks_threads: NonZeroUsize,
293-
pub replay_transactions_threads: NonZeroUsize,
294+
pub replay_slots_concurrently: bool,
294295
}
295296

296297
/// Timing information for the ReplayStage main processing loop
@@ -573,8 +574,7 @@ impl ReplayStage {
573574
ancestor_hashes_replay_update_sender,
574575
tower_storage,
575576
wait_to_vote_slot,
576-
replay_forks_threads,
577-
replay_transactions_threads,
577+
replay_slots_concurrently,
578578
} = config;
579579

580580
trace!("replay stage");
@@ -654,19 +654,19 @@ impl ReplayStage {
654654
)
655655
};
656656
// Thread pool to (maybe) replay multiple forks in parallel
657-
let replay_mode = if replay_forks_threads.get() == 1 {
658-
ForkReplayMode::Serial
659-
} else {
657+
let replay_mode = if replay_slots_concurrently {
660658
let pool = rayon::ThreadPoolBuilder::new()
661-
.num_threads(replay_forks_threads.get())
659+
.num_threads(MAX_CONCURRENT_FORKS_TO_REPLAY)
662660
.thread_name(|i| format!("solReplayFork{i:02}"))
663661
.build()
664662
.expect("new rayon threadpool");
665663
ForkReplayMode::Parallel(pool)
664+
} else {
665+
ForkReplayMode::Serial
666666
};
667667
// Thread pool to replay multiple transactions within one block in parallel
668668
let replay_tx_thread_pool = rayon::ThreadPoolBuilder::new()
669-
.num_threads(replay_transactions_threads.get())
669+
.num_threads(get_max_thread_count())
670670
.thread_name(|i| format!("solReplayTx{i:02}"))
671671
.build()
672672
.expect("new rayon threadpool");

core/src/tvu.rs

+3-19
Original file line number | Diff line number | Diff line change
@@ -53,7 +53,6 @@ use {
5353
std::{
5454
collections::HashSet,
5555
net::{SocketAddr, UdpSocket},
56-
num::NonZeroUsize,
5756
sync::{atomic::AtomicBool, Arc, RwLock},
5857
thread::{self, JoinHandle},
5958
},
@@ -82,6 +81,7 @@ pub struct TvuSockets {
8281
pub ancestor_hashes_requests: UdpSocket,
8382
}
8483

84+
#[derive(Default)]
8585
pub struct TvuConfig {
8686
pub max_ledger_shreds: Option<u64>,
8787
pub shred_version: u16,
@@ -90,22 +90,7 @@ pub struct TvuConfig {
9090
// Validators which should be given priority when serving repairs
9191
pub repair_whitelist: Arc<RwLock<HashSet<Pubkey>>>,
9292
pub wait_for_vote_to_start_leader: bool,
93-
pub replay_forks_threads: NonZeroUsize,
94-
pub replay_transactions_threads: NonZeroUsize,
95-
}
96-
97-
impl Default for TvuConfig {
98-
fn default() -> Self {
99-
Self {
100-
max_ledger_shreds: None,
101-
shred_version: 0,
102-
repair_validators: None,
103-
repair_whitelist: Arc::new(RwLock::new(HashSet::default())),
104-
wait_for_vote_to_start_leader: false,
105-
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
106-
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
107-
}
108-
}
93+
pub replay_slots_concurrently: bool,
10994
}
11095

11196
impl Tvu {
@@ -280,8 +265,7 @@ impl Tvu {
280265
ancestor_hashes_replay_update_sender,
281266
tower_storage: tower_storage.clone(),
282267
wait_to_vote_slot,
283-
replay_forks_threads: tvu_config.replay_forks_threads,
284-
replay_transactions_threads: tvu_config.replay_transactions_threads,
268+
replay_slots_concurrently: tvu_config.replay_slots_concurrently,
285269
};
286270

287271
let (voting_sender, voting_receiver) = unbounded();

core/src/validator.rs

+3-11
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,6 @@ use {
7474
poh_service::{self, PohService},
7575
},
7676
solana_program_runtime::runtime_config::RuntimeConfig,
77-
solana_rayon_threadlimit::get_max_thread_count,
7877
solana_rpc::{
7978
max_slots::MaxSlots,
8079
optimistically_confirmed_bank_tracker::{
@@ -124,7 +123,6 @@ use {
124123
std::{
125124
collections::{HashMap, HashSet},
126125
net::SocketAddr,
127-
num::NonZeroUsize,
128126
path::{Path, PathBuf},
129127
sync::{
130128
atomic::{AtomicBool, AtomicU64, Ordering},
@@ -262,15 +260,14 @@ pub struct ValidatorConfig {
262260
pub wait_to_vote_slot: Option<Slot>,
263261
pub ledger_column_options: LedgerColumnOptions,
264262
pub runtime_config: RuntimeConfig,
263+
pub replay_slots_concurrently: bool,
265264
pub banking_trace_dir_byte_limit: banking_trace::DirByteLimit,
266265
pub block_verification_method: BlockVerificationMethod,
267266
pub block_production_method: BlockProductionMethod,
268267
pub generator_config: Option<GeneratorConfig>,
269268
pub use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup,
270269
pub wen_restart_proto_path: Option<PathBuf>,
271270
pub unified_scheduler_handler_threads: Option<usize>,
272-
pub replay_forks_threads: NonZeroUsize,
273-
pub replay_transactions_threads: NonZeroUsize,
274271
}
275272

276273
impl Default for ValidatorConfig {
@@ -331,15 +328,14 @@ impl Default for ValidatorConfig {
331328
wait_to_vote_slot: None,
332329
ledger_column_options: LedgerColumnOptions::default(),
333330
runtime_config: RuntimeConfig::default(),
331+
replay_slots_concurrently: false,
334332
banking_trace_dir_byte_limit: 0,
335333
block_verification_method: BlockVerificationMethod::default(),
336334
block_production_method: BlockProductionMethod::default(),
337335
generator_config: None,
338336
use_snapshot_archives_at_startup: UseSnapshotArchivesAtStartup::default(),
339337
wen_restart_proto_path: None,
340338
unified_scheduler_handler_threads: None,
341-
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
342-
replay_transactions_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
343339
}
344340
}
345341
}
@@ -350,9 +346,6 @@ impl ValidatorConfig {
350346
enforce_ulimit_nofile: false,
351347
rpc_config: JsonRpcConfig::default_for_test(),
352348
block_production_method: BlockProductionMethod::ThreadLocalMultiIterator,
353-
replay_forks_threads: NonZeroUsize::new(1).expect("1 is non-zero"),
354-
replay_transactions_threads: NonZeroUsize::new(get_max_thread_count())
355-
.expect("thread count is non-zero"),
356349
..Self::default()
357350
}
358351
}
@@ -1312,8 +1305,7 @@ impl Validator {
13121305
repair_validators: config.repair_validators.clone(),
13131306
repair_whitelist: config.repair_whitelist.clone(),
13141307
wait_for_vote_to_start_leader,
1315-
replay_forks_threads: config.replay_forks_threads,
1316-
replay_transactions_threads: config.replay_transactions_threads,
1308+
replay_slots_concurrently: config.replay_slots_concurrently,
13171309
},
13181310
&max_slots,
13191311
block_metadata_notifier,

local-cluster/src/validator_configs.rs

+1-2
Original file line number | Diff line number | Diff line change
@@ -61,15 +61,14 @@ pub fn safe_clone_config(config: &ValidatorConfig) -> ValidatorConfig {
6161
wait_to_vote_slot: config.wait_to_vote_slot,
6262
ledger_column_options: config.ledger_column_options.clone(),
6363
runtime_config: config.runtime_config.clone(),
64+
replay_slots_concurrently: config.replay_slots_concurrently,
6465
banking_trace_dir_byte_limit: config.banking_trace_dir_byte_limit,
6566
block_verification_method: config.block_verification_method.clone(),
6667
block_production_method: config.block_production_method.clone(),
6768
generator_config: config.generator_config.clone(),
6869
use_snapshot_archives_at_startup: config.use_snapshot_archives_at_startup,
6970
wen_restart_proto_path: config.wen_restart_proto_path.clone(),
7071
unified_scheduler_handler_threads: config.unified_scheduler_handler_threads,
71-
replay_forks_threads: config.replay_forks_threads,
72-
replay_transactions_threads: config.replay_transactions_threads,
7372
}
7473
}
7574

validator/src/cli.rs

+5-14
Original file line number | Diff line number | Diff line change
@@ -52,9 +52,6 @@ use {
5252
std::{path::PathBuf, str::FromStr},
5353
};
5454

55-
pub mod thread_args;
56-
use thread_args::{thread_args, DefaultThreadArgs};
57-
5855
const EXCLUDE_KEY: &str = "account-index-exclude-key";
5956
const INCLUDE_KEY: &str = "account-index-include-key";
6057
// The default minimal snapshot download speed (bytes/second)
@@ -1469,6 +1466,11 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
14691466
.value_name("BYTES")
14701467
.help("Maximum number of bytes written to the program log before truncation"),
14711468
)
1469+
.arg(
1470+
Arg::with_name("replay_slots_concurrently")
1471+
.long("replay-slots-concurrently")
1472+
.help("Allow concurrent replay of slots on different forks"),
1473+
)
14721474
.arg(
14731475
Arg::with_name("banking_trace_dir_byte_limit")
14741476
// expose friendly alternative name to cli than internal
@@ -1553,7 +1555,6 @@ pub fn app<'a>(version: &'a str, default_args: &'a DefaultArgs) -> App<'a, 'a> {
15531555
",
15541556
),
15551557
)
1556-
.args(&thread_args(&default_args.thread_args))
15571558
.args(&get_deprecated_arguments())
15581559
.after_help("The default subcommand is run")
15591560
.subcommand(
@@ -2072,13 +2073,6 @@ fn deprecated_arguments() -> Vec<DeprecatedArg> {
20722073
.long("no-rocksdb-compaction")
20732074
.takes_value(false)
20742075
.help("Disable manual compaction of the ledger database"));
2075-
add_arg!(
2076-
Arg::with_name("replay_slots_concurrently")
2077-
.long("replay-slots-concurrently")
2078-
.help("Allow concurrent replay of slots on different forks")
2079-
.conflicts_with("replay_forks_threads"),
2080-
replaced_by: "replay_forks_threads",
2081-
usage_warning: "Equivalent behavior to this flag would be --replay-forks-threads 4");
20822076
add_arg!(Arg::with_name("rocksdb_compaction_interval")
20832077
.long("rocksdb-compaction-interval-slots")
20842078
.value_name("ROCKSDB_COMPACTION_INTERVAL_SLOTS")
@@ -2201,8 +2195,6 @@ pub struct DefaultArgs {
22012195
pub banking_trace_dir_byte_limit: String,
22022196

22032197
pub wen_restart_path: String,
2204-
2205-
pub thread_args: DefaultThreadArgs,
22062198
}
22072199

22082200
impl DefaultArgs {
@@ -2285,7 +2277,6 @@ impl DefaultArgs {
22852277
wait_for_restart_window_max_delinquent_stake: "5".to_string(),
22862278
banking_trace_dir_byte_limit: BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT.to_string(),
22872279
wen_restart_path: "wen_restart_progress.proto".to_string(),
2288-
thread_args: DefaultThreadArgs::default(),
22892280
}
22902281
}
22912282
}

validator/src/cli/thread_args.rs

-115
This file was deleted.

0 commit comments

Comments
 (0)