Skip to content

Commit

Permalink
Use pure ZStd decompression is CGo is disabled (#3)
Browse files Browse the repository at this point in the history
  • Loading branch information
merlimat authored May 15, 2019
1 parent d6a6187 commit f562db9
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 5 deletions.
2 changes: 2 additions & 0 deletions pkg/compression/compression.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package compression

type Provider interface {
CanCompress() bool

Compress(data []byte) []byte

Decompress(compressedData []byte, originalSize int) ([]byte, error)
Expand Down
6 changes: 5 additions & 1 deletion pkg/compression/compression_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ var providers = []testProvider{
func TestCompression(t *testing.T) {
for _, p := range providers {
t.Run(p.name, func(t *testing.T) {
if !p.provider.CanCompress() {
return
}

hello := []byte("test compression data")
compressed := p.provider.Compress(hello)
uncompressed, err := p.provider.Decompress(compressed, len(hello))
Expand All @@ -64,7 +68,7 @@ func TestJavaCompatibility(t *testing.T) {
func TestDecompressionError(t *testing.T) {
for _, p := range providers {
t.Run(p.name, func(t *testing.T) {
_, err := p.provider.Decompress([]byte{0x05}, 0)
_, err := p.provider.Decompress([]byte{0x05}, 10)
assert.NotNil(t, err)
})
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/compression/lz4.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func NewLz4Provider() Provider {
return &lz4Provider{}
}

func (lz4Provider) CanCompress() bool {
return true
}

func (lz4Provider) Compress(data []byte) []byte {
const tableSize = 1 << 16
hashTable := make([]int, tableSize)
Expand Down
4 changes: 4 additions & 0 deletions pkg/compression/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ func NewNoopProvider() Provider {
return &noopProvider{}
}

func (noopProvider) CanCompress() bool {
return true
}

func (noopProvider) Compress(data []byte) []byte {
return data
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/compression/zlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func NewZLibProvider() Provider {
return &zlibProvider{}
}

func (zlibProvider) CanCompress() bool {
return true
}

func (zlibProvider) Compress(data []byte) []byte {
var b bytes.Buffer
w := zlib.NewWriter(&b)
Expand Down
15 changes: 14 additions & 1 deletion pkg/compression/zstd.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,31 @@
// under the License.
//

// +build cgo

// If CGO is enabled, use ZSTD library that links with official
// C based zstd. This can perform both compression and decompression.

package compression

import (
zstd "github.com/valyala/gozstd"
)

func NewZStdProvider() Provider {
return newCGoZStdProvider()
}

type zstdProvider struct {}

func NewZStdProvider() Provider {
func newCGoZStdProvider() Provider {
return &zstdProvider{}
}

func (zstdProvider) CanCompress() bool {
return true
}

func (zstdProvider) Compress(data []byte) []byte {
return zstd.Compress(nil, data)
}
Expand Down
66 changes: 66 additions & 0 deletions pkg/compression/zstd_native.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

// +build !cgo

// Pure GO ZStd library only supports decompression

package compression

import (
"bytes"
"github.com/klauspost/compress/zstd"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

// Pure GO ZStd library only supports decompression

type zstdProvider struct {
}

func NewZStdProvider() Provider {
return &zstdProvider{}
}

func (zstdProvider) CanCompress() bool {
return false
}

func (zstdProvider) Compress(data []byte) []byte {
log.Panic("ZSTD compression is not supported if CGo is disabled")
return nil
}

func (zstdProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
d, err := zstd.NewReader(bytes.NewReader(compressedData))
if err != nil {
return nil, err
}

uncompressed := make([]byte, originalSize)
size, err := d.Read(uncompressed)
if err != nil {
return nil, err
} else if size != originalSize {
return nil, errors.New("Invalid uncompressed size")
} else {
return uncompressed, nil
}
}
5 changes: 4 additions & 1 deletion pulsar/impl_partition_producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,11 @@ func (p *partitionProducer) grabCnx() error {

p.producerName = res.Response.ProducerSuccess.ProducerName
if p.batchBuilder == nil {
p.batchBuilder = internal.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
p.producerId, pb.CompressionType(p.options.CompressionType))
if err != nil {
return err
}
}
if p.sequenceIdGenerator == nil {
nextSequenceId := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
Expand Down
10 changes: 8 additions & 2 deletions pulsar/internal/batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package internal

import (
"errors"
"fmt"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pkg/compression"
Expand Down Expand Up @@ -53,7 +55,7 @@ type BatchBuilder struct {
}

func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64,
compressionType pb.CompressionType) *BatchBuilder {
compressionType pb.CompressionType) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
Expand All @@ -78,7 +80,11 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64,
bb.msgMetadata.Compression = &compressionType
}

return bb
if !bb.compressionProvider.CanCompress() {
return nil, errors.New(fmt.Sprintf("Compression provider %v can only decompress data", compressionType))
}

return bb, nil
}

func (bb *BatchBuilder) IsFull() bool {
Expand Down

0 comments on commit f562db9

Please sign in to comment.