From 922a7d9d1d4ab070bf6713d6921781db8f0ae87b Mon Sep 17 00:00:00 2001 From: nhz2 Date: Wed, 27 Aug 2025 17:08:48 -0400 Subject: [PATCH 1/5] work --- Project.toml | 4 +++ src/TranscodingStreams.jl | 1 + src/chunkcodecs.jl | 75 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+) create mode 100644 src/chunkcodecs.jl diff --git a/Project.toml b/Project.toml index 61de90ea..93ea0b7d 100644 --- a/Project.toml +++ b/Project.toml @@ -4,5 +4,9 @@ license = "MIT" authors = ["Kenta Sato "] version = "0.11.3" +[deps] +ChunkCodecCore = "0b6fb165-00bc-4d37-ab8b-79f91016dbe1" + [compat] +ChunkCodecCore = "0.6" julia = "1.6" diff --git a/src/TranscodingStreams.jl b/src/TranscodingStreams.jl index 7ebf3534..5015e57f 100644 --- a/src/TranscodingStreams.jl +++ b/src/TranscodingStreams.jl @@ -16,5 +16,6 @@ include("stream.jl") include("io.jl") include("noop.jl") include("transcode.jl") +include("chunkcodecs.jl") end # module diff --git a/src/chunkcodecs.jl b/src/chunkcodecs.jl new file mode 100644 index 00000000..dfd1c0c9 --- /dev/null +++ b/src/chunkcodecs.jl @@ -0,0 +1,75 @@ +# Allow a `TranscodingStreams.Codec` to be used as a decoder. +# Specific `TranscodingStreams.Codec` types may want to specialize `try_find_decoded_size` + +using ChunkCodecCore: + check_in_range, + check_contiguous, + grow_dst!, + MaybeSize, + NOT_SIZE +import ChunkCodecCore: + try_decode!, + try_resize_decode!, + try_find_decoded_size + +function try_find_decoded_size(::Codec, src::AbstractVector{UInt8})::Nothing + nothing +end + +function try_decode!(d::Codec, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}; kwargs...)::MaybeSize + try_resize_decode!(d, dst, src, Int64(length(dst))) +end + +function try_resize_decode!(d::Codec, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}, max_size::Int64; kwargs...)::MaybeSize + dst_size::Int64 = length(dst) + src_size::Int64 = length(src) + src_left::Int64 = src_size + dst_left::Int64 = dst_size + check_contiguous(dst) + check_contiguous(src) + cconv_src = Base.cconvert(Ptr{UInt8}, src) + err = Error() + # This outer loop is to decode a concatenation of multiple compressed streams. + while true + code = startproc(codec, :write, err) + if code === :error + @goto handle_error + end + if pledgeinsize(codec, src_left, err) === :error + @goto handle_error + end + # This is the process loop + while true + GC.@preserve cconv_src begin + local src_p = Base.unsafe_convert(Ptr{UInt8}, cconv_src) + local input = Memory(src_p, src_left) + # ensure minoutsize is provided + local dst_space_needed = minoutsize(d, input) + if dst_space_needed > dst_left + local next_size = grow_dst!(dst, max_size) + if isnothing(next_size) || + # Uh oh no more room to grow dst + # but we don't know if the processing is done. + # Allocate some extra output to check. + local scratch_dst = zeros(UInt8, dst_space_needed) + end + dst_left += next_size - dst_size + dst_size = next_size + @assert dst_left > 0 + + end + # dst may get resized, so cconvert needs to be redone on each iteration. + cconv_dst = Base.cconvert(Ptr{UInt8}, dst) + GC.@preserve cconv_dst begin + local dst_p = Base.unsafe_convert(Ptr{UInt8}, cconv_dst) + local output = Memory(dst_p, dst_left) + end + end + end + end + @label handle_error + if !haserror(err) + set_default_error!(err) + end + throw(err[]) +end From c07f640b6c3e14bb840b56f648c1e0ae9772b1e1 Mon Sep 17 00:00:00 2001 From: nhz2 Date: Thu, 28 Aug 2025 09:49:11 -0400 Subject: [PATCH 2/5] try resize decode --- src/chunkcodecs.jl | 77 ++++++++++++++++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 19 deletions(-) diff --git a/src/chunkcodecs.jl b/src/chunkcodecs.jl index dfd1c0c9..48072027 100644 --- a/src/chunkcodecs.jl +++ b/src/chunkcodecs.jl @@ -16,11 +16,11 @@ function try_find_decoded_size(::Codec, src::AbstractVector{UInt8})::Nothing nothing end -function try_decode!(d::Codec, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}; kwargs...)::MaybeSize - try_resize_decode!(d, dst, src, Int64(length(dst))) +function try_decode!(codec::Codec, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}; kwargs...)::MaybeSize + try_resize_decode!(codec, dst, src, Int64(length(dst))) end -function try_resize_decode!(d::Codec, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}, max_size::Int64; kwargs...)::MaybeSize +function try_resize_decode!(codec::Codec, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}, max_size::Int64; kwargs...)::MaybeSize dst_size::Int64 = length(dst) src_size::Int64 = length(src) src_left::Int64 = src_size @@ -29,40 +29,79 @@ function try_resize_decode!(d::Codec, dst::AbstractVector{UInt8}, src::AbstractV check_contiguous(src) cconv_src = Base.cconvert(Ptr{UInt8}, src) err = Error() - # This outer loop is to decode a concatenation of multiple compressed streams. + # Outer loop to decode a concatenation of multiple compressed streams. while true - code = startproc(codec, :write, err) - if code === :error + if startproc(codec, :write, err) === :error @goto handle_error end if pledgeinsize(codec, src_left, err) === :error @goto handle_error end - # This is the process loop + # The process loop while true GC.@preserve cconv_src begin - local src_p = Base.unsafe_convert(Ptr{UInt8}, cconv_src) + local src_p = Base.unsafe_convert(Ptr{UInt8}, cconv_src) + (src_size - src_left) local input = Memory(src_p, src_left) # ensure minoutsize is provided - local dst_space_needed = minoutsize(d, input) - if dst_space_needed > dst_left + local dst_space_needed = minoutsize(codec, input) + while dst_space_needed > dst_left local next_size = grow_dst!(dst, max_size) - if isnothing(next_size) || - # Uh oh no more room to grow dst - # but we don't know if the processing is done. - # Allocate some extra output to check. - local scratch_dst = zeros(UInt8, dst_space_needed) + if isnothing(next_size) + break end dst_left += next_size - dst_size dst_size = next_size @assert dst_left > 0 - end - # dst may get resized, so cconvert needs to be redone on each iteration. cconv_dst = Base.cconvert(Ptr{UInt8}, dst) GC.@preserve cconv_dst begin - local dst_p = Base.unsafe_convert(Ptr{UInt8}, cconv_dst) - local output = Memory(dst_p, dst_left) + local dst_p = Base.unsafe_convert(Ptr{UInt8}, cconv_dst) + (dst_size - dst_left) + if dst_space_needed > dst_left + # Try to do the decoding into a scratch buffer + # The scratch buffer should typically be just a few bytes. + local scratch_dst = zeros(UInt8, dst_space_needed) + local cconv_scratch_dst = Base.cconvert(Ptr{UInt8}, scratch_dst) + GC.@preserve cconv_scratch_dst let + local scratch_p = Base.unsafe_convert(Ptr{UInt8}, cconv_scratch_dst) + local output = Memory(scratch_p, dst_space_needed) + local Δin, Δout, code = process(codec, input, output, err) + if code === :error + @goto handle_error + end + local valid_Δout = min(Δout, dst_left) + src_left -= Δin + unsafe_copyto!(dst_p, scratch_p, valid_Δout) + if Δout > dst_left + # Ran out of dst space + return NOT_SIZE + end + dst_left -= Δout + if code === :end + if src_left > 0 + break # out of the process loop to decode next stream + else + # Done + return dst_size - dst_left + end + end + end + else + local output = Memory(dst_p, dst_left) + local Δin, Δout, code = process(codec, input, output, err) + if code === :error + @goto handle_error + end + src_left -= Δin + dst_left -= Δout + if code === :end + if src_left > 0 + break # out of the process loop to decode next stream + else + # Done + return dst_size - dst_left + end + end + end end end end From 3190c35492320aa23a6a42e32882806425b02d2d Mon Sep 17 00:00:00 2001 From: nhz2 Date: Thu, 28 Aug 2025 18:46:51 -0400 Subject: [PATCH 3/5] Add tests --- src/chunkcodecs.jl | 6 ++++-- test/Project.toml | 4 +++- test/codecdoubleframe.jl | 25 +++++++++++++++++++++++++ test/codecnoop.jl | 18 ++++++++++++++++++ 4 files changed, 50 insertions(+), 3 deletions(-) diff --git a/src/chunkcodecs.jl b/src/chunkcodecs.jl index 48072027..289564af 100644 --- a/src/chunkcodecs.jl +++ b/src/chunkcodecs.jl @@ -1,5 +1,4 @@ # Allow a `TranscodingStreams.Codec` to be used as a decoder. -# Specific `TranscodingStreams.Codec` types may want to specialize `try_find_decoded_size` using ChunkCodecCore: check_in_range, @@ -12,6 +11,7 @@ import ChunkCodecCore: try_resize_decode!, try_find_decoded_size +# `Codec` subtypes may want to specialize `try_find_decoded_size` function try_find_decoded_size(::Codec, src::AbstractVector{UInt8})::Nothing nothing end @@ -47,7 +47,7 @@ function try_resize_decode!(codec::Codec, dst::AbstractVector{UInt8}, src::Abstr while dst_space_needed > dst_left local next_size = grow_dst!(dst, max_size) if isnothing(next_size) - break + break # reached max_size limit end dst_left += next_size - dst_size dst_size = next_size @@ -59,6 +59,8 @@ function try_resize_decode!(codec::Codec, dst::AbstractVector{UInt8}, src::Abstr if dst_space_needed > dst_left # Try to do the decoding into a scratch buffer # The scratch buffer should typically be just a few bytes. + # This enables handling the `return NOT_SIZE` case + # while respecting the `minoutsize` restrictions. local scratch_dst = zeros(UInt8, dst_space_needed) local cconv_scratch_dst = Base.cconvert(Ptr{UInt8}, scratch_dst) GC.@preserve cconv_scratch_dst let diff --git a/test/Project.toml b/test/Project.toml index afed488a..cb8ebfcc 100644 --- a/test/Project.toml +++ b/test/Project.toml @@ -1,5 +1,7 @@ [deps] Aqua = "4c88cf16-eb10-579e-8560-4a9242c79595" +ChunkCodecCore = "0b6fb165-00bc-4d37-ab8b-79f91016dbe1" +ChunkCodecTests = "06b1ce50-b741-4199-b118-ba5fe1a70fa7" FillArrays = "1a297f60-69ca-5386-bcde-b61e274b549b" OffsetArrays = "6fe1bfb0-de20-5000-8ca7-80f57d26f881" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" @@ -8,5 +10,5 @@ TestsForCodecPackages = "c2e61002-3542-480d-8b3c-5f05cc4f8554" TranscodingStreams = "3bb67fe8-82b1-5028-8e26-92a6c54297fa" [sources] -TranscodingStreams = {path = ".."} TestsForCodecPackages = {path = "../lib/TestsForCodecPackages"} +TranscodingStreams = {path = ".."} diff --git a/test/codecdoubleframe.jl b/test/codecdoubleframe.jl index 385dd15a..ea3ea143 100644 --- a/test/codecdoubleframe.jl +++ b/test/codecdoubleframe.jl @@ -15,6 +15,10 @@ using TestsForCodecPackages: test_chunked_read, test_chunked_write, test_reuse_encoder +import ChunkCodecCore +using ChunkCodecTests: + ChunkCodecTests, + test_encoder_decoder # An insane codec for testing the codec APIs. struct DoubleFrameEncoder <: TranscodingStreams.Codec @@ -454,6 +458,27 @@ DoubleFrameDecoderStream(stream::IO; kwargs...) = TranscodingStream(DoubleFrameD close(stream) end + @testset "chunk codec tests" begin + # create a encoder + function ChunkCodecCore.try_encode!(e::DoubleFrameEncoder, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}; kwargs...) + ChunkCodecCore.try_decode!(e, dst, src) + end + function ChunkCodecCore.decoded_size_range(e::DoubleFrameEncoder) + Int64(0):Int64(1):typemax(Int64)-Int64(1) + end + function ChunkCodecCore.encode_bound(e::DoubleFrameEncoder, src_size::Int64)::Int64 + clamp(widen(clamp(widemul(src_size, Int64(2)), Int64)) + widen(Int64(4)), Int64) + end + ChunkCodecCore.can_concatenate(::DoubleFrameDecoder) = true + encoder = DoubleFrameEncoder() + TranscodingStreams.initialize(encoder) + decoder = DoubleFrameDecoder() + TranscodingStreams.initialize(decoder) + test_encoder_decoder(encoder, decoder; trials=10) + TranscodingStreams.finalize(encoder) + TranscodingStreams.finalize(decoder) + end + test_roundtrip_read(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_write(DoubleFrameEncoderStream, DoubleFrameDecoderStream) test_roundtrip_lines(DoubleFrameEncoderStream, DoubleFrameDecoderStream) diff --git a/test/codecnoop.jl b/test/codecnoop.jl index 4ba9674e..ca0bdc25 100644 --- a/test/codecnoop.jl +++ b/test/codecnoop.jl @@ -1,5 +1,10 @@ using OffsetArrays: OffsetArray using FillArrays: Zeros +using TranscodingStreams: + TranscodingStreams, + TranscodingStream, + Noop, + NoopStream using TestsForCodecPackages: test_roundtrip_read, test_roundtrip_write, @@ -9,6 +14,11 @@ using TestsForCodecPackages: test_roundtrip_fileio, test_chunked_read, test_chunked_write +import ChunkCodecCore +using ChunkCodecTests: + ChunkCodecTests, + test_encoder_decoder +using Test @testset "Noop Codec" begin source = IOBuffer("") @@ -584,4 +594,12 @@ using TestsForCodecPackages: close(stream) end + @testset "chunk codec tests" begin + # create a encoder + encoder = ChunkCodecCore.NoopEncodeOptions() + decoder = Noop() + TranscodingStreams.initialize(decoder) + test_encoder_decoder(encoder, decoder; trials=100) + TranscodingStreams.finalize(decoder) + end end From 4d997d253bef4055368429c358bc426a79e55fce Mon Sep 17 00:00:00 2001 From: nhz2 Date: Thu, 28 Aug 2025 23:07:44 -0400 Subject: [PATCH 4/5] make use of `expectedsize` like `transcode` --- src/chunkcodecs.jl | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/src/chunkcodecs.jl b/src/chunkcodecs.jl index 289564af..877df87f 100644 --- a/src/chunkcodecs.jl +++ b/src/chunkcodecs.jl @@ -23,11 +23,23 @@ end function try_resize_decode!(codec::Codec, dst::AbstractVector{UInt8}, src::AbstractVector{UInt8}, max_size::Int64; kwargs...)::MaybeSize dst_size::Int64 = length(dst) src_size::Int64 = length(src) - src_left::Int64 = src_size - dst_left::Int64 = dst_size check_contiguous(dst) check_contiguous(src) cconv_src = Base.cconvert(Ptr{UInt8}, src) + if dst_size < max_size + # Do the equivalent of `_default_output_buffer(codec, input)` in `transcode` + # by resizing `dst` + expected_dst_size::Int64 = GC.@preserve(cconv_src, min(initial_output_size( + codec, + Memory(Base.unsafe_convert(Ptr{UInt8}, cconv_src), src_size), + ), max_size)) + if expected_dst_size > dst_size + resize!(dst, expected_dst_size) + dst_size = expected_dst_size + end + end + src_left::Int64 = src_size + dst_left::Int64 = dst_size err = Error() # Outer loop to decode a concatenation of multiple compressed streams. while true From d42a6f2f299adaf4b07bf9cd70c861cd60c5cdc8 Mon Sep 17 00:00:00 2001 From: nhz2 Date: Sun, 31 Aug 2025 00:27:47 -0400 Subject: [PATCH 5/5] Bump ChunkCodecCore version --- Project.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Project.toml b/Project.toml index 93ea0b7d..40a0a595 100644 --- a/Project.toml +++ b/Project.toml @@ -8,5 +8,5 @@ version = "0.11.3" ChunkCodecCore = "0b6fb165-00bc-4d37-ab8b-79f91016dbe1" [compat] -ChunkCodecCore = "0.6" +ChunkCodecCore = "1" julia = "1.6"