5 changes: 4 additions & 1 deletion CHANGELOG.md
@@ -6,7 +6,10 @@

Sets the `ic_env` cookie for all HTML files only if the canister environment changed in the `commit_batch` method.

- Module hash: c156183e6a6f5c3c71ecde5f38a7280b770ff172b4827127ef03b89b764065ba
Use canister self-calls to avoid hitting instruction limits during `commit_batch`, `compute_evidence`, and `compute_state_hash`.

- Module hash: 63d122d0149a29f4e48603efdd7d2bce656a6a83bac1e3207897c68e8e225bb6
- https://github.com/dfinity/sdk/pull/4450
- https://github.com/dfinity/sdk/pull/4446
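The self-call trick described in the entry above is implemented as `loop_with_message_extension_until_completion` in src/canisters/frontend/ic-certified-assets/src/lib.rs further down in this diff. A condensed sketch of the idea, assuming the same ic-cdk calls the diff uses (the threshold value, method name, and `step` closure here are illustrative, not part of the PR):

async fn run_in_slices(mut step: impl FnMut() -> bool) {
    // `step` performs one bounded slice of work and returns true once everything is done.
    const THRESHOLD: u64 = 35_000_000_000;
    while !step() {
        if ic_cdk::api::performance_counter(0) > THRESHOLD {
            // Awaiting a bogus self-call ends the current message; execution resumes in a
            // follow-up message with a fresh instruction counter.
            let _ = ic_cdk::call::Call::bounded_wait(
                ic_cdk::api::canister_self(),
                "__method_that_does_not_exist",
            )
            .await;
        }
    }
}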

# 0.30.2
Expand Up @@ -159,15 +159,6 @@ impl<'agent> ChunkUploader<'agent> {
self.chunks.load(Ordering::SeqCst)
}

/// Get total size of chunks by their canister chunk IDs
pub(crate) async fn get_canister_chunk_total_size(&self, canister_chunk_ids: &[Nat]) -> usize {
let sizes = self.canister_chunk_sizes.lock().await;
canister_chunk_ids
.iter()
.filter_map(|id| sizes.get(id))
.sum()
}

/// Call only after `finalize_upload` has completed.
pub(crate) async fn uploader_ids_to_canister_chunk_ids(
&self,
55 changes: 11 additions & 44 deletions src/canisters/frontend/ic-asset/src/sync.rs
@@ -45,15 +45,15 @@ use walkdir::WalkDir;
const KNOWN_DIRECTORIES: [&str; 1] = [".well-known"];

/// Sets the contents of the asset canister to the contents of a directory, including deleting old assets.
pub async fn upload_content_and_assemble_sync_operations<'a>(
canister: &Canister<'a>,
pub async fn upload_content_and_assemble_sync_operations(
canister: &Canister<'_>,
canister_api_version: u16,
dirs: &[&Path],
no_delete: bool,
mode: batch_upload::plumbing::Mode,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(CommitBatchArguments, ChunkUploader<'a>), UploadContentError> {
) -> Result<CommitBatchArguments, UploadContentError> {
if let Some(progress) = progress {
progress.set_state(AssetSyncState::GatherAssetDescriptors);
}
@@ -144,7 +144,7 @@ pub async fn upload_content_and_assemble_sync_operations<'a>(
// -vv
trace!(logger, "Value of CommitBatch: {:?}", commit_batch_args);

Ok((commit_batch_args, chunk_uploader))
Ok(commit_batch_args)
}

/// Sets the contents of the asset canister to the contents of a directory, including deleting old assets.
Expand All @@ -156,7 +156,7 @@ pub async fn sync(
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(), SyncError> {
let canister_api_version = api_version(canister).await;
let (commit_batch_args, chunk_uploader) = upload_content_and_assemble_sync_operations(
let commit_batch_args = upload_content_and_assemble_sync_operations(
canister,
canister_api_version,
dirs,
@@ -180,7 +180,7 @@
warn!(logger, "The asset canister is running an old version of the API. It will not be able to set assets properties.");
commit_batch(canister, commit_batch_args_v0).await
}
BATCH_UPLOAD_API_VERSION.. => commit_in_stages(canister, commit_batch_args, &chunk_uploader, logger, progress).await,
BATCH_UPLOAD_API_VERSION.. => commit_in_stages(canister, commit_batch_args, logger, progress).await,
}.map_err(CommitBatchFailed)?;
if let Some(progress) = progress {
progress.set_state(AssetSyncState::Done);
@@ -193,48 +193,36 @@ pub async fn sync(
/// Batches are created based on three conditions (any of which triggers a new batch):
/// 1. 500 operations reached - generally respected limit to avoid too much cert tree work
/// 2. 1.5MB of header map data reached - headers are the largest part of ingress message size
/// 3. 100MB of total chunk size reached - the full asset content gets hashed in the commit message
async fn create_commit_batches<'a>(
operations: Vec<BatchOperationKind>,
chunk_uploader: &ChunkUploader<'a>,
) -> Vec<Vec<BatchOperationKind>> {
fn create_commit_batches(operations: Vec<BatchOperationKind>) -> Vec<Vec<BatchOperationKind>> {
const MAX_OPERATIONS_PER_BATCH: usize = 500; // empirically this works well enough
const MAX_HEADER_MAP_SIZE: usize = 1_500_000; // 1.5 MB leaves plenty of room for other data that we do not calculate precisely
const MAX_ASSET_CONTENT_SIZE: usize = 100_000_000; // 100 MB is ~20% of how much data we can hash in a single message: 40b instructions per update call, measured best case of 80 instructions per byte hashed -> ~500MB limit

let mut batches = Vec::new();
let mut current_batch = Vec::new();
let mut operation_count = 0;
let mut header_map_size = 0;
let mut content_size = 0;

for operation in operations {
let operation_header_size = calculate_header_size(&operation);
let operation_chunk_size = calculate_content_size(&operation, chunk_uploader).await;

// Check if adding this operation would exceed any limits
let would_exceed_operation_limit = operation_count >= MAX_OPERATIONS_PER_BATCH;
let would_exceed_header_limit =
header_map_size + operation_header_size >= MAX_HEADER_MAP_SIZE;
let would_exceed_chunk_limit =
content_size + operation_chunk_size >= MAX_ASSET_CONTENT_SIZE;

if (would_exceed_operation_limit || would_exceed_header_limit || would_exceed_chunk_limit)
&& !current_batch.is_empty()
if (would_exceed_operation_limit || would_exceed_header_limit) && !current_batch.is_empty()
{
// Start a new batch
batches.push(current_batch);
current_batch = Vec::new();
operation_count = 0;
header_map_size = 0;
content_size = 0;
}

// Add operation to current batch
current_batch.push(operation);
operation_count += 1;
header_map_size += operation_header_size;
content_size += operation_chunk_size;
}

// Add the last batch if it has any operations
@@ -262,38 +250,17 @@ fn calculate_header_size(operation: &BatchOperationKind) -> usize {
}
}

/// Calculate the size in bytes of chunk data for an operation.
/// This includes both:
/// - Chunks referenced by `chunk_ids` (looked up from ChunkUploader)
/// - The `last_chunk` field which is included directly in the commit message
async fn calculate_content_size<'a>(
operation: &BatchOperationKind,
chunk_uploader: &ChunkUploader<'a>,
) -> usize {
match operation {
BatchOperationKind::SetAssetContent(args) => {
let chunk_ids_size = chunk_uploader
.get_canister_chunk_total_size(&args.chunk_ids)
.await;
let last_chunk_size = args.last_chunk.as_ref().map_or(0, |chunk| chunk.len());
chunk_ids_size + last_chunk_size
}
_ => 0,
}
}

async fn commit_in_stages<'a>(
async fn commit_in_stages(
canister: &Canister<'_>,
commit_batch_args: CommitBatchArguments,
chunk_uploader: &ChunkUploader<'a>,
logger: &Logger,
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(), AgentError> {
if let Some(progress) = progress {
progress.set_total_batch_operations(commit_batch_args.operations.len());
}

let batches = create_commit_batches(commit_batch_args.operations, chunk_uploader).await;
let batches = create_commit_batches(commit_batch_args.operations);

for operations in batches {
let op_amount = operations.len();
@@ -330,7 +297,7 @@ pub async fn prepare_sync_for_proposal(
progress: Option<&dyn AssetSyncProgressRenderer>,
) -> Result<(Nat, ByteBuf), PrepareSyncForProposalError> {
let canister_api_version = api_version(canister).await;
let (arg, _chunk_uploader) = upload_content_and_assemble_sync_operations(
let arg = upload_content_and_assemble_sync_operations(
canister,
canister_api_version,
dirs,
104 changes: 73 additions & 31 deletions src/canisters/frontend/ic-certified-assets/src/lib.rs
@@ -16,7 +16,7 @@ use crate::{
CallbackFunc, HttpRequest, HttpResponse, StreamingCallbackHttpResponse,
StreamingCallbackToken,
},
state_machine::{AssetDetails, CertifiedTree, EncodedAsset, State},
state_machine::{AssetDetails, CertifiedTree, ComputationStatus, EncodedAsset, State},
system_context::SystemContext,
types::*,
};
@@ -203,15 +203,18 @@ pub fn clear() {
});
}

pub fn commit_batch(arg: CommitBatchArguments) {
pub async fn commit_batch(arg: CommitBatchArguments) {
let system_context = SystemContext::new();
let arg_ref = &arg;

with_state_mut(|s| {
if let Err(msg) = s.commit_batch(arg, &system_context) {
trap(&msg);
}
certified_data_set(s.root_hash());
});
loop_with_message_extension_until_completion(|progress| {
with_state_mut(|s| s.commit_batch(arg_ref, progress, &system_context))
})
.await
.map_err(|msg| trap(&msg))
.ok();

with_state_mut(|s| certified_data_set(s.root_hash()));
}

pub fn propose_commit_batch(arg: CommitBatchArguments) {
@@ -223,32 +226,41 @@ pub fn propose_commit_batch(arg: CommitBatchArguments) {
});
}

pub fn compute_evidence(arg: ComputeEvidenceArguments) -> Option<ic_certified_assets_ByteBuf> {
with_state_mut(|s| match s.compute_evidence(arg) {
Err(msg) => trap(&msg),
Ok(maybe_evidence) => maybe_evidence,
pub async fn compute_evidence(
arg: ComputeEvidenceArguments,
) -> Option<ic_certified_assets_ByteBuf> {
let arg_ref = &arg;
loop_with_message_extension_until_completion(|_progress| {
with_state_mut(|s| s.compute_evidence(arg_ref))
})
.await
.ok()
}

pub fn compute_state_hash() -> Option<String> {
let system_context = SystemContext::new();

with_state_mut(|s| s.compute_state_hash(&system_context))
pub async fn compute_state_hash() -> Option<String> {
loop_with_message_extension_until_completion(|_progress| {
with_state_mut(|s| s.compute_state_hash())
})
.await
.ok()
}

pub fn get_state_info() -> StateInfo {
with_state(|s| s.get_state_info())
}

pub fn commit_proposed_batch(arg: CommitProposedBatchArguments) {
pub async fn commit_proposed_batch(arg: CommitProposedBatchArguments) {
let system_context = SystemContext::new();
let arg_ref = &arg;

with_state_mut(|s| {
if let Err(msg) = s.commit_proposed_batch(arg, &system_context) {
trap(&msg);
}
certified_data_set(s.root_hash());
});
loop_with_message_extension_until_completion(|progress| {
with_state_mut(|s| s.commit_proposed_batch(arg_ref, progress, &system_context))
})
.await
.map_err(|msg| trap(&msg))
.ok();

with_state_mut(|s| certified_data_set(s.root_hash()));
}

pub fn validate_commit_proposed_batch(arg: CommitProposedBatchArguments) -> Result<String, String> {
@@ -428,6 +440,36 @@ where
STATE.with(|s| f(&s.borrow()))
}

/// Loops calling a state machine function until completion, periodically async-calling
/// self to reset the instruction counter when needed.
async fn loop_with_message_extension_until_completion<F, D, P, E>(mut compute_fn: F) -> Result<D, E>
where
F: FnMut(P) -> ComputationStatus<D, P, E>,
P: Default,
{
const INSTRUCTION_THRESHOLD: u64 = 35_000_000_000; // At the time of writing, 40b instructions are the limit for a single message
Review comment (Contributor): Is this limit low enough? The worst single hashing operation I saw was 3b instructions (press-kit.zip).

Reply (Contributor, author): Should be fine. We now hash a single chunk at a time (limited by the ingress message size) rather than an entire file; from what I measured, a single chunk should not take more than ~2MB * 80 instructions. I also tested this with a 700MB file, which took ~50b instructions to hash.
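(For scale, using the figures quoted in this thread: one ~2MB chunk at ~80 instructions per byte costs roughly 2,000,000 * 80 ≈ 1.6e8 instructions per hashing slice, far below the 35b threshold, while 700MB * 80 ≈ 5.6e10, i.e. ~56b instructions in total, roughly matching the ~50b measured and now spread across many self-call slices.)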

let mut progress = P::default();

loop {
match compute_fn(progress) {
ComputationStatus::Done(done) => return Ok(done),
ComputationStatus::InProgress(p) => {
progress = p;
if ic_cdk::api::performance_counter(0) > INSTRUCTION_THRESHOLD {
// Reset instruction counter 0 by doing a bogus self-call
// (self-calls are most likely to be short-circuited by the scheduler so we don't incur more wait time than necessary)
let _ = ic_cdk::call::Call::bounded_wait(
ic_cdk::api::canister_self(),
"__this-FunctionDoes_not-Exist",
)
.await;
}
}
ComputationStatus::Error(e) => return Err(e),
}
}
}
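A minimal sketch of how a resumable computation plugs into this loop; `CountProgress` and `count_step` are hypothetical, and `ComputationStatus` is assumed to be a plain three-variant enum (`Done`/`InProgress`/`Error`) with the `<D, P, E>` parameter order used in the signature above:

// Hypothetical resumable computation: counts to `target` in bounded slices.
#[derive(Default)]
struct CountProgress {
    current: u64,
}

// One slice of work: advance the counter and hand back the saved state until done.
fn count_step(progress: CountProgress, target: u64) -> ComputationStatus<u64, CountProgress, String> {
    const SLICE: u64 = 1_000_000;
    let next = (progress.current + SLICE).min(target);
    if next >= target {
        ComputationStatus::Done(target)
    } else {
        ComputationStatus::InProgress(CountProgress { current: next })
    }
}

// Hypothetical update method driving the computation across as many messages as needed.
async fn count_to(target: u64) -> u64 {
    loop_with_message_extension_until_completion(|p| count_step(p, target))
        .await
        .unwrap_or_else(|e| ic_cdk::trap(&e))
}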

/// Exports the whole asset canister interface, but does not handle init/pre_/post_upgrade for initial configuration or state persistence across upgrades.
///
/// For a working example of how to use this macro, see [here](https://github.com/dfinity/sdk/blob/master/src/canisters/frontend/ic-frontend-canister/src/lib.rs).
@@ -636,8 +678,8 @@ macro_rules! export_canister_methods {

#[$crate::ic_certified_assets_update(guard = "__ic_certified_assets_can_commit")]
#[$crate::ic_certified_assets_candid_method(update)]
fn commit_batch(arg: types::CommitBatchArguments) {
$crate::commit_batch(arg)
async fn commit_batch(arg: types::CommitBatchArguments) {
$crate::commit_batch(arg).await
}

#[$crate::ic_certified_assets_update(guard = "__ic_certified_assets_can_prepare")]
Expand All @@ -648,16 +690,16 @@ macro_rules! export_canister_methods {

#[$crate::ic_certified_assets_update(guard = "__ic_certified_assets_can_prepare")]
#[$crate::ic_certified_assets_candid_method(update)]
fn compute_evidence(
async fn compute_evidence(
arg: types::ComputeEvidenceArguments,
) -> Option<ic_certified_assets_ByteBuf> {
$crate::compute_evidence(arg)
$crate::compute_evidence(arg).await
}

#[$crate::ic_certified_assets_update]
#[$crate::ic_certified_assets_candid_method(update)]
fn compute_state_hash() -> Option<String> {
$crate::compute_state_hash()
async fn compute_state_hash() -> Option<String> {
$crate::compute_state_hash().await
}

#[$crate::ic_certified_assets_query]
Expand All @@ -668,8 +710,8 @@ macro_rules! export_canister_methods {

#[$crate::ic_certified_assets_update(guard = "__ic_certified_assets_can_commit")]
#[$crate::ic_certified_assets_candid_method(update)]
fn commit_proposed_batch(arg: types::CommitProposedBatchArguments) {
$crate::commit_proposed_batch(arg)
async fn commit_proposed_batch(arg: types::CommitProposedBatchArguments) {
$crate::commit_proposed_batch(arg).await
}

#[$crate::ic_certified_assets_update]