Skip to content

Commit 326a663

Browse files
Added zstd support for msgs
Signed-off-by: Athish Pranav D <[email protected]>
1 parent 7f13c7a commit 326a663

File tree

5 files changed

+80
-28
lines changed

5 files changed

+80
-28
lines changed

fluentd.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ Gem::Specification.new do |gem|
3333
gem.add_runtime_dependency("tzinfo-data", ["~> 1.0"])
3434
gem.add_runtime_dependency("strptime", [">= 0.2.4", "< 1.0.0"])
3535
gem.add_runtime_dependency("webrick", ["~> 1.4"])
36+
gem.add_runtime_dependency("zstd-ruby", ["~> 1.5"])
3637

3738
# gems that aren't default gems as of Ruby 3.4
3839
gem.add_runtime_dependency("base64", ["~> 0.2"])

lib/fluent/event.rb

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,11 @@ def to_msgpack_stream(time_int: false, packer: nil)
268268
end
269269

270270
class CompressedMessagePackEventStream < MessagePackEventStream
271-
# Builds an event stream over a compressed MessagePack payload.
#
# data             - the payload; it is kept as received in @compressed_data.
# cached_unpacker, size, unpacked_times:, unpacked_records:
#                  - handed to the parent initializer unchanged.
# compress:        - compression type of data (defaults to :gzip); stored in
#                    @type and later passed to decompress.
def initialize(data, cached_unpacker = nil, size = 0, unpacked_times: nil, unpacked_records: nil, compress: :gzip)
  # Arguments are listed explicitly so that compress: is not forwarded to the
  # parent initializer (a bare `super` would pass every argument along).
  super(data, cached_unpacker, size, unpacked_times: unpacked_times, unpacked_records: unpacked_records)
  @type = compress
  @compressed_data = data
  # Nothing has been decompressed yet; ensure_decompressed! fills this in.
  @decompressed_data = nil
end
276277

277278
def empty?
@@ -303,7 +304,7 @@ def to_compressed_msgpack_stream(time_int: false, packer: nil)
303304

304305
# Decompresses @data in place the first time it is called, using the
# compression type stored in @type. Once @decompressed_data is set, further
# calls do nothing and return nil.
def ensure_decompressed!
  @data = @decompressed_data = decompress(@data, type: @type) unless @decompressed_data
end
308309
end
309310

lib/fluent/plugin/compressable.rb

Lines changed: 68 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,81 +16,127 @@
1616

1717
require 'stringio'
1818
require 'zlib'
19+
require 'zstd-ruby'
1920

2021
module Fluent
  module Plugin
    # Mixin with helpers to compress and decompress data held in Strings or
    # IO-like objects. Two codecs are supported, selected by a `type` Symbol:
    # :gzip (Zlib) and :zstd (zstd-ruby).
    module Compressable
      # Compresses +data+ with the codec selected by +type+.
      #
      # @param data [String] payload to compress
      # @param type [Symbol] :gzip (default) or :zstd
      # @param kwargs [Hash] only :output_io is read; when given, the compressed
      #   bytes are written to it instead of to an internal StringIO
      # @return [String, Object] the compressed String, or +output_io+ itself
      #   when one was supplied
      # @raise [ArgumentError] if +type+ is neither :gzip nor :zstd
      def compress(data, type: :gzip, **kwargs)
        output_io = kwargs[:output_io]
        io = output_io || StringIO.new
        writer =
          case type
          when :gzip then Zlib::GzipWriter.new(io)
          when :zstd then Zstd::StreamWriter.new(io)
          else raise ArgumentError, "Unknown compression type: #{type}"
          end
        writer.write(data)
        # finish (not close): Zlib::GzipWriter#finish completes the stream
        # without closing the io it wraps, so a caller-supplied output_io
        # remains usable.
        writer.finish
        output_io || io.string
      end

      # Decompresses data produced by #compress.
      #
      # compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`
      # https://www.ruby-forum.com/topic/971591#979503
      #
      # The source is +input_io+ when given, otherwise +compressed_data+.
      # The destination is +output_io+ when given, otherwise a new String.
      #
      # @param compressed_data [String, nil] compressed bytes; ignored when
      #   +input_io+ is given
      # @param output_io [#write, nil] receives the decompressed bytes
      # @param input_io [IO, StringIO, nil] source of compressed bytes
      # @param type [Symbol] :gzip (default) or :zstd
      # @return [String, Object, nil] +output_io+ when it was given; otherwise
      #   the decompressed String. A nil or empty +compressed_data+ (with no
      #   +input_io+) is returned as-is.
      # @raise [ArgumentError] if +type+ is neither :gzip nor :zstd
      def decompress(compressed_data = nil, output_io: nil, input_io: nil, type: :gzip)
        case
        when input_io && output_io
          io_decompress(input_io, output_io, type)
        when input_io
          output_io = StringIO.new
          io = io_decompress(input_io, output_io, type)
          io.string
        when compressed_data.nil? || compressed_data.empty?
          # check compressed_data(String) is 0 length
          compressed_data
        when output_io
          # execute after checking compressed_data is empty or not
          io = StringIO.new(compressed_data)
          io_decompress(io, output_io, type)
        else
          string_decompress(compressed_data, type)
        end
      end

      private

      # Decompresses a String holding one or more concatenated gzip members
      # and returns the concatenated result.
      def string_decompress_gzip(compressed_data)
        io = StringIO.new(compressed_data)
        out = +''
        loop do
          reader = Zlib::GzipReader.new(io)
          out << reader.read
          unused = reader.unused
          reader.finish
          # The reader may have consumed bytes past the end of its member;
          # rewind io by that amount so the next reader starts at the next
          # member's first byte.
          unless unused.nil?
            adjust = unused.length
            io.pos -= adjust
          end
          break if io.eof?
        end
        out
      end

      # Decompresses a String of zstd data and returns the result.
      def string_decompress_zstd(compressed_data)
        io = StringIO.new(compressed_data)
        # A single reader is used for the whole input. The reader is stateful,
        # so creating a new one per chunk would hand the middle of a frame to
        # a fresh decoder as soon as the input exceeds one chunk.
        reader = Zstd::StreamReader.new(io)
        out = +''
        loop do
          # Zstd::StreamReader needs to specify the size of the buffer
          out << reader.read(1024)
          # Zstd::StreamReader doesn't provide unused data, so we simply keep
          # feeding the same reader until the input is exhausted
          break if io.eof?
        end
        out
      end

      # Dispatches String decompression on +type+.
      def string_decompress(compressed_data, type = :gzip)
        case type
        when :gzip
          string_decompress_gzip(compressed_data)
        when :zstd
          string_decompress_zstd(compressed_data)
        else
          raise ArgumentError, "Unknown compression type: #{type}"
        end
      end

      # Streams one or more concatenated gzip members from +input+ into
      # +output+ and returns +output+.
      def io_decompress_gzip(input, output)
        loop do
          reader = Zlib::GzipReader.new(input)
          v = reader.read
          output.write(v)
          unused = reader.unused
          reader.finish
          # Same rewind as in string_decompress_gzip: hand back the bytes the
          # reader consumed beyond the end of its member.
          unless unused.nil?
            adjust = unused.length
            input.pos -= adjust
          end
          break if input.eof?
        end
        output
      end

      # Streams zstd data from +input+ into +output+ and returns +output+.
      def io_decompress_zstd(input, output)
        # A single stateful reader for the whole input; see
        # string_decompress_zstd for why it must not be re-created per chunk.
        reader = Zstd::StreamReader.new(input)
        loop do
          # Zstd::StreamReader needs to specify the size of the buffer
          v = reader.read(1024)
          output.write(v)
          break if input.eof?
        end
        output
      end

      # Dispatches IO decompression on +type+.
      def io_decompress(input, output, type = :gzip)
        case type
        when :gzip
          io_decompress_gzip(input, output)
        when :zstd
          io_decompress_zstd(input, output)
        else
          raise ArgumentError, "Unknown compression type: #{type}"
        end
      end
    end
  end
end

lib/fluent/plugin/in_forward.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,8 +309,12 @@ def on_message(msg, chunk_size, conn)
309309
# PackedForward
310310
option = msg[2]
311311
size = (option && option['size']) || 0
312-
# Pick the event stream class from the PackedForward 'compressed' option.
# `option` may be nil (the size lookup above guards for that too), so it is
# checked before indexing. Both supported codecs select the compressed
# stream; anything else is treated as an uncompressed MessagePack stream.
compression = option && option['compressed']
if compression == 'gzip' || compression == 'zstd'
  # to_sym is applied only to the two whitelisted values above.
  es = Fluent::CompressedMessagePackEventStream.new(entries, nil, size.to_i, compress: compression.to_sym)
else
  es = Fluent::MessagePackEventStream.new(entries, nil, size.to_i)
end
314318
es = check_and_skip_invalid_event(tag, es, conn.remote_host) if @skip_invalid_event
315319
if @enable_field_injection
316320
es = add_source_info(es, conn)

test/plugin/test_in_forward.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -553,7 +553,7 @@ def create_driver(conf=base_config)
553553
chunk = ["tag1", entries, { 'compressed' => 'gzip' }].to_msgpack
554554

555555
# check CompressedMessagePackEventStream is created
556-
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0)
556+
mock(Fluent::CompressedMessagePackEventStream).new(entries, nil, 0, compress: :gzip)
557557

558558
d.run do
559559
Fluent::MessagePackFactory.msgpack_unpacker.feed_each(chunk) do |obj|

0 commit comments

Comments
 (0)