From 9a98058926007e005f7b42daee6197107cd0fbbf Mon Sep 17 00:00:00 2001 From: Joshua Foster Date: Mon, 15 Sep 2025 12:20:52 -0400 Subject: [PATCH] update to use klauspost compression add zstd support --- .../node_builder/access_node_builder.go | 2 +- .../cmd/export-json-execution-state/cmd.go | 2 +- .../cmd/generate-authorization-fixes/cmd.go | 2 +- .../grpc_compression_benchmark_test.go | 9 +- engine/access/rpc/connection/manager.go | 3 +- engine/collection/rpc/engine.go | 3 +- .../common/grpc/compressor/deflate/deflate.go | 2 +- engine/common/grpc/compressor/gzip/gzip.go | 107 ++++++++++++++++++ .../common/grpc/compressor/snappy/snappy.go | 2 +- engine/common/grpc/compressor/zstd/zstd.go | 89 +++++++++++++++ engine/execution/rpc/engine.go | 3 +- go.mod | 8 +- go.sum | 12 +- module/grpcserver/server.go | 3 +- network/compressor/gzipCompressor.go | 2 +- 15 files changed, 227 insertions(+), 22 deletions(-) create mode 100644 engine/common/grpc/compressor/gzip/gzip.go create mode 100644 engine/common/grpc/compressor/zstd/zstd.go diff --git a/cmd/access/node_builder/access_node_builder.go b/cmd/access/node_builder/access_node_builder.go index 1a4c6d3743e..fbb78f5f190 100644 --- a/cmd/access/node_builder/access_node_builder.go +++ b/cmd/access/node_builder/access_node_builder.go @@ -1273,7 +1273,7 @@ func (builder *FlowAccessNodeBuilder) extraFlags() { flags.StringVar(&builder.rpcConf.CompressorName, "grpc-compressor", defaultConfig.rpcConf.CompressorName, - "name of grpc compressor that will be used for requests to other nodes. One of (gzip, snappy, deflate)") + "name of grpc compressor that will be used for requests to other nodes. One of (gzip, snappy, deflate, zstd)") flags.BoolVar(&builder.logTxTimeToFinalized, "log-tx-time-to-finalized", defaultConfig.logTxTimeToFinalized, "log transaction time to finalized") flags.BoolVar(&builder.logTxTimeToExecuted, "log-tx-time-to-executed", defaultConfig.logTxTimeToExecuted, "log transaction time to executed") flags.BoolVar(&builder.logTxTimeToFinalizedExecuted, diff --git a/cmd/util/cmd/export-json-execution-state/cmd.go b/cmd/util/cmd/export-json-execution-state/cmd.go index f06a38c628a..5051d073dfb 100644 --- a/cmd/util/cmd/export-json-execution-state/cmd.go +++ b/cmd/util/cmd/export-json-execution-state/cmd.go @@ -2,7 +2,6 @@ package exporter import ( "bufio" - "compress/gzip" "encoding/hex" "errors" "fmt" @@ -10,6 +9,7 @@ import ( "os" "path/filepath" + "github.com/klauspost/compress/gzip" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" diff --git a/cmd/util/cmd/generate-authorization-fixes/cmd.go b/cmd/util/cmd/generate-authorization-fixes/cmd.go index a554a396445..d26d741dccb 100644 --- a/cmd/util/cmd/generate-authorization-fixes/cmd.go +++ b/cmd/util/cmd/generate-authorization-fixes/cmd.go @@ -1,13 +1,13 @@ package generate_authorization_fixes import ( - "compress/gzip" "encoding/json" "io" "os" "strings" "sync" + "github.com/klauspost/compress/gzip" "github.com/onflow/cadence/common" "github.com/onflow/cadence/interpreter" "github.com/onflow/cadence/sema" diff --git a/engine/access/rpc/connection/grpc_compression_benchmark_test.go b/engine/access/rpc/connection/grpc_compression_benchmark_test.go index 4fed6759681..32fcb6b7a45 100644 --- a/engine/access/rpc/connection/grpc_compression_benchmark_test.go +++ b/engine/access/rpc/connection/grpc_compression_benchmark_test.go @@ -7,10 +7,10 @@ import ( "github.com/stretchr/testify/assert" testifymock "github.com/stretchr/testify/mock" - "google.golang.org/grpc/encoding/gzip" - "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" + "github.com/onflow/flow-go/engine/common/grpc/compressor/gzip" "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" + "github.com/onflow/flow-go/engine/common/grpc/compressor/zstd" "github.com/onflow/flow-go/engine/common/rpc/convert" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/metrics" @@ -34,6 +34,11 @@ func BenchmarkWithDeflateCompression(b *testing.B) { runBenchmark(b, deflate.Name) } +// BenchmarkWithDeflateCompression benchmarks the gRPC request to execution nodes using deflate compressor. +func BenchmarkWithZSTDCompression(b *testing.B) { + runBenchmark(b, zstd.Name) +} + // runBenchmark is a helper function that performs the benchmarking for different compressors. func runBenchmark(b *testing.B, compressorName string) { // create an execution node diff --git a/engine/access/rpc/connection/manager.go b/engine/access/rpc/connection/manager.go index 00a92ccdc6c..322aae80c1c 100644 --- a/engine/access/rpc/connection/manager.go +++ b/engine/access/rpc/connection/manager.go @@ -13,12 +13,13 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" - _ "google.golang.org/grpc/encoding/gzip" //required for gRPC compression "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" _ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" //required for gRPC compression + _ "github.com/onflow/flow-go/engine/common/grpc/compressor/gzip" //required for gRPC compression _ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" //required for gRPC compression + _ "github.com/onflow/flow-go/engine/common/grpc/compressor/zstd" //required for gRPC compression "github.com/onflow/flow-go/module" "github.com/onflow/flow-go/utils/grpcutils" ) diff --git a/engine/collection/rpc/engine.go b/engine/collection/rpc/engine.go index c1b1f76e9d5..1abb5e57908 100644 --- a/engine/collection/rpc/engine.go +++ b/engine/collection/rpc/engine.go @@ -12,11 +12,12 @@ import ( "github.com/rs/zerolog" "google.golang.org/grpc" "google.golang.org/grpc/codes" - _ "google.golang.org/grpc/encoding/gzip" // required for gRPC compression "google.golang.org/grpc/status" _ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" // required for gRPC compression + _ "github.com/onflow/flow-go/engine/common/grpc/compressor/gzip" // required for gRPC compression _ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" // required for gRPC compression + _ "github.com/onflow/flow-go/engine/common/grpc/compressor/zstd" // required for gRPC compression "github.com/onflow/flow-go/engine" "github.com/onflow/flow-go/engine/common/rpc/convert" diff --git a/engine/common/grpc/compressor/deflate/deflate.go b/engine/common/grpc/compressor/deflate/deflate.go index 7bbba76f506..cdc23a9528c 100644 --- a/engine/common/grpc/compressor/deflate/deflate.go +++ b/engine/common/grpc/compressor/deflate/deflate.go @@ -3,10 +3,10 @@ package deflate import ( - "compress/flate" "io" "sync" + "github.com/klauspost/compress/flate" "google.golang.org/grpc/encoding" ) diff --git a/engine/common/grpc/compressor/gzip/gzip.go b/engine/common/grpc/compressor/gzip/gzip.go new file mode 100644 index 00000000000..0b026c4f3b5 --- /dev/null +++ b/engine/common/grpc/compressor/gzip/gzip.go @@ -0,0 +1,107 @@ +package gzip + +import ( + "encoding/binary" + "fmt" + "io" + "sync" + + "github.com/klauspost/compress/gzip" + "google.golang.org/grpc/encoding" +) + +// Name is the name registered for the gzip compressor. +const Name = "gzip" + +func init() { + c := &compressor{} + c.poolCompressor.New = func() any { + return &writer{Writer: gzip.NewWriter(io.Discard), pool: &c.poolCompressor} + } + encoding.RegisterCompressor(c) +} + +type writer struct { + *gzip.Writer + pool *sync.Pool +} + +// SetLevel updates the registered gzip compressor to use the compression level specified (gzip.HuffmanOnly is not supported). +// NOTE: this function must only be called during initialization time (i.e. in an init() function), +// and is not thread-safe. +// +// The error returned will be nil if the specified level is valid. +func SetLevel(level int) error { + if level < gzip.DefaultCompression || level > gzip.BestCompression { + return fmt.Errorf("grpc: invalid gzip compression level: %d", level) + } + c := encoding.GetCompressor(Name).(*compressor) + c.poolCompressor.New = func() any { + w, err := gzip.NewWriterLevel(io.Discard, level) + if err != nil { + panic(err) + } + return &writer{Writer: w, pool: &c.poolCompressor} + } + return nil +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + z := c.poolCompressor.Get().(*writer) + z.Writer.Reset(w) + return z, nil +} + +func (z *writer) Close() error { + defer z.pool.Put(z) + return z.Writer.Close() +} + +type reader struct { + *gzip.Reader + pool *sync.Pool +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + z, inPool := c.poolDecompressor.Get().(*reader) + if !inPool { + newZ, err := gzip.NewReader(r) + if err != nil { + return nil, err + } + return &reader{Reader: newZ, pool: &c.poolDecompressor}, nil + } + if err := z.Reset(r); err != nil { + c.poolDecompressor.Put(z) + return nil, err + } + return z, nil +} + +func (z *reader) Read(p []byte) (n int, err error) { + n, err = z.Reader.Read(p) + if err == io.EOF { + z.pool.Put(z) + } + return n, err +} + +// RFC1952 specifies that the last four bytes "contains the size of +// the original (uncompressed) input data modulo 2^32." +// gRPC has a max message size of 2GB so we don't need to worry about wraparound. +func (c *compressor) DecompressedSize(buf []byte) int { + last := len(buf) + if last < 4 { + return -1 + } + return int(binary.LittleEndian.Uint32(buf[last-4 : last])) +} + +func (c *compressor) Name() string { + return Name +} + +type compressor struct { + poolCompressor sync.Pool + poolDecompressor sync.Pool +} diff --git a/engine/common/grpc/compressor/snappy/snappy.go b/engine/common/grpc/compressor/snappy/snappy.go index cb7dec75853..21da6cfd564 100644 --- a/engine/common/grpc/compressor/snappy/snappy.go +++ b/engine/common/grpc/compressor/snappy/snappy.go @@ -4,7 +4,7 @@ import ( "io" "sync" - "github.com/golang/snappy" + "github.com/klauspost/compress/snappy" "google.golang.org/grpc/encoding" ) diff --git a/engine/common/grpc/compressor/zstd/zstd.go b/engine/common/grpc/compressor/zstd/zstd.go new file mode 100644 index 00000000000..727af792df9 --- /dev/null +++ b/engine/common/grpc/compressor/zstd/zstd.go @@ -0,0 +1,89 @@ +package zstd + +import ( + "io" + "sync" + + "github.com/klauspost/compress/zstd" + "google.golang.org/grpc/encoding" +) + +// Name is the name registered for the zstd compressor. +const Name = "zstd" + +func init() { + c := &compressor{} + c.poolCompressor.New = func() any { + newWriter, _ := zstd.NewWriter(io.Discard) + return &writer{Encoder: newWriter, pool: &c.poolCompressor} + } + encoding.RegisterCompressor(c) +} + +type writer struct { + *zstd.Encoder + pool *sync.Pool +} + +// SetLevel updates the registered zstd compressor to use the compression level specified. +// NOTE: this function must only be called during initialization time (i.e. in an init() function), +// and is not thread-safe. +func SetLevel(level zstd.EncoderLevel) { + c := encoding.GetCompressor(Name).(*compressor) + c.poolCompressor.New = func() any { + w, err := zstd.NewWriter(io.Discard, zstd.WithEncoderLevel(level)) + if err != nil { + panic(err) + } + return &writer{Encoder: w, pool: &c.poolCompressor} + } +} + +func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) { + z := c.poolCompressor.Get().(*writer) + z.Encoder.Reset(w) + return z, nil +} + +func (z *writer) Close() error { + defer z.pool.Put(z) + return z.Encoder.Close() +} + +type reader struct { + *zstd.Decoder + pool *sync.Pool +} + +func (c *compressor) Decompress(r io.Reader) (io.Reader, error) { + z, inPool := c.poolDecompressor.Get().(*reader) + if !inPool { + newZ, err := zstd.NewReader(r) + if err != nil { + return nil, err + } + return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil + } + if err := z.Reset(r); err != nil { + c.poolDecompressor.Put(z) + return nil, err + } + return z, nil +} + +func (z *reader) Read(p []byte) (n int, err error) { + n, err = z.Decoder.Read(p) + if err == io.EOF { + z.pool.Put(z) + } + return n, err +} + +func (c *compressor) Name() string { + return Name +} + +type compressor struct { + poolCompressor sync.Pool + poolDecompressor sync.Pool +} diff --git a/engine/execution/rpc/engine.go b/engine/execution/rpc/engine.go index d32d093c72d..ece214fe0e4 100644 --- a/engine/execution/rpc/engine.go +++ b/engine/execution/rpc/engine.go @@ -17,11 +17,12 @@ import ( "github.com/rs/zerolog" "google.golang.org/grpc" "google.golang.org/grpc/codes" - _ "google.golang.org/grpc/encoding/gzip" // required for gRPC compression "google.golang.org/grpc/status" _ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" // required for gRPC compression + _ "github.com/onflow/flow-go/engine/common/grpc/compressor/gzip" // required for gRPC compression _ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" // required for gRPC compression + _ "github.com/onflow/flow-go/engine/common/grpc/compressor/zstd" // required for gRPC compression "github.com/onflow/flow-go/consensus/hotstuff" "github.com/onflow/flow-go/engine" diff --git a/go.mod b/go.mod index 55ea967de04..5eed077e2aa 100644 --- a/go.mod +++ b/go.mod @@ -78,7 +78,7 @@ require ( golang.org/x/crypto v0.41.0 golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 golang.org/x/sync v0.16.0 - golang.org/x/sys v0.35.0 + golang.org/x/sys v0.36.0 golang.org/x/text v0.28.0 golang.org/x/time v0.12.0 golang.org/x/tools v0.36.0 @@ -96,7 +96,6 @@ require ( github.com/docker/go-units v0.5.0 github.com/dustin/go-humanize v1.0.1 github.com/go-playground/validator/v10 v10.19.0 - github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e github.com/gorilla/websocket v1.5.3 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/holiman/uint256 v1.3.2 @@ -123,6 +122,7 @@ require ( github.com/emicklei/dot v1.6.2 // indirect github.com/ethereum/c-kzg-4844/v2 v2.1.0 // indirect github.com/ferranbt/fastssz v0.1.4 // indirect + github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e // indirect ) require ( @@ -231,8 +231,8 @@ require ( github.com/jbenet/goprocess v0.1.4 // indirect github.com/k0kubun/pp/v3 v3.5.0 // indirect github.com/kevinburke/go-bindata v3.24.0+incompatible // indirect - github.com/klauspost/compress v1.17.11 // indirect - github.com/klauspost/cpuid/v2 v2.2.9 // indirect + github.com/klauspost/compress v1.18.0 + github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/koron/go-ssdp v0.0.4 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/go.sum b/go.sum index cd1d866b81e..4ebc9b80992 100644 --- a/go.sum +++ b/go.sum @@ -744,10 +744,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs= github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= -github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= -github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= -github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY= -github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0= @@ -1673,8 +1673,8 @@ golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= -golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= diff --git a/module/grpcserver/server.go b/module/grpcserver/server.go index 4cd2ada4db9..fbf8d5d1224 100644 --- a/module/grpcserver/server.go +++ b/module/grpcserver/server.go @@ -9,10 +9,11 @@ import ( "github.com/rs/zerolog" "google.golang.org/grpc" - _ "google.golang.org/grpc/encoding/gzip" // required for gRPC compression _ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" // required for gRPC compression + _ "github.com/onflow/flow-go/engine/common/grpc/compressor/gzip" // required for gRPC compression _ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" // required for gRPC compression + _ "github.com/onflow/flow-go/engine/common/grpc/compressor/zstd" // required for gRPC compression "github.com/onflow/flow-go/module/component" "github.com/onflow/flow-go/module/irrecoverable" diff --git a/network/compressor/gzipCompressor.go b/network/compressor/gzipCompressor.go index dcafef92269..623daaf2696 100644 --- a/network/compressor/gzipCompressor.go +++ b/network/compressor/gzipCompressor.go @@ -1,9 +1,9 @@ package compressor import ( - "compress/gzip" "io" + "github.com/klauspost/compress/gzip" "github.com/onflow/flow-go/network" )