Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/kernel/src/file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ An IO device as returned by `open/2`.
Size :: non_neg_integer(),
Delay :: non_neg_integer()}
| 'delayed_write' | {'read_ahead', Size :: pos_integer()}
| 'read_ahead' | 'compressed' | 'compressed_one'
| 'read_ahead' | 'compressed' | 'compressed_one' |
{zstd, zstd:compress_parameters() |
zstd:decompress_parameters()}
| {'encoding', unicode:encoding()}
| sync.
-type deep_list() :: [char() | atom() | deep_list()].
Expand Down
1 change: 1 addition & 0 deletions lib/kernel/src/raw_file_io.erl
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ match_list(_Other) -> false.

match_compressed(compressed) -> true;
match_compressed(compressed_one) -> true;
match_compressed({zstd, _}) -> true;
match_compressed(_Other) -> false.

match_delayed({delayed_write, _Size, _Timeout}) -> true;
Expand Down
32 changes: 26 additions & 6 deletions lib/kernel/src/raw_file_io_deflate.erl
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,28 @@ init({Owner, Secret, [compressed]}) ->
monitor => Monitor,
secret => Secret,
position => 0,
zlib => Z },
{ok, opening, Data}.
deflate => fun(Data) -> zlib:deflate(Z, Data) end,
flush => fun() -> zlib:deflate(Z, [], finish) end },
{ok, opening, Data};
init({Owner, Secret, [{zstd, Parameters}]}) ->
Monitor = monitor(process, Owner),
try zstd:context(compress, Parameters) of
{ok, Z} ->
Data =
#{ owner => Owner,
monitor => Monitor,
secret => Secret,
position => 0,
deflate => fun(Data) -> zstd:compress(Data, Z) end,
flush => fun() ->
{done, Data} = zstd:finish(Z, []),
Data
end},
{ok, opening, Data}
catch
_:_ ->
{error, badarg}
end.

opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
case raw_file_io:open(Filename, Modes) of
Expand Down Expand Up @@ -113,9 +133,9 @@ opened(_Event, _Request, _Data) ->
keep_state_and_data.

write(Data, IOVec) ->
#{ handle := PrivateFd, position := Position, zlib := Z } = Data,
#{ handle := PrivateFd, position := Position, deflate := Deflate } = Data,
UncompressedSize = iolist_size(IOVec),
case ?CALL_FD(PrivateFd, write, [zlib:deflate(Z, IOVec)]) of
case ?CALL_FD(PrivateFd, write, [Deflate(IOVec)]) of
ok -> {ok, Data#{ position := (Position + UncompressedSize) }};
Other -> Other
end.
Expand Down Expand Up @@ -152,8 +172,8 @@ position_1(#{ position := Current } = Data, Desired) when Current < Desired ->
position_1(#{ position := Current }, Desired) when Current > Desired ->
{error, einval}.

flush_deflate_state(#{ handle := PrivateFd, zlib := Z }) ->
case ?CALL_FD(PrivateFd, write, [zlib:deflate(Z, [], finish)]) of
flush_deflate_state(#{ handle := PrivateFd, flush := Flush }) ->
case ?CALL_FD(PrivateFd, write, [Flush()]) of
ok -> ok;
Other -> Other
end.
Expand Down
96 changes: 63 additions & 33 deletions lib/kernel/src/raw_file_io_inflate.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
-behavior(gen_statem).

-export([init/1, callback_mode/0, terminate/3]).
-export([opening/3, opened_gzip/3, opened_passthrough/3]).
-export([opening/3, opened_active/3, opened_passthrough/3]).

-include("file_int.hrl").

Expand All @@ -37,41 +37,71 @@ callback_mode() -> state_functions.
init({Owner, Secret, [compressed]}) ->
%% 'reset mode', which resets the inflate state at the end of every stream,
%% allowing us to read concatenated gzip files.
init(Owner, Secret, reset);
init_zlib(Owner, Secret, reset);
init({Owner, Secret, [compressed_one]}) ->
%% 'cut mode', which stops the inflate after one member
%% allowing us to read gzipped tar files
init(Owner, Secret, cut).
init_zlib(Owner, Secret, cut);
init({Owner, Secret, [{zstd, Parameters}]}) ->
%% 'cut mode', which stops the inflate after one member
%% allowing us to read gzipped tar files
init_zstd(Owner, Secret, Parameters).

init(Owner, Secret, Mode) ->
init_zlib(Owner, Secret, Mode) ->
Monitor = monitor(process, Owner),
%% We're using the undocumented inflateInit/3 to set the mode
Z = zlib:open(),
ok = zlib:inflateInit(Z, ?GZIP_WBITS, Mode),

Data =
#{ owner => Owner,
monitor => Monitor,
secret => Secret,
choose_state => fun(PrivateFd) ->
%% The old driver fell back to plain reads
%% if the file didn't start with the magic
%% gzip bytes.
State =
case ?CALL_FD(PrivateFd, read, [2]) of
{ok, <<16#1F, 16#8B>>} ->
opened_active;
_Other ->
opened_passthrough
end,
{ok, 0} = ?CALL_FD(PrivateFd, position, [0]),
State
end,
position => 0,
buffer => prim_buffer:new(),
zlib => Z },
inflate => fun(Data) -> zlib:inflate(Z, Data) end,
reset => fun() -> zlib:inflateReset(Z) end },
{ok, opening, Data}.

%% The old driver fell back to plain reads if the file didn't start with the
%% magic gzip bytes.
choose_decompression_state(PrivateFd) ->
State =
case ?CALL_FD(PrivateFd, read, [2]) of
{ok, <<16#1F, 16#8B>>} -> opened_gzip;
_Other -> opened_passthrough
end,
{ok, 0} = ?CALL_FD(PrivateFd, position, [0]),
State.
init_zstd(Owner, Secret, Parameters) ->
Monitor = monitor(process, Owner),
try zstd:context(decompress, Parameters) of
{ok, Z} ->
Data =
#{ owner => Owner,
monitor => Monitor,
secret => Secret,
choose_state => fun(_) -> opened_active end,
position => 0,
buffer => prim_buffer:new(),
inflate => fun(Data) -> zstd:decompress(Data, Z) end,
reset => fun() -> zstd:reset(Z) end},
{ok, opening, Data}
catch
_:_ ->
{error, badarg}
end.

opening({call, From}, {'$open', Secret, Filename, Modes}, #{ secret := Secret } = Data) ->
opening({call, From},
{'$open', Secret, Filename, Modes},
#{ secret := Secret, choose_state := ChooseState } = Data) ->
case raw_file_io:open(Filename, Modes) of
{ok, PrivateFd} ->
NextState = choose_decompression_state(PrivateFd),
NextState = ChooseState(PrivateFd),
NewData = Data#{ handle => PrivateFd },
{next_state, NextState, NewData, [{reply, From, ok}]};
Other ->
Expand Down Expand Up @@ -109,16 +139,16 @@ opened_passthrough(_Event, _Request, _Data) ->

%%

opened_gzip(info, {'DOWN', Monitor, process, _Owner, _Reason}, #{ monitor := Monitor }) ->
opened_active(info, {'DOWN', Monitor, process, _Owner, _Reason}, #{ monitor := Monitor }) ->
{stop, shutdown};

opened_gzip(info, _Message, _Data) ->
opened_active(info, _Message, _Data) ->
keep_state_and_data;

opened_gzip({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
opened_active({call, {Owner, _Tag} = From}, [close], #{ owner := Owner } = Data) ->
internal_close(From, Data);

opened_gzip({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner } = Data) ->
opened_active({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner } = Data) ->
case position(Data, Mark) of
{ok, NewData, Result} ->
Response = {ok, Result},
Expand All @@ -127,7 +157,7 @@ opened_gzip({call, {Owner, _Tag} = From}, [position, Mark], #{ owner := Owner }
{keep_state_and_data, [{reply, From, Other}]}
end;

opened_gzip({call, {Owner, _Tag} = From}, [read, Size], #{ owner := Owner } = Data) ->
opened_active({call, {Owner, _Tag} = From}, [read, Size], #{ owner := Owner } = Data) ->
case read(Data, Size) of
{ok, NewData, Result} ->
Response = {ok, Result},
Expand All @@ -136,7 +166,7 @@ opened_gzip({call, {Owner, _Tag} = From}, [read, Size], #{ owner := Owner } = Da
{keep_state_and_data, [{reply, From, Other}]}
end;

opened_gzip({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner } = Data) ->
opened_active({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner } = Data) ->
case read_line(Data) of
{ok, NewData, Result} ->
Response = {ok, Result},
Expand All @@ -145,20 +175,20 @@ opened_gzip({call, {Owner, _Tag} = From}, [read_line], #{ owner := Owner } = Dat
{keep_state_and_data, [{reply, From, Other}]}
end;

opened_gzip({call, {Owner, _Tag} = From}, [write, _IOData], #{ owner := Owner }) ->
opened_active({call, {Owner, _Tag} = From}, [write, _IOData], #{ owner := Owner }) ->
Response = {error, ebadf},
{keep_state_and_data, [{reply, From, Response}]};

opened_gzip({call, {Owner, _Tag} = From}, _Request, #{ owner := Owner }) ->
opened_active({call, {Owner, _Tag} = From}, _Request, #{ owner := Owner }) ->
Response = {error, enotsup},
{keep_state_and_data, [{reply, From, Response}]};

opened_gzip({call, _From}, _Request, _Data) ->
opened_active({call, _From}, _Request, _Data) ->
%% The client functions filter this out, so we'll crash if the user does
%% anything stupid on purpose.
{shutdown, protocol_violation};

opened_gzip(_Event, _Request, _Data) ->
opened_active(_Event, _Request, _Data) ->
keep_state_and_data.

%%
Expand All @@ -178,8 +208,8 @@ read_1(Data, Buffer, BufferSize, ReadSize) when BufferSize < ReadSize ->
#{ handle := PrivateFd } = Data,
case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
{ok, Compressed} ->
#{ zlib := Z } = Data,
Uncompressed = erlang:iolist_to_iovec(zlib:inflate(Z, Compressed)),
#{ inflate := Inflate } = Data,
Uncompressed = erlang:iolist_to_iovec(Inflate(Compressed)),
prim_buffer:write(Buffer, Uncompressed),
read_1(Data, Buffer, prim_buffer:size(Buffer), ReadSize);
eof when BufferSize > 0 ->
Expand All @@ -198,10 +228,10 @@ read_line(#{ buffer := Buffer } = Data) ->
end.

read_line_1(Data, Buffer, not_found) ->
#{ handle := PrivateFd, zlib := Z } = Data,
#{ handle := PrivateFd, inflate := Inflate } = Data,
case ?CALL_FD(PrivateFd, read, [?INFLATE_CHUNK_SIZE]) of
{ok, Compressed} ->
Uncompressed = erlang:iolist_to_iovec(zlib:inflate(Z, Compressed)),
Uncompressed = erlang:iolist_to_iovec(Inflate(Compressed)),
prim_buffer:write(Buffer, Uncompressed),
read_line_1(Data, Buffer, prim_buffer:find_byte_index(Buffer, $\n));
eof ->
Expand Down Expand Up @@ -257,10 +287,10 @@ position_1(#{ position := Current } = Data, Desired) when Current < Desired ->
Other -> Other
end;
position_1(#{ position := Current } = Data, Desired) when Current > Desired ->
#{ handle := PrivateFd, buffer := Buffer, zlib := Z } = Data,
#{ handle := PrivateFd, buffer := Buffer, reset := Reset } = Data,
case ?CALL_FD(PrivateFd, position, [bof]) of
{ok, 0} ->
ok = zlib:inflateReset(Z),
ok = Reset(),
prim_buffer:wipe(Buffer),
position_1(Data#{ position => 0 }, Desired);
Other ->
Expand Down
25 changes: 23 additions & 2 deletions lib/kernel/test/file_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@
read_compressed_cooked/1, read_compressed_cooked_binary/1,
read_cooked_tar_problem/1,
write_compressed/1, compress_errors/1, catenated_gzips/1,
compress_async_crash/1]).
compress_async_crash/1,
zstd/1]).

-export([ make_link/1, read_link_info_for_non_link/1, symlinks/1]).

Expand Down Expand Up @@ -171,7 +172,8 @@ groups() ->
[read_compressed_cooked, read_compressed_cooked_binary,
read_cooked_tar_problem, read_not_really_compressed,
write_compressed, compress_errors, catenated_gzips,
compress_async_crash]},
compress_async_crash,
zstd]},
{links, [],
[make_link, read_link_info_for_non_link, symlinks]},
{bench, [],
Expand Down Expand Up @@ -3026,6 +3028,25 @@ compress_async_crash_loop(N, Path, ExpectedData) ->
end,
compress_async_crash_loop(N - 1, Path, ExpectedData).

zstd(Config) when is_list(Config) ->
DataDir = proplists:get_value(data_dir, Config),
Path = filename:join(DataDir, "test.zstd"),
ExpectedData = <<"qwerty">>,

_ = ?FILE_MODULE:delete(Path),
{ok, FdW} = ?FILE_MODULE:open(Path, [write, binary, {zstd, #{}}]),
ok = ?FILE_MODULE:write(FdW, ExpectedData),
ok = ?FILE_MODULE:close(FdW),

{ok, FdR} = ?FILE_MODULE:open(Path, [read, binary, {zstd, #{}}]),
{ok, ExpectedData} = ?FILE_MODULE:read(FdR, 1 bsl 10),
ok = ?FILE_MODULE:close(FdR),

{ok, Compressed} = ?FILE_MODULE:read_file(Path),
ExpectedData = iolist_to_binary(zstd:decompress(Compressed)),

ok.

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

unicode(Config) when is_list(Config) ->
Expand Down
5 changes: 4 additions & 1 deletion lib/stdlib/src/zstd.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ did not create it.
""".
-moduledoc #{ since => "OTP 28.0" }.

-export_type([context/0, dict/0]).
-export_type([context/0,
dict/0,
compress_parameters/0,
decompress_parameters/0]).

-doc """
A compression or decompression context that can be used
Expand Down
Loading