From 4e0e9f3985925d62e63b0882042664b32827d518 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Tue, 14 Oct 2025 00:25:31 +0800 Subject: [PATCH 1/2] initial pass --- apps/desktop/src-tauri/src/api.rs | 2 -- apps/desktop/src-tauri/src/upload.rs | 22 ++++++++++++++++--- .../app/api/upload/[...route]/multipart.ts | 6 ++--- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/apps/desktop/src-tauri/src/api.rs b/apps/desktop/src-tauri/src/api.rs index 92a2ba2030..a4b56a840b 100644 --- a/apps/desktop/src-tauri/src/api.rs +++ b/apps/desktop/src-tauri/src/api.rs @@ -48,7 +48,6 @@ pub async fn upload_multipart_presign_part( video_id: &str, upload_id: &str, part_number: u32, - md5_sum: &str, ) -> Result { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -64,7 +63,6 @@ pub async fn upload_multipart_presign_part( "videoId": video_id, "uploadId": upload_id, "partNumber": part_number, - "md5Sum": md5_sum })) }) .await diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index d0bb1a535d..43b5c0884c 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -607,6 +607,11 @@ fn multipart_uploader( try_stream! { let mut stream = pin!(stream); let mut prev_part_number = None; + + let mut optimistic_presigned_url = + (1, api::upload_multipart_presign_part(&app, &video_id, &upload_id, 1) + .await?); + while let Some(item) = stream.next().await { let Chunk { total_size, part_number, chunk } = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; debug!("Uploading chunk {part_number} for video {video_id:?}"); @@ -614,9 +619,14 @@ fn multipart_uploader( let md5_sum = base64::encode(md5::compute(&chunk).0); let size = chunk.len(); - let presigned_url = - api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, &md5_sum) - .await?; + // The optimistically generated `upload_multipart_presign_part` could be wrong. + // In that case throw it out and generate a new correct one. + if optimistic_presigned_url.0 != part_number { + warn!("Throwing out optimistic presigned URL for part {part_number} as part {part_number} was requested!"); + optimistic_presigned_url = (part_number, api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) + .await?); + } + let (part_number, presigned_url) = optimistic_presigned_url; let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?; let resp = retryable_client(url.host().unwrap_or("").to_string()) @@ -644,6 +654,12 @@ fn multipart_uploader( size, total_size }; + + // We generate the presigned URL ahead of time. + // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, we aren't just doing nothing. + optimistic_presigned_url = + (part_number + 1, api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number + 1) + .await?); } } } diff --git a/apps/web/app/api/upload/[...route]/multipart.ts b/apps/web/app/api/upload/[...route]/multipart.ts index 89c8d200f6..284013861b 100644 --- a/apps/web/app/api/upload/[...route]/multipart.ts +++ b/apps/web/app/api/upload/[...route]/multipart.ts @@ -99,18 +99,19 @@ app.post( .object({ uploadId: z.string(), partNumber: z.number(), - md5Sum: z.string(), }) .and( z.union([ z.object({ videoId: z.string() }), // deprecated z.object({ fileKey: z.string() }), + // deprecated + // z.object({ md5Sum: z.string() }), ]), ), ), async (c) => { - const { uploadId, partNumber, md5Sum, ...body } = c.req.valid("json"); + const { uploadId, partNumber, ...body } = c.req.valid("json"); const user = c.get("user"); const fileKey = parseVideoIdOrFileKey(user.id, { @@ -132,7 +133,6 @@ app.post( fileKey, uploadId, partNumber, - { ContentMD5: md5Sum }, ); return presignedUrl; From f4af36cc1186891165a5d322f084ad677a8b307c Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Tue, 14 Oct 2025 00:32:18 +0800 Subject: [PATCH 2/2] improved --- apps/desktop/src-tauri/src/upload.rs | 66 +++++++++++++++++++++------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 43b5c0884c..d2d5989bed 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -608,9 +608,19 @@ fn multipart_uploader( let mut stream = pin!(stream); let mut prev_part_number = None; - let mut optimistic_presigned_url = - (1, api::upload_multipart_presign_part(&app, &video_id, &upload_id, 1) - .await?); + let mut optimistic_presigned_url_task: Option>> = Some( + tokio::spawn({ + let app = app.clone(); + let video_id = video_id.clone(); + let upload_id = upload_id.clone(); + + async move { + api::upload_multipart_presign_part(&app, &video_id, &upload_id, 1) + .await + } + }) + ); + let mut expected_part_number = 1u32; while let Some(item) = stream.next().await { let Chunk { total_size, part_number, chunk } = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; @@ -619,14 +629,27 @@ fn multipart_uploader( let md5_sum = base64::encode(md5::compute(&chunk).0); let size = chunk.len(); - // The optimistically generated `upload_multipart_presign_part` could be wrong. - // In that case throw it out and generate a new correct one. - if optimistic_presigned_url.0 != part_number { - warn!("Throwing out optimistic presigned URL for part {part_number} as part {part_number} was requested!"); - optimistic_presigned_url = (part_number, api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) - .await?); - } - let (part_number, presigned_url) = optimistic_presigned_url; + let presigned_url = if expected_part_number == part_number { + // The optimistic presigned URL matches, wait for it + if let Some(task) = optimistic_presigned_url_task.take() { + task.await + .map_err(|e| format!("uploader/part/{part_number}/task_join: {e:?}"))? + .map_err(|e| format!("uploader/part/{part_number}/presign: {e}"))? + } else { + // Fallback if no task available + api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) + .await? + } + } else { + // The optimistic presigned URL doesn't match, abort it and generate a new correct one + if let Some(task) = optimistic_presigned_url_task.take() { + task.abort(); + } + debug!("Throwing out optimistic presigned URL for part {expected_part_number} as part {part_number} was requested!"); + expected_part_number = part_number; + api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) + .await? + }; let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?; let resp = retryable_client(url.host().unwrap_or("").to_string()) @@ -655,11 +678,24 @@ fn multipart_uploader( total_size }; - // We generate the presigned URL ahead of time. + // We generate the presigned URL ahead of time for the next expected part. // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, we aren't just doing nothing. - optimistic_presigned_url = - (part_number + 1, api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number + 1) - .await?); + expected_part_number = part_number + 1; + optimistic_presigned_url_task = Some(tokio::spawn({ + let app = app.clone(); + let video_id = video_id.clone(); + let upload_id = upload_id.clone(); + + async move { + api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number) + .await + } + })); + } + + // Clean up any remaining optimistic task + if let Some(task) = optimistic_presigned_url_task.take() { + task.abort(); } } }