Skip to content

Commit 70c1554

Browse files
authored
Merge branch 'main' into com-library-removed
2 parents 1c75de9 + 7ab46a0 commit 70c1554

File tree

13 files changed

+413
-71
lines changed

13 files changed

+413
-71
lines changed

apps/freenet-ping/app/tests/common/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ pub async fn base_node_test_config_with_rng(
127127
network_port: public_port, // if None, node will pick a free one or use default
128128
bandwidth_limit: None,
129129
blocked_addresses,
130+
transient_budget: None,
131+
transient_ttl_secs: None,
130132
},
131133
config_paths: freenet::config::ConfigPathsArgs {
132134
config_dir: Some(temp_dir.path().to_path_buf()),

apps/freenet-ping/app/tests/run_app_blocked_peers.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -244,23 +244,29 @@ async fn run_blocked_peers_test(config: BlockedPeersConfig) -> TestResult {
244244
let (stream_node2, _) = connect_async(&uri_node2).await?;
245245
let mut client_node2 = WebApi::start(stream_node2);
246246

247-
// Load contract code
247+
// Compile/load contract code (same helper used by other app tests)
248248
let path_to_code = std::path::PathBuf::from(PACKAGE_DIR).join(PATH_TO_CONTRACT);
249-
tracing::info!(path=%path_to_code.display(), "Loading contract code");
250-
let code = std::fs::read(path_to_code)
251-
.ok()
252-
.ok_or_else(|| anyhow!("Failed to read contract code"))?;
253-
let code_hash = CodeHash::from_code(&code);
249+
tracing::info!(path = %path_to_code.display(), "Loading contract code");
250+
251+
// First compile to compute the code hash, then rebuild options with the correct code_key
252+
let temp_options = PingContractOptions {
253+
frequency: Duration::from_secs(3),
254+
ttl: Duration::from_secs(30),
255+
tag: APP_TAG.to_string(),
256+
code_key: String::new(),
257+
};
258+
let temp_params = Parameters::from(serde_json::to_vec(&temp_options).unwrap());
259+
let temp_container = common::load_contract(&path_to_code, temp_params)?;
260+
let code_hash = CodeHash::from_code(temp_container.data());
254261

255-
// Define contract options
256262
let ping_options = PingContractOptions {
257263
frequency: Duration::from_secs(3),
258264
ttl: Duration::from_secs(30),
259265
tag: APP_TAG.to_string(),
260266
code_key: code_hash.to_string(),
261267
};
262268
let params = Parameters::from(serde_json::to_vec(&ping_options).unwrap());
263-
let container = ContractContainer::try_from((code, &params))?;
269+
let container = common::load_contract(&path_to_code, params)?;
264270
let contract_key = container.key();
265271

266272
// Gateway puts the contract

crates/core/src/config/mod.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,9 @@ pub(crate) const PCK_VERSION: &str = env!("CARGO_PKG_VERSION");
4545
// Initialize the executor once.
4646
static ASYNC_RT: LazyLock<Option<Runtime>> = LazyLock::new(GlobalExecutor::initialize_async_rt);
4747

48+
const DEFAULT_TRANSIENT_BUDGET: usize = 32;
49+
const DEFAULT_TRANSIENT_TTL_SECS: u64 = 30;
50+
4851
const QUALIFIER: &str = "";
4952
const ORGANIZATION: &str = "The Freenet Project Inc";
5053
const APPLICATION: &str = "Freenet";
@@ -97,6 +100,8 @@ impl Default for ConfigArgs {
97100
location: None,
98101
bandwidth_limit: Some(3_000_000), // 3 MB/s default for streaming transfers only
99102
blocked_addresses: None,
103+
transient_budget: Some(DEFAULT_TRANSIENT_BUDGET),
104+
transient_ttl_secs: Some(DEFAULT_TRANSIENT_TTL_SECS),
100105
},
101106
ws_api: WebsocketApiArgs {
102107
address: Some(default_listening_address()),
@@ -361,6 +366,14 @@ impl ConfigArgs {
361366
.network_api
362367
.blocked_addresses
363368
.map(|addrs| addrs.into_iter().collect()),
369+
transient_budget: self
370+
.network_api
371+
.transient_budget
372+
.unwrap_or(DEFAULT_TRANSIENT_BUDGET),
373+
transient_ttl_secs: self
374+
.network_api
375+
.transient_ttl_secs
376+
.unwrap_or(DEFAULT_TRANSIENT_TTL_SECS),
364377
},
365378
ws_api: WebsocketApiConfig {
366379
// the websocket API is always local
@@ -542,6 +555,16 @@ pub struct NetworkArgs {
542555
/// List of IP:port addresses to refuse connections to/from.
543556
#[arg(long, num_args = 0..)]
544557
pub blocked_addresses: Option<Vec<SocketAddr>>,
558+
559+
/// Maximum number of concurrent transient connections accepted by a gateway.
560+
#[arg(long, env = "TRANSIENT_BUDGET")]
561+
#[serde(rename = "transient-budget", skip_serializing_if = "Option::is_none")]
562+
pub transient_budget: Option<usize>,
563+
564+
/// Time (in seconds) before an unpromoted transient connection is dropped.
565+
#[arg(long, env = "TRANSIENT_TTL_SECS")]
566+
#[serde(rename = "transient-ttl-secs", skip_serializing_if = "Option::is_none")]
567+
pub transient_ttl_secs: Option<u64>,
545568
}
546569

547570
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -608,6 +631,14 @@ pub struct NetworkApiConfig {
608631
/// List of IP:port addresses to refuse connections to/from.
609632
#[serde(skip_serializing_if = "Option::is_none")]
610633
pub blocked_addresses: Option<HashSet<SocketAddr>>,
634+
635+
/// Maximum number of concurrent transient connections accepted by a gateway.
636+
#[serde(default = "default_transient_budget", rename = "transient-budget")]
637+
pub transient_budget: usize,
638+
639+
/// Time (in seconds) before an unpromoted transient connection is dropped.
640+
#[serde(default = "default_transient_ttl_secs", rename = "transient-ttl-secs")]
641+
pub transient_ttl_secs: u64,
611642
}
612643

613644
mod port_allocation;
@@ -617,6 +648,14 @@ pub fn default_network_api_port() -> u16 {
617648
find_available_port().unwrap_or(31337) // Fallback to 31337 if we can't find a random port
618649
}
619650

651+
fn default_transient_budget() -> usize {
652+
DEFAULT_TRANSIENT_BUDGET
653+
}
654+
655+
fn default_transient_ttl_secs() -> u64 {
656+
DEFAULT_TRANSIENT_TTL_SECS
657+
}
658+
620659
#[derive(clap::Parser, Debug, Default, Copy, Clone, Serialize, Deserialize)]
621660
pub struct WebsocketApiArgs {
622661
/// Address to bind to for the websocket API, default is 0.0.0.0

crates/core/src/node/mod.rs

Lines changed: 40 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ pub struct NodeConfig {
128128
pub(crate) max_upstream_bandwidth: Option<Rate>,
129129
pub(crate) max_downstream_bandwidth: Option<Rate>,
130130
pub(crate) blocked_addresses: Option<HashSet<SocketAddr>>,
131+
pub(crate) transient_budget: usize,
132+
pub(crate) transient_ttl: Duration,
131133
}
132134

133135
impl NodeConfig {
@@ -195,6 +197,8 @@ impl NodeConfig {
195197
max_upstream_bandwidth: None,
196198
max_downstream_bandwidth: None,
197199
blocked_addresses: config.network_api.blocked_addresses.clone(),
200+
transient_budget: config.network_api.transient_budget,
201+
transient_ttl: Duration::from_secs(config.network_api.transient_ttl_secs),
198202
})
199203
}
200204

@@ -1147,35 +1151,51 @@ async fn handle_aborted_op(
11471151
gateways: &[PeerKeyLocation],
11481152
) -> Result<(), OpError> {
11491153
use crate::util::IterExt;
1150-
if let TransactionType::Connect = tx.transaction_type() {
1151-
// attempt to establish a connection failed, this could be a fatal error since the node
1152-
// is useless without connecting to the network, we will retry with exponential backoff
1153-
// if necessary
1154-
match op_manager.pop(&tx) {
1155-
Ok(Some(OpEnum::Connect(op)))
1156-
if op.has_backoff()
1157-
&& op_manager.ring.open_connections()
1158-
< op_manager.ring.connection_manager.min_connections =>
1159-
{
1160-
let gateway = op.gateway().cloned();
1161-
if let Some(gateway) = gateway {
1162-
tracing::warn!("Retry connecting to gateway {}", gateway.peer);
1163-
connect::join_ring_request(None, &gateway, op_manager).await?;
1154+
match tx.transaction_type() {
1155+
TransactionType::Connect => {
1156+
// attempt to establish a connection failed, this could be a fatal error since the node
1157+
// is useless without connecting to the network, we will retry with exponential backoff
1158+
// if necessary
1159+
match op_manager.pop(&tx) {
1160+
Ok(Some(OpEnum::Connect(op)))
1161+
if op.has_backoff()
1162+
&& op_manager.ring.open_connections()
1163+
< op_manager.ring.connection_manager.min_connections =>
1164+
{
1165+
let gateway = op.gateway().cloned();
1166+
if let Some(gateway) = gateway {
1167+
tracing::warn!("Retry connecting to gateway {}", gateway.peer);
1168+
connect::join_ring_request(None, &gateway, op_manager).await?;
1169+
}
1170+
}
1171+
Ok(Some(OpEnum::Connect(_))) => {
1172+
if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
1173+
tracing::warn!("Retrying joining the ring with an other gateway");
1174+
if let Some(gateway) = gateways.iter().shuffle().next() {
1175+
connect::join_ring_request(None, gateway, op_manager).await?
1176+
}
1177+
}
1178+
}
1179+
Ok(Some(other)) => {
1180+
op_manager.push(tx, other).await?;
11641181
}
1182+
_ => {}
11651183
}
1166-
Ok(Some(OpEnum::Connect(_))) => {
1167-
if op_manager.ring.open_connections() == 0 && op_manager.ring.is_gateway() {
1168-
tracing::warn!("Retrying joining the ring with an other gateway");
1169-
if let Some(gateway) = gateways.iter().shuffle().next() {
1170-
connect::join_ring_request(None, gateway, op_manager).await?
1184+
}
1185+
TransactionType::Get => match op_manager.pop(&tx) {
1186+
Ok(Some(OpEnum::Get(op))) => {
1187+
if let Err(err) = op.handle_abort(op_manager).await {
1188+
if !matches!(err, OpError::StatePushed) {
1189+
return Err(err);
11711190
}
11721191
}
11731192
}
11741193
Ok(Some(other)) => {
11751194
op_manager.push(tx, other).await?;
11761195
}
11771196
_ => {}
1178-
}
1197+
},
1198+
_ => {}
11791199
}
11801200
Ok(())
11811201
}

0 commit comments

Comments
 (0)