From 4e0e9f3985925d62e63b0882042664b32827d518 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Tue, 14 Oct 2025 00:25:31 +0800 Subject: [PATCH 1/7] initial pass --- apps/desktop/src-tauri/src/api.rs | 2 -- apps/desktop/src-tauri/src/upload.rs | 22 ++++++++++++++++--- .../app/api/upload/[...route]/multipart.ts | 6 ++--- 3 files changed, 22 insertions(+), 8 deletions(-) diff --git a/apps/desktop/src-tauri/src/api.rs b/apps/desktop/src-tauri/src/api.rs index 92a2ba2030..a4b56a840b 100644 --- a/apps/desktop/src-tauri/src/api.rs +++ b/apps/desktop/src-tauri/src/api.rs @@ -48,7 +48,6 @@ pub async fn upload_multipart_presign_part( video_id: &str, upload_id: &str, part_number: u32, - md5_sum: &str, ) -> Result { #[derive(Deserialize)] #[serde(rename_all = "camelCase")] @@ -64,7 +63,6 @@ pub async fn upload_multipart_presign_part( "videoId": video_id, "uploadId": upload_id, "partNumber": part_number, - "md5Sum": md5_sum })) }) .await diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index d0bb1a535d..43b5c0884c 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -607,6 +607,11 @@ fn multipart_uploader( try_stream! { let mut stream = pin!(stream); let mut prev_part_number = None; + + let mut optimistic_presigned_url = + (1, api::upload_multipart_presign_part(&app, &video_id, &upload_id, 1) + .await?); + while let Some(item) = stream.next().await { let Chunk { total_size, part_number, chunk } = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; debug!("Uploading chunk {part_number} for video {video_id:?}"); @@ -614,9 +619,14 @@ fn multipart_uploader( let md5_sum = base64::encode(md5::compute(&chunk).0); let size = chunk.len(); - let presigned_url = - api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number, &md5_sum) - .await?; + // The optimistically generated `upload_multipart_presign_part` could be wrong. + // In that case throw it out and generate a new correct one. + if optimistic_presigned_url.0 != part_number { + warn!("Throwing out optimistic presigned URL for part {part_number} as part {part_number} was requested!"); + optimistic_presigned_url = (part_number, api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) + .await?); + } + let (part_number, presigned_url) = optimistic_presigned_url; let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?; let resp = retryable_client(url.host().unwrap_or("").to_string()) @@ -644,6 +654,12 @@ fn multipart_uploader( size, total_size }; + + // We generate the presigned URL ahead of time. + // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, we aren't just doing nothing. + optimistic_presigned_url = + (part_number + 1, api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number + 1) + .await?); } } } diff --git a/apps/web/app/api/upload/[...route]/multipart.ts b/apps/web/app/api/upload/[...route]/multipart.ts index 89c8d200f6..284013861b 100644 --- a/apps/web/app/api/upload/[...route]/multipart.ts +++ b/apps/web/app/api/upload/[...route]/multipart.ts @@ -99,18 +99,19 @@ app.post( .object({ uploadId: z.string(), partNumber: z.number(), - md5Sum: z.string(), }) .and( z.union([ z.object({ videoId: z.string() }), // deprecated z.object({ fileKey: z.string() }), + // deprecated + // z.object({ md5Sum: z.string() }), ]), ), ), async (c) => { - const { uploadId, partNumber, md5Sum, ...body } = c.req.valid("json"); + const { uploadId, partNumber, ...body } = c.req.valid("json"); const user = c.get("user"); const fileKey = parseVideoIdOrFileKey(user.id, { @@ -132,7 +133,6 @@ app.post( fileKey, uploadId, partNumber, - { ContentMD5: md5Sum }, ); return presignedUrl; From f4af36cc1186891165a5d322f084ad677a8b307c Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Tue, 14 Oct 2025 00:32:18 +0800 Subject: [PATCH 2/7] improved --- apps/desktop/src-tauri/src/upload.rs | 66 +++++++++++++++++++++------- 1 file changed, 51 insertions(+), 15 deletions(-) diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 43b5c0884c..d2d5989bed 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -608,9 +608,19 @@ fn multipart_uploader( let mut stream = pin!(stream); let mut prev_part_number = None; - let mut optimistic_presigned_url = - (1, api::upload_multipart_presign_part(&app, &video_id, &upload_id, 1) - .await?); + let mut optimistic_presigned_url_task: Option>> = Some( + tokio::spawn({ + let app = app.clone(); + let video_id = video_id.clone(); + let upload_id = upload_id.clone(); + + async move { + api::upload_multipart_presign_part(&app, &video_id, &upload_id, 1) + .await + } + }) + ); + let mut expected_part_number = 1u32; while let Some(item) = stream.next().await { let Chunk { total_size, part_number, chunk } = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; @@ -619,14 +629,27 @@ fn multipart_uploader( let md5_sum = base64::encode(md5::compute(&chunk).0); let size = chunk.len(); - // The optimistically generated `upload_multipart_presign_part` could be wrong. - // In that case throw it out and generate a new correct one. - if optimistic_presigned_url.0 != part_number { - warn!("Throwing out optimistic presigned URL for part {part_number} as part {part_number} was requested!"); - optimistic_presigned_url = (part_number, api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) - .await?); - } - let (part_number, presigned_url) = optimistic_presigned_url; + let presigned_url = if expected_part_number == part_number { + // The optimistic presigned URL matches, wait for it + if let Some(task) = optimistic_presigned_url_task.take() { + task.await + .map_err(|e| format!("uploader/part/{part_number}/task_join: {e:?}"))? + .map_err(|e| format!("uploader/part/{part_number}/presign: {e}"))? + } else { + // Fallback if no task available + api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) + .await? + } + } else { + // The optimistic presigned URL doesn't match, abort it and generate a new correct one + if let Some(task) = optimistic_presigned_url_task.take() { + task.abort(); + } + debug!("Throwing out optimistic presigned URL for part {expected_part_number} as part {part_number} was requested!"); + expected_part_number = part_number; + api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) + .await? + }; let url = Uri::from_str(&presigned_url).map_err(|err| format!("uploader/part/{part_number}/invalid_url: {err:?}"))?; let resp = retryable_client(url.host().unwrap_or("").to_string()) @@ -655,11 +678,24 @@ fn multipart_uploader( total_size }; - // We generate the presigned URL ahead of time. + // We generate the presigned URL ahead of time for the next expected part. // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, we aren't just doing nothing. - optimistic_presigned_url = - (part_number + 1, api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number + 1) - .await?); + expected_part_number = part_number + 1; + optimistic_presigned_url_task = Some(tokio::spawn({ + let app = app.clone(); + let video_id = video_id.clone(); + let upload_id = upload_id.clone(); + + async move { + api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number) + .await + } + })); + } + + // Clean up any remaining optimistic task + if let Some(task) = optimistic_presigned_url_task.take() { + task.abort(); } } } From 880e480b10a3c3d354c02d62cddbd28e6c29502b Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Tue, 14 Oct 2025 15:28:22 +0800 Subject: [PATCH 3/7] fix --- apps/desktop/src-tauri/src/upload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 5787cc889e..e02c7b80ab 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -619,7 +619,7 @@ fn multipart_uploader( let mut stream = pin!(stream); let mut prev_part_number = None; - let mut optimistic_presigned_url_task: Option>> = Some( + let mut optimistic_presigned_url_task: Option>> = Some( tokio::spawn({ let app = app.clone(); let video_id = video_id.clone(); From bc40b2ca1ca0c3072816fcd9a03294a8c61bf39e Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Wed, 15 Oct 2025 14:06:37 +0800 Subject: [PATCH 4/7] cleanup --- apps/desktop/src-tauri/src/upload.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 7165357f9c..0aa4a5941c 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -652,13 +652,11 @@ fn multipart_uploader( let size = chunk.len(); let presigned_url = if expected_part_number == part_number { - // The optimistic presigned URL matches, wait for it if let Some(task) = optimistic_presigned_url_task.take() { task.await .map_err(|e| format!("uploader/part/{part_number}/task_join: {e:?}"))? .map_err(|e| format!("uploader/part/{part_number}/presign: {e}"))? } else { - // Fallback if no task available api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) .await? } @@ -668,7 +666,6 @@ fn multipart_uploader( task.abort(); } debug!("Throwing out optimistic presigned URL for part {expected_part_number} as part {part_number} was requested!"); - expected_part_number = part_number; api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) .await? }; @@ -719,7 +716,6 @@ fn multipart_uploader( })); } - // Clean up any remaining optimistic task if let Some(task) = optimistic_presigned_url_task.take() { task.abort(); } From 292dc97df196e56aa7d80008942bd065c741630c Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Wed, 15 Oct 2025 14:45:08 +0800 Subject: [PATCH 5/7] cleanup code a bit --- apps/desktop/src-tauri/src/upload.rs | 63 +++++++--------------------- 1 file changed, 16 insertions(+), 47 deletions(-) diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 0aa4a5941c..591000f842 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -14,7 +14,7 @@ use cap_project::{RecordingMeta, S3UploadMeta, UploadMeta}; use cap_utils::spawn_actor; use ffmpeg::ffi::AV_TIME_BASE; use flume::Receiver; -use futures::{Stream, StreamExt, TryStreamExt, stream}; +use futures::{Stream, StreamExt, TryStreamExt, future::join, stream}; use image::{ImageReader, codecs::jpeg::JpegEncoder}; use reqwest::StatusCode; use serde::{Deserialize, Serialize}; @@ -629,46 +629,31 @@ fn multipart_uploader( try_stream! { let mut stream = pin!(stream); let mut prev_part_number = None; + let mut expected_part_number = 1u32; - let mut optimistic_presigned_url_task: Option>> = Some( - tokio::spawn({ - let app = app.clone(); - let video_id = video_id.clone(); - let upload_id = upload_id.clone(); + loop { + let (Some(item), presigned_url) = join( + stream.next(), + // We generate the presigned URL ahead of time for the part we expect to come next. If it's not + // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, we aren't just doing nothing. + api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number) + ).await else { + break; + }; + let mut presigned_url = presigned_url?; - async move { - api::upload_multipart_presign_part(&app, &video_id, &upload_id, 1) - .await - } - }) - ); - let mut expected_part_number = 1u32; - while let Some(item) = stream.next().await { let Chunk { total_size, part_number, chunk } = item.map_err(|err| format!("uploader/part/{:?}/fs: {err:?}", prev_part_number.map(|p| p + 1)))?; trace!("Uploading chunk {part_number} ({} bytes) for video {video_id:?}", chunk.len()); prev_part_number = Some(part_number); let md5_sum = base64::encode(md5::compute(&chunk).0); let size = chunk.len(); - let presigned_url = if expected_part_number == part_number { - if let Some(task) = optimistic_presigned_url_task.take() { - task.await - .map_err(|e| format!("uploader/part/{part_number}/task_join: {e:?}"))? - .map_err(|e| format!("uploader/part/{part_number}/presign: {e}"))? - } else { - api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) - .await? - } - } else { - // The optimistic presigned URL doesn't match, abort it and generate a new correct one - if let Some(task) = optimistic_presigned_url_task.take() { - task.abort(); - } - debug!("Throwing out optimistic presigned URL for part {expected_part_number} as part {part_number} was requested!"); - api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) + // We prefetched for the wrong chunk. Let's try again. + if expected_part_number == part_number { + presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) .await? - }; + } trace!("Uploading part {part_number}"); @@ -701,23 +686,7 @@ fn multipart_uploader( total_size }; - // We generate the presigned URL ahead of time for the next expected part. - // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, we aren't just doing nothing. expected_part_number = part_number + 1; - optimistic_presigned_url_task = Some(tokio::spawn({ - let app = app.clone(); - let video_id = video_id.clone(); - let upload_id = upload_id.clone(); - - async move { - api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number) - .await - } - })); - } - - if let Some(task) = optimistic_presigned_url_task.take() { - task.abort(); } debug!("Completed multipart upload for {video_id:?} in {:?}", start.elapsed()); From 2f97e1a747cc69ead5fa227ac32278da2b5e54a6 Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Wed, 15 Oct 2025 15:05:35 +0800 Subject: [PATCH 6/7] thanks CodeRabbit --- apps/desktop/src-tauri/src/upload.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index 591000f842..f40c42050c 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -650,7 +650,7 @@ fn multipart_uploader( let size = chunk.len(); // We prefetched for the wrong chunk. Let's try again. - if expected_part_number == part_number { + if expected_part_number != part_number { presigned_url = api::upload_multipart_presign_part(&app, &video_id, &upload_id, part_number) .await? } From dea6aecbc0fd1e8fe927b0f21a97789074693a6d Mon Sep 17 00:00:00 2001 From: Oscar Beaumont Date: Wed, 15 Oct 2025 15:13:13 +0800 Subject: [PATCH 7/7] fix comment --- apps/desktop/src-tauri/src/upload.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/apps/desktop/src-tauri/src/upload.rs b/apps/desktop/src-tauri/src/upload.rs index f40c42050c..4f4527615b 100644 --- a/apps/desktop/src-tauri/src/upload.rs +++ b/apps/desktop/src-tauri/src/upload.rs @@ -634,8 +634,9 @@ fn multipart_uploader( loop { let (Some(item), presigned_url) = join( stream.next(), - // We generate the presigned URL ahead of time for the part we expect to come next. If it's not - // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, we aren't just doing nothing. + // We generate the presigned URL ahead of time for the part we expect to come next. + // If it's not the chunk that actually comes next we just throw it out. + // This means if the filesystem takes a while for the recording to reach previous total + CHUNK_SIZE, which is the common case, we aren't just doing nothing. api::upload_multipart_presign_part(&app, &video_id, &upload_id, expected_part_number) ).await else { break;