diff --git a/crates/hyperqueue/src/common/serialization.rs b/crates/hyperqueue/src/common/serialization.rs index afc18b5ea..b6dd75a18 100644 --- a/crates/hyperqueue/src/common/serialization.rs +++ b/crates/hyperqueue/src/common/serialization.rs @@ -13,7 +13,7 @@ pub struct DefaultConfig; impl SerializationConfig for DefaultConfig { fn config() -> impl Options { - bincode::DefaultOptions::new() + bincode::DefaultOptions::new().with_limit(tako::MAX_FRAME_SIZE as u64) } } @@ -21,7 +21,9 @@ pub struct TrailingAllowedConfig; impl SerializationConfig for TrailingAllowedConfig { fn config() -> impl Options { - bincode::DefaultOptions::new().allow_trailing_bytes() + bincode::DefaultOptions::new() + .allow_trailing_bytes() + .with_limit(tako::MAX_FRAME_SIZE as u64) } } diff --git a/crates/hyperqueue/src/server/event/journal/read.rs b/crates/hyperqueue/src/server/event/journal/read.rs index 305d951ed..8c626ec00 100644 --- a/crates/hyperqueue/src/server/event/journal/read.rs +++ b/crates/hyperqueue/src/server/event/journal/read.rs @@ -59,7 +59,7 @@ impl Iterator for &mut JournalReader { fn next(&mut self) -> Option { self.position = self.source.stream_position().unwrap(); - if self.position == self.size { + if self.position >= self.size { return None; } match EventSerializationConfig::config().deserialize_from(&mut self.source) { diff --git a/crates/hyperqueue/src/transfer/protocol.rs b/crates/hyperqueue/src/transfer/protocol.rs index ab90c9103..29cc70936 100644 --- a/crates/hyperqueue/src/transfer/protocol.rs +++ b/crates/hyperqueue/src/transfer/protocol.rs @@ -4,5 +4,5 @@ use tokio_util::codec::length_delimited::Builder; pub fn make_protocol_builder() -> Builder { *LengthDelimitedCodec::builder() .little_endian() - .max_frame_length(128 * 1024 * 1024) + .max_frame_length(tako::MAX_FRAME_SIZE) } diff --git a/crates/tako/src/internal/tests/integration/utils/task.rs b/crates/tako/src/internal/tests/integration/utils/task.rs index 60dfce6ac..5d0e6a71f 100644 --- a/crates/tako/src/internal/tests/integration/utils/task.rs +++ b/crates/tako/src/internal/tests/integration/utils/task.rs @@ -100,6 +100,7 @@ pub fn build_task_def_from_config( cwd, }; let body = bincode::DefaultOptions::new() + .with_limit(crate::MAX_FRAME_SIZE as u64) .serialize(&program_def) .unwrap(); diff --git a/crates/tako/src/internal/tests/integration/utils/worker.rs b/crates/tako/src/internal/tests/integration/utils/worker.rs index 5bbe3d445..b789ca4a3 100644 --- a/crates/tako/src/internal/tests/integration/utils/worker.rs +++ b/crates/tako/src/internal/tests/integration/utils/worker.rs @@ -266,6 +266,7 @@ impl TaskLauncher for TestTaskLauncher { ctx.body().len(), ); bincode::DefaultOptions::new() + .with_limit(crate::MAX_FRAME_SIZE as u64) .deserialize(ctx.body()) .map_err(|_| DsError::GenericError("Body deserialization failed".into()))? }; diff --git a/crates/tako/src/internal/transfer/auth.rs b/crates/tako/src/internal/transfer/auth.rs index 9f5dc2a14..dc11956b9 100644 --- a/crates/tako/src/internal/transfer/auth.rs +++ b/crates/tako/src/internal/transfer/auth.rs @@ -244,6 +244,7 @@ where T: serde::Serialize + ?Sized, { DefaultOptions::new() + .with_limit(crate::MAX_FRAME_SIZE as u64) .with_fixint_encoding() .serialize(value) .map_err(|e| format!("Serialization failed: {e:?}").into()) @@ -255,6 +256,7 @@ where T: Deserialize<'a>, { DefaultOptions::new() + .with_limit(crate::MAX_FRAME_SIZE as u64) .with_fixint_encoding() .deserialize(bytes) .map_err(|e| format!("Deserialization failed: {e:?}, data {bytes:?}").into()) diff --git a/crates/tako/src/internal/transfer/transport.rs b/crates/tako/src/internal/transfer/transport.rs index aa3d898ae..13e69dd78 100644 --- a/crates/tako/src/internal/transfer/transport.rs +++ b/crates/tako/src/internal/transfer/transport.rs @@ -3,5 +3,5 @@ use tokio_util::codec::length_delimited::{Builder, LengthDelimitedCodec}; pub(crate) fn make_protocol_builder() -> Builder { *LengthDelimitedCodec::builder() .little_endian() - .max_frame_length(128 * 1024 * 1024) + .max_frame_length(crate::MAX_FRAME_SIZE) } diff --git a/crates/tako/src/lib.rs b/crates/tako/src/lib.rs index 165276861..c40034d07 100644 --- a/crates/tako/src/lib.rs +++ b/crates/tako/src/lib.rs @@ -26,6 +26,8 @@ pub type PriorityTuple = (Priority, Priority); // user priority, scheduler prior pub type Error = internal::common::error::DsError; pub type Result = std::result::Result; +pub const MAX_FRAME_SIZE: usize = 128 * 1024 * 1024; + pub mod resources { pub use crate::internal::common::resources::{ AMD_GPU_RESOURCE_NAME, Allocation, AllocationRequest, CPU_RESOURCE_ID, CPU_RESOURCE_NAME,