Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/access/node_builder/access_node_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,7 @@ func (builder *FlowAccessNodeBuilder) extraFlags() {
flags.StringVar(&builder.rpcConf.CompressorName,
"grpc-compressor",
defaultConfig.rpcConf.CompressorName,
"name of grpc compressor that will be used for requests to other nodes. One of (gzip, snappy, deflate)")
"name of grpc compressor that will be used for requests to other nodes. One of (gzip, snappy, deflate, zstd)")
flags.BoolVar(&builder.logTxTimeToFinalized, "log-tx-time-to-finalized", defaultConfig.logTxTimeToFinalized, "log transaction time to finalized")
flags.BoolVar(&builder.logTxTimeToExecuted, "log-tx-time-to-executed", defaultConfig.logTxTimeToExecuted, "log transaction time to executed")
flags.BoolVar(&builder.logTxTimeToFinalizedExecuted,
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/export-json-execution-state/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package exporter

import (
"bufio"
"compress/gzip"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"path/filepath"

"github.com/klauspost/compress/gzip"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
Expand Down
2 changes: 1 addition & 1 deletion cmd/util/cmd/generate-authorization-fixes/cmd.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package generate_authorization_fixes

import (
"compress/gzip"
"encoding/json"
"io"
"os"
"strings"
"sync"

"github.com/klauspost/compress/gzip"
"github.com/onflow/cadence/common"
"github.com/onflow/cadence/interpreter"
"github.com/onflow/cadence/sema"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ import (
"github.com/stretchr/testify/assert"
testifymock "github.com/stretchr/testify/mock"

"google.golang.org/grpc/encoding/gzip"

"github.com/onflow/flow-go/engine/common/grpc/compressor/deflate"
"github.com/onflow/flow-go/engine/common/grpc/compressor/gzip"
"github.com/onflow/flow-go/engine/common/grpc/compressor/snappy"
"github.com/onflow/flow-go/engine/common/grpc/compressor/zstd"
"github.com/onflow/flow-go/engine/common/rpc/convert"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module/metrics"
Expand All @@ -34,6 +34,11 @@ func BenchmarkWithDeflateCompression(b *testing.B) {
runBenchmark(b, deflate.Name)
}

// BenchmarkWithDeflateCompression benchmarks the gRPC request to execution nodes using deflate compressor.
func BenchmarkWithZSTDCompression(b *testing.B) {
runBenchmark(b, zstd.Name)
}

// runBenchmark is a helper function that performs the benchmarking for different compressors.
func runBenchmark(b *testing.B, compressorName string) {
// create an execution node
Expand Down
3 changes: 2 additions & 1 deletion engine/access/rpc/connection/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,13 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
_ "google.golang.org/grpc/encoding/gzip" //required for gRPC compression
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"

_ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" //required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/gzip" //required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" //required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/zstd" //required for gRPC compression
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/utils/grpcutils"
)
Expand Down
3 changes: 2 additions & 1 deletion engine/collection/rpc/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip" // required for gRPC compression
"google.golang.org/grpc/status"

_ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" // required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/gzip" // required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" // required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/zstd" // required for gRPC compression

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/common/rpc/convert"
Expand Down
2 changes: 1 addition & 1 deletion engine/common/grpc/compressor/deflate/deflate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
package deflate

import (
"compress/flate"
"io"
"sync"

"github.com/klauspost/compress/flate"
"google.golang.org/grpc/encoding"
)

Expand Down
107 changes: 107 additions & 0 deletions engine/common/grpc/compressor/gzip/gzip.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package gzip

import (
"encoding/binary"
"fmt"
"io"
"sync"

"github.com/klauspost/compress/gzip"
"google.golang.org/grpc/encoding"
)

// Name is the name registered for the gzip compressor.
const Name = "gzip"

func init() {
c := &compressor{}
c.poolCompressor.New = func() any {
return &writer{Writer: gzip.NewWriter(io.Discard), pool: &c.poolCompressor}
}
encoding.RegisterCompressor(c)
}

type writer struct {
*gzip.Writer
pool *sync.Pool
}

// SetLevel updates the registered gzip compressor to use the compression level specified (gzip.HuffmanOnly is not supported).
// NOTE: this function must only be called during initialization time (i.e. in an init() function),
// and is not thread-safe.
//
// The error returned will be nil if the specified level is valid.
func SetLevel(level int) error {
if level < gzip.DefaultCompression || level > gzip.BestCompression {
return fmt.Errorf("grpc: invalid gzip compression level: %d", level)
}
c := encoding.GetCompressor(Name).(*compressor)
c.poolCompressor.New = func() any {
w, err := gzip.NewWriterLevel(io.Discard, level)
if err != nil {
panic(err)
}
return &writer{Writer: w, pool: &c.poolCompressor}
}
return nil
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
z := c.poolCompressor.Get().(*writer)
z.Writer.Reset(w)
return z, nil
}

func (z *writer) Close() error {
defer z.pool.Put(z)
return z.Writer.Close()
}

type reader struct {
*gzip.Reader
pool *sync.Pool
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
z, inPool := c.poolDecompressor.Get().(*reader)
if !inPool {
newZ, err := gzip.NewReader(r)
if err != nil {
return nil, err
}
return &reader{Reader: newZ, pool: &c.poolDecompressor}, nil
}
if err := z.Reset(r); err != nil {
c.poolDecompressor.Put(z)
return nil, err
}
return z, nil
}

func (z *reader) Read(p []byte) (n int, err error) {
n, err = z.Reader.Read(p)
if err == io.EOF {
z.pool.Put(z)
}
return n, err
}

// RFC1952 specifies that the last four bytes "contains the size of
// the original (uncompressed) input data modulo 2^32."
// gRPC has a max message size of 2GB so we don't need to worry about wraparound.
func (c *compressor) DecompressedSize(buf []byte) int {
last := len(buf)
if last < 4 {
return -1
}
return int(binary.LittleEndian.Uint32(buf[last-4 : last]))
}

func (c *compressor) Name() string {
return Name
}

type compressor struct {
poolCompressor sync.Pool
poolDecompressor sync.Pool
}
2 changes: 1 addition & 1 deletion engine/common/grpc/compressor/snappy/snappy.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"io"
"sync"

"github.com/golang/snappy"
"github.com/klauspost/compress/snappy"
"google.golang.org/grpc/encoding"
)

Expand Down
89 changes: 89 additions & 0 deletions engine/common/grpc/compressor/zstd/zstd.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package zstd

import (
"io"
"sync"

"github.com/klauspost/compress/zstd"
"google.golang.org/grpc/encoding"
)

// Name is the name registered for the zstd compressor.
const Name = "zstd"

func init() {
c := &compressor{}
c.poolCompressor.New = func() any {
newWriter, _ := zstd.NewWriter(io.Discard)
return &writer{Encoder: newWriter, pool: &c.poolCompressor}
}
encoding.RegisterCompressor(c)
}

type writer struct {
*zstd.Encoder
pool *sync.Pool
}

// SetLevel updates the registered zstd compressor to use the compression level specified.
// NOTE: this function must only be called during initialization time (i.e. in an init() function),
// and is not thread-safe.
func SetLevel(level zstd.EncoderLevel) {
c := encoding.GetCompressor(Name).(*compressor)
c.poolCompressor.New = func() any {
w, err := zstd.NewWriter(io.Discard, zstd.WithEncoderLevel(level))
if err != nil {
panic(err)
}
return &writer{Encoder: w, pool: &c.poolCompressor}
}
}

func (c *compressor) Compress(w io.Writer) (io.WriteCloser, error) {
z := c.poolCompressor.Get().(*writer)
z.Encoder.Reset(w)
return z, nil
}

func (z *writer) Close() error {
defer z.pool.Put(z)
return z.Encoder.Close()
}

type reader struct {
*zstd.Decoder
pool *sync.Pool
}

func (c *compressor) Decompress(r io.Reader) (io.Reader, error) {
z, inPool := c.poolDecompressor.Get().(*reader)
if !inPool {
newZ, err := zstd.NewReader(r)
if err != nil {
return nil, err
}
return &reader{Decoder: newZ, pool: &c.poolDecompressor}, nil
}
if err := z.Reset(r); err != nil {
c.poolDecompressor.Put(z)
return nil, err
}
return z, nil
}

func (z *reader) Read(p []byte) (n int, err error) {
n, err = z.Decoder.Read(p)
if err == io.EOF {
z.pool.Put(z)
}
return n, err
}

func (c *compressor) Name() string {
return Name
}

type compressor struct {
poolCompressor sync.Pool
poolDecompressor sync.Pool
}
3 changes: 2 additions & 1 deletion engine/execution/rpc/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"github.com/rs/zerolog"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
_ "google.golang.org/grpc/encoding/gzip" // required for gRPC compression
"google.golang.org/grpc/status"

_ "github.com/onflow/flow-go/engine/common/grpc/compressor/deflate" // required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/gzip" // required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/snappy" // required for gRPC compression
_ "github.com/onflow/flow-go/engine/common/grpc/compressor/zstd" // required for gRPC compression

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/engine"
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ require (
golang.org/x/crypto v0.41.0
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
golang.org/x/sync v0.16.0
golang.org/x/sys v0.35.0
golang.org/x/sys v0.36.0
golang.org/x/text v0.28.0
golang.org/x/time v0.12.0
golang.org/x/tools v0.36.0
Expand All @@ -96,7 +96,6 @@ require (
github.com/docker/go-units v0.5.0
github.com/dustin/go-humanize v1.0.1
github.com/go-playground/validator/v10 v10.19.0
github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e
github.com/gorilla/websocket v1.5.3
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/holiman/uint256 v1.3.2
Expand All @@ -123,6 +122,7 @@ require (
github.com/emicklei/dot v1.6.2 // indirect
github.com/ethereum/c-kzg-4844/v2 v2.1.0 // indirect
github.com/ferranbt/fastssz v0.1.4 // indirect
github.com/golang/snappy v0.0.5-0.20231225225746-43d5d4cd4e0e // indirect
)

require (
Expand Down Expand Up @@ -231,8 +231,8 @@ require (
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/k0kubun/pp/v3 v3.5.0 // indirect
github.com/kevinburke/go-bindata v3.24.0+incompatible // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/klauspost/cpuid/v2 v2.2.9 // indirect
github.com/klauspost/compress v1.18.0
github.com/klauspost/cpuid/v2 v2.3.0 // indirect
github.com/koron/go-ssdp v0.0.4 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -744,10 +744,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/compress v1.10.3/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.11.7/go.mod h1:aoV0uJVorq1K+umq18yTdKaF57EivdYsUV+/s2qKfXs=
github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/cpuid/v2 v2.2.9 h1:66ze0taIn2H33fBvCkXuv9BmCwDfafmiIVpKV9kKGuY=
github.com/klauspost/cpuid/v2 v2.2.9/go.mod h1:rqkxqrZ1EhYM9G+hXH7YdowN5R5RGN6NK4QwQ3WMXF8=
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y=
github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/koron/go-ssdp v0.0.4 h1:1IDwrghSKYM7yLf7XCzbByg2sJ/JcNOZRXS2jczTwz0=
Expand Down Expand Up @@ -1673,8 +1673,8 @@ golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=
golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k=
golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
Expand Down
Loading