5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -6,7 +6,10 @@

Sets the `ic_env` cookie for all HTML files only if the canister environment changed in the `commit_batch` method.

- Module hash: c156183e6a6f5c3c71ecde5f38a7280b770ff172b4827127ef03b89b764065ba
Use canister self-calls to avoid hitting instruction limits during `commit_batch`, `compute_evidence`, and `compute_state_hash`.

- Module hash: 63d122d0149a29f4e48603efdd7d2bce656a6a83bac1e3207897c68e8e225bb6
- https://github.com/dfinity/sdk/pull/4450
- https://github.com/dfinity/sdk/pull/4446

# 0.30.2
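
A minimal sketch of the self-call pattern described in the new changelog entry above ("Use canister self-calls to avoid hitting instruction limits..."). This is not part of the diff; the real implementation is `loop_with_message_extension_until_completion` further down in `ic-certified-assets/src/lib.rs`, and the `Status` enum and `run_in_slices` name here are illustrative stand-ins for the crate's `ComputationStatus` machinery:

```rust
/// Hypothetical stand-in for the crate's `ComputationStatus` type.
enum Status<Done, Progress, Error> {
    Done(Done),
    InProgress(Progress),
    Error(Error),
}

/// Drive a resumable computation to completion, making a cheap self-call
/// whenever the instruction counter nears the per-message limit so that the
/// next slice of work starts with a fresh instruction budget.
async fn run_in_slices<D, P: Default, E>(
    mut step: impl FnMut(P) -> Status<D, P, E>,
) -> Result<D, E> {
    // Stay well below the ~40b instruction limit of a single message.
    const INSTRUCTION_THRESHOLD: u64 = 35_000_000_000;
    let mut progress = P::default();
    loop {
        match step(progress) {
            Status::Done(done) => return Ok(done),
            Status::Error(err) => return Err(err),
            Status::InProgress(next) => {
                progress = next;
                if ic_cdk::api::performance_counter(0) > INSTRUCTION_THRESHOLD {
                    // The method intentionally does not exist; we only await the
                    // call so the loop resumes in a fresh message context.
                    let _ = ic_cdk::call::Call::bounded_wait(
                        ic_cdk::api::canister_self(),
                        "__does_not_exist",
                    )
                    .await;
                }
            }
        }
    }
}
```

Self-calls are used for the reset because, as the implementation notes, they are the calls most likely to be short-circuited by the scheduler, so the extra wait time per reset stays small.
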
@@ -159,15 +159,6 @@ impl<'agent> ChunkUploader<'agent> {
self.chunks.load(Ordering::SeqCst)
}

/// Get total size of chunks by their canister chunk IDs
pub(crate) async fn get_canister_chunk_total_size(&self, canister_chunk_ids: &[Nat]) -> usize {
let sizes = self.canister_chunk_sizes.lock().await;
canister_chunk_ids
.iter()
.filter_map(|id| sizes.get(id))
.sum()
}

/// Call only after `finalize_upload` has completed.
pub(crate) async fn uploader_ids_to_canister_chunk_ids(
&self,
55 changes: 11 additions & 44 deletions src/canisters/frontend/ic-asset/src/sync.rs
@@ -45,15 +45,15 @@ use walkdir::WalkDir;
const KNOWN_DIRECTORIES: [&str; 1] = [".well-known"];

/// Sets the contents of the asset canister to the contents of a directory, including deleting old assets.
pub async fn upload_content_and_assemble_sync_operations<'a>(
canister: &Canister<'a>,
pub async fn upload_content_and_assemble_sync_operations(
canister: &Canister<'_>,
canister_api_version: u16,
dirs: &[&Path],
no_delete: bool,
mode: batch_upload::plumbing::Mode,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(CommitBatchArguments, ChunkUploader<'a>), UploadContentError> {
) -> Result<CommitBatchArguments, UploadContentError> {
if let Some(progress) = progress {
progress.set_state(AssetSyncState::GatherAssetDescriptors);
}
@@ -144,7 +144,7 @@ pub async fn upload_content_and_assemble_sync_operations<'a>(
// -vv
trace!(logger, "Value of CommitBatch: {:?}", commit_batch_args);

Ok((commit_batch_args, chunk_uploader))
Ok(commit_batch_args)
}

/// Sets the contents of the asset canister to the contents of a directory, including deleting old assets.
@@ -156,7 +156,7 @@ pub async fn sync(
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(), SyncError> {
let canister_api_version = api_version(canister).await;
let (commit_batch_args, chunk_uploader) = upload_content_and_assemble_sync_operations(
let commit_batch_args = upload_content_and_assemble_sync_operations(
canister,
canister_api_version,
dirs,
@@ -180,7 +180,7 @@
warn!(logger, "The asset canister is running an old version of the API. It will not be able to set assets properties.");
commit_batch(canister, commit_batch_args_v0).await
}
BATCH_UPLOAD_API_VERSION.. => commit_in_stages(canister, commit_batch_args, &chunk_uploader, logger, progress).await,
BATCH_UPLOAD_API_VERSION.. => commit_in_stages(canister, commit_batch_args, logger, progress).await,
}.map_err(CommitBatchFailed)?;
if let Some(progress) = progress {
progress.set_state(AssetSyncState::Done);
@@ -193,48 +193,36 @@
/// Batches are created based on three conditions (any of which triggers a new batch):
/// 1. 500 operations reached - generally respected limit to avoid too much cert tree work
/// 2. 1.5MB of header map data reached - headers are the largest part of ingress message size
/// 3. 100MB of total chunk size reached - the full asset content gets hashed in the commit message
async fn create_commit_batches<'a>(
operations: Vec<BatchOperationKind>,
chunk_uploader: &ChunkUploader<'a>,
) -> Vec<Vec<BatchOperationKind>> {
fn create_commit_batches(operations: Vec<BatchOperationKind>) -> Vec<Vec<BatchOperationKind>> {
    const MAX_OPERATIONS_PER_BATCH: usize = 500; // empirically this works well enough
const MAX_HEADER_MAP_SIZE: usize = 1_500_000; // 1.5 MB leaves plenty of room for other data that we do not calculate precisely
const MAX_ASSET_CONTENT_SIZE: usize = 100_000_000; // 100 MB is ~20% of how much data we can hash in a single message: 40b instructions per update call, measured best case of 80 instructions per byte hashed -> ~500MB limit

let mut batches = Vec::new();
let mut current_batch = Vec::new();
let mut operation_count = 0;
let mut header_map_size = 0;
let mut content_size = 0;

for operation in operations {
let operation_header_size = calculate_header_size(&operation);
let operation_chunk_size = calculate_content_size(&operation, chunk_uploader).await;

// Check if adding this operation would exceed any limits
let would_exceed_operation_limit = operation_count >= MAX_OPERATIONS_PER_BATCH;
let would_exceed_header_limit =
header_map_size + operation_header_size >= MAX_HEADER_MAP_SIZE;
let would_exceed_chunk_limit =
content_size + operation_chunk_size >= MAX_ASSET_CONTENT_SIZE;

if (would_exceed_operation_limit || would_exceed_header_limit || would_exceed_chunk_limit)
&& !current_batch.is_empty()
if (would_exceed_operation_limit || would_exceed_header_limit) && !current_batch.is_empty()
{
// Start a new batch
batches.push(current_batch);
current_batch = Vec::new();
operation_count = 0;
header_map_size = 0;
content_size = 0;
}

// Add operation to current batch
current_batch.push(operation);
operation_count += 1;
header_map_size += operation_header_size;
content_size += operation_chunk_size;
}

// Add the last batch if it has any operations
@@ -262,38 +262,17 @@ fn calculate_header_size(operation: &BatchOperationKind) -> usize {
}
}

/// Calculate the size in bytes of chunk data for an operation.
/// This includes both:
/// - Chunks referenced by `chunk_ids` (looked up from ChunkUploader)
/// - The `last_chunk` field which is included directly in the commit message
async fn calculate_content_size<'a>(
operation: &BatchOperationKind,
chunk_uploader: &ChunkUploader<'a>,
) -> usize {
match operation {
BatchOperationKind::SetAssetContent(args) => {
let chunk_ids_size = chunk_uploader
.get_canister_chunk_total_size(&args.chunk_ids)
.await;
let last_chunk_size = args.last_chunk.as_ref().map_or(0, |chunk| chunk.len());
chunk_ids_size + last_chunk_size
}
_ => 0,
}
}

async fn commit_in_stages<'a>(
async fn commit_in_stages(
canister: &Canister<'_>,
commit_batch_args: CommitBatchArguments,
chunk_uploader: &ChunkUploader<'a>,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(), AgentError> {
if let Some(progress) = progress {
progress.set_total_batch_operations(commit_batch_args.operations.len());
}

let batches = create_commit_batches(commit_batch_args.operations, chunk_uploader).await;
let batches = create_commit_batches(commit_batch_args.operations);

for operations in batches {
let op_amount = operations.len();
@@ -330,7 +297,7 @@ pub async fn prepare_sync_for_proposal(
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(Nat, ByteBuf), PrepareSyncForProposalError> {
let canister_api_version = api_version(canister).await;
let (arg, _chunk_uploader) = upload_content_and_assemble_sync_operations(
let arg = upload_content_and_assemble_sync_operations(
canister,
canister_api_version,
dirs,
104 changes: 73 additions & 31 deletions src/canisters/frontend/ic-certified-assets/src/lib.rs
@@ -16,7 +16,7 @@ use crate::{
CallbackFunc, HttpRequest, HttpResponse, StreamingCallbackHttpResponse,
StreamingCallbackToken,
},
state_machine::{AssetDetails, CertifiedTree, EncodedAsset, State},
state_machine::{AssetDetails, CertifiedTree, ComputationStatus, EncodedAsset, State},
system_context::SystemContext,
types::*,
};
@@ -203,15 +203,18 @@ pub fn clear() {
});
}

pub fn commit_batch(arg: CommitBatchArguments) {
pub async fn commit_batch(arg: CommitBatchArguments) {
let system_context = SystemContext::new();
let arg_ref = &arg;

with_state_mut(|s| {
if let Err(msg) = s.commit_batch(arg, &system_context) {
trap(&msg);
}
certified_data_set(s.root_hash());
});
loop_with_message_extension_until_completion(|progress| {
with_state_mut(|s| s.commit_batch(arg_ref, progress, &system_context))
})
.await
.map_err(|msg| trap(&msg))
.ok();

with_state_mut(|s| certified_data_set(s.root_hash()));
}

pub fn propose_commit_batch(arg: CommitBatchArguments) {
@@ -223,32 +226,41 @@ pub fn propose_commit_batch(arg: CommitBatchArguments) {
});
}

pub fn compute_evidence(arg: ComputeEvidenceArguments) -> Option<ic_certified_assets_ByteBuf> {
with_state_mut(|s| match s.compute_evidence(arg) {
Err(msg) => trap(&msg),
Ok(maybe_evidence) => maybe_evidence,
pub async fn compute_evidence(
arg: ComputeEvidenceArguments,
) -> Option<ic_certified_assets_ByteBuf> {
let arg_ref = &arg;
loop_with_message_extension_until_completion(|_progress| {
with_state_mut(|s| s.compute_evidence(arg_ref))
})
.await
.ok()
}

pub fn compute_state_hash() -> Option<String> {
let system_context = SystemContext::new();

with_state_mut(|s| s.compute_state_hash(&system_context))
pub async fn compute_state_hash() -> Option<String> {
loop_with_message_extension_until_completion(|_progress| {
with_state_mut(|s| s.compute_state_hash())
})
.await
.ok()
}

pub fn get_state_info() -> StateInfo {
with_state(|s| s.get_state_info())
}

pub fn commit_proposed_batch(arg: CommitProposedBatchArguments) {
pub async fn commit_proposed_batch(arg: CommitProposedBatchArguments) {
let system_context = SystemContext::new();
let arg_ref = &arg;

with_state_mut(|s| {
if let Err(msg) = s.commit_proposed_batch(arg, &system_context) {
trap(&msg);
}
certified_data_set(s.root_hash());
});
loop_with_message_extension_until_completion(|progress| {
with_state_mut(|s| s.commit_proposed_batch(arg_ref, progress, &system_context))
})
.await
.map_err(|msg| trap(&msg))
.ok();

with_state_mut(|s| certified_data_set(s.root_hash()));
}

pub fn validate_commit_proposed_batch(arg: CommitProposedBatchArguments) -> Result<String, String> {
@@ -428,6 +440,36 @@
STATE.with(|s| f(&s.borrow()))
}

/// Loops calling a state machine function until completion, periodically async-calling
/// self to reset the instruction counter when needed.
async fn loop_with_message_extension_until_completion<F, D, P, E>(mut compute_fn: F) -> Result<D, E>
where
F: FnMut(P) -> ComputationStatus<D, P, E>,
P: Default,
{
    const INSTRUCTION_THRESHOLD: u64 = 35_000_000_000; // At the time of writing, 40b instructions are the limit for a single message
Contributor: Is this limit low enough? The worst single hashing operation I saw was 3b instructions (press-kit.zip).

Contributor Author: Should be fine as we now hash a single chunk at a time (limited by the ingress message size) and no longer an entire file. From what I measured, a single chunk should not take more than ~2MB * 80 instructions.

I also tested this with a 700MB file, which took ~50b instructions to hash
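
For rough context (my own arithmetic, using the ~80 instructions per byte hashed figure quoted in sync.rs): a 2 MB chunk is about 2,000,000 × 80 ≈ 160M instructions, far below the 35b threshold, while a 700 MB file works out to roughly 56b instructions in total, consistent with the ~50b measured above and well beyond a single ~40b-instruction message, which is why the work is spread across self-calls.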

let mut progress = P::default();

loop {
match compute_fn(progress) {
ComputationStatus::Done(done) => return Ok(done),
ComputationStatus::InProgress(p) => {
progress = p;
if ic_cdk::api::performance_counter(0) > INSTRUCTION_THRESHOLD {
// Reset instruction counter 0 by doing a bogus self-call
// (self-calls are most likely to be short-circuited by the scheduler so we don't incur more wait time than necessary)
let _ = ic_cdk::call::Call::bounded_wait(
ic_cdk::api::canister_self(),
"__this-FunctionDoes_not-Exist",
)
.await;
}
}
ComputationStatus::Error(e) => return Err(e),
}
}
}

/// Exports the whole asset canister interface, but does not handle init/pre_/post_upgrade for initial configuration or state persistence across upgrades.
///
/// For a working example how to use this macro, see [here](https://github.com/dfinity/sdk/blob/master/src/canisters/frontend/ic-frontend-canister/src/lib.rs).
@@ -636,8 +678,8 @@ macro_rules! export_canister_methods {

#[$crate::ic_certified_assets_update(guard = "__ic_certified_assets_can_commit")]
#[$crate::ic_certified_assets_candid_method(update)]
fn commit_batch(arg: types::CommitBatchArguments) {
$crate::commit_batch(arg)
async fn commit_batch(arg: types::CommitBatchArguments) {
$crate::commit_batch(arg).await
}

#[$crate::ic_certified_assets_update(guard = "__ic_certified_assets_can_prepare")]
@@ -648,16 +690,16 @@

#[$crate::ic_certified_assets_update(guard = "__ic_certified_assets_can_prepare")]
#[$crate::ic_certified_assets_candid_method(update)]
fn compute_evidence(
async fn compute_evidence(
arg: types::ComputeEvidenceArguments,
) -> Option<ic_certified_assets_ByteBuf> {
$crate::compute_evidence(arg)
$crate::compute_evidence(arg).await
}

#[$crate::ic_certified_assets_update]
#[$crate::ic_certified_assets_candid_method(update)]
fn compute_state_hash() -> Option<String> {
$crate::compute_state_hash()
async fn compute_state_hash() -> Option<String> {
$crate::compute_state_hash().await
}

#[$crate::ic_certified_assets_query]
@@ -668,8 +710,8 @@

#[$crate::ic_certified_assets_update(guard = "__ic_certified_assets_can_commit")]
#[$crate::ic_certified_assets_candid_method(update)]
fn commit_proposed_batch(arg: types::CommitProposedBatchArguments) {
$crate::commit_proposed_batch(arg)
async fn commit_proposed_batch(arg: types::CommitProposedBatchArguments) {
$crate::commit_proposed_batch(arg).await
}

#[$crate::ic_certified_assets_update]