feat(lrdd): Add a lrdd.Row slice pool
- lrmr pools the lrdd.RawRow slice, and lrmr users will pool lrdd.Row.Value.
hueypark committed Dec 12, 2022
1 parent 9e2d4c9 commit 58c37db
Showing 41 changed files with 154 additions and 125 deletions.
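The commit splits pooling duties: lrmr itself pools the lrdd.RawRow objects and the []lrdd.Row slices it hands out, while reuse of each Row.Value stays with the caller. A minimal sketch of the caller-side contract, using only the GetRows/PutRows API added in this commit (the row contents here are illustrative):

```go
package main

import (
	"fmt"

	"github.com/ab180/lrmr/lrdd"
)

func main() {
	// Borrow a pooled slice sized for two rows.
	rows := lrdd.GetRows(2)
	(*rows)[0] = lrdd.Row{Key: "a", Value: lrdd.NewBytes("1")}
	(*rows)[1] = lrdd.Row{Key: "b", Value: lrdd.NewBytes("2")}

	for _, row := range *rows {
		fmt.Println(row.Key, row.Value)
	}

	// Hand the slice back once its rows are consumed; pooling of each
	// Row.Value is left to lrmr users, per the commit message.
	lrdd.PutRows(rows)
}
```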
4 changes: 2 additions & 2 deletions driver/driver.go
@@ -94,7 +94,7 @@ func (m *Remote) RunAttached(ctx context.Context) (Result, error) {
asyncCtx, asyncCtxCancel := context.WithCancel(ctx)

res := &result{
rowChan: make(chan *lrdd.Row, m.rowChanLen),
rowChan: make(chan lrdd.Row, m.rowChanLen),
jobManager: job.NewLocalManager(m.Job),
cancel: asyncCtxCancel,
}
@@ -157,7 +157,7 @@ func (m *Remote) RunAttached(ctx context.Context) (Result, error) {
res.addErr(err)
break pushLoop
}
marshalUnmarshalerRow := &lrdd.Row{
marshalUnmarshalerRow := lrdd.Row{
Key: row.Key,
Value: value,
}
6 changes: 3 additions & 3 deletions driver/result.go
@@ -17,22 +17,22 @@ import (
// Err returns the error that occurred during the run.
// Cancel cancels the run.
type Result interface {
Outputs() <-chan *lrdd.Row
Outputs() <-chan lrdd.Row
Metrics() (lrmrmetric.Metrics, error)
Err() error
Cancel()
}

type result struct {
rowChan chan *lrdd.Row
rowChan chan lrdd.Row
err *multierror.Error
jobManager job.Manager
mux sync.Mutex
cancel context.CancelFunc
}

// Outputs returns the output row channel of the job.
func (r *result) Outputs() <-chan *lrdd.Row {
func (r *result) Outputs() <-chan lrdd.Row {
return r.rowChan
}

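Since Outputs() now yields lrdd.Row values rather than pointers, consumers drain the channel by value. A hedged sketch (drainOutputs is a hypothetical helper, not part of this commit):

```go
package example

import (
	"fmt"

	"github.com/ab180/lrmr/driver"
)

// drainOutputs consumes a driver.Result whose Outputs() channel now carries
// lrdd.Row values instead of *lrdd.Row pointers.
func drainOutputs(res driver.Result) error {
	for row := range res.Outputs() {
		fmt.Println(row.Key, row.Value)
	}
	return res.Err()
}
```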
2 changes: 1 addition & 1 deletion emit.go
@@ -2,4 +2,4 @@ package lrmr

import "github.com/ab180/lrmr/lrdd"

type EmitFunc func(rows []*lrdd.Row)
type EmitFunc func(rows []lrdd.Row)
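Under the new signature an EmitFunc receives each batch as a value slice; a small hypothetical example:

```go
package example

import (
	"fmt"

	"github.com/ab180/lrmr"
	"github.com/ab180/lrmr/lrdd"
)

// countEmits is a hypothetical EmitFunc: it now takes []lrdd.Row rather than
// []*lrdd.Row, so no per-row pointer indirection is involved.
var countEmits lrmr.EmitFunc = func(rows []lrdd.Row) {
	fmt.Printf("emitted %d rows\n", len(rows))
}
```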
4 changes: 2 additions & 2 deletions executor/collector.go
@@ -21,7 +21,7 @@ func NewCollector(reporter StatusReporter) *Collector {
}
}

func (c *Collector) Write(rows []*lrdd.Row) error {
func (c *Collector) Write(rows []lrdd.Row) error {
return c.reporter.Collect(rows)
}

@@ -35,7 +35,7 @@ func (collectPartitioner) PlanNext(int) []partitions.Partition {
return partitions.PlanForNumberOf(1)
}

func (collectPartitioner) DeterminePartition(partitions.Context, *lrdd.Row, int) (id string, err error) {
func (collectPartitioner) DeterminePartition(partitions.Context, lrdd.Row, int) (id string, err error) {
return collectPartitionID, nil
}

2 changes: 1 addition & 1 deletion executor/local_pipe.go
@@ -19,7 +19,7 @@ func (l *LocalPipe) CloseWithStatus(s job.Status) error {
return nil
}

func (l *LocalPipe) Write(rows []*lrdd.Row) error {
func (l *LocalPipe) Write(rows []lrdd.Row) error {
l.nextStageReader.Write(rows)
return nil
}
6 changes: 3 additions & 3 deletions executor/status_reporter.go
@@ -13,7 +13,7 @@ import (

type StatusReporter interface {
JobContext() context.Context
Collect([]*lrdd.Row) error
Collect([]lrdd.Row) error
ReportTaskSuccess(context.Context, job.TaskID, lrmrmetric.Metrics) error
ReportTaskFailure(context.Context, job.TaskID, error, lrmrmetric.Metrics) error
}
@@ -61,7 +61,7 @@ func (f *attachedStatusReporter) JobContext() context.Context {
return f.stream.Context()
}

func (f *attachedStatusReporter) Collect(marshalUnmarshalerRows []*lrdd.Row) error {
func (f *attachedStatusReporter) Collect(marshalUnmarshalerRows []lrdd.Row) error {
if len(marshalUnmarshalerRows) == 0 {
return nil
}
@@ -131,7 +131,7 @@ func (b *detachedStatusReporter) JobContext() context.Context {
return b.jobContext
}

func (b *detachedStatusReporter) Collect(rows []*lrdd.Row) error {
func (b *detachedStatusReporter) Collect(rows []lrdd.Row) error {
panic("collect not supported on detachedStatusReporter")
}

4 changes: 2 additions & 2 deletions executor/task_executor.go
@@ -64,7 +64,7 @@ func (e *TaskExecutor) Run() {
defer e.reportStatus(ctx)

// pipe input.Reader.C to function input channel
funcInputChan := make(chan []*lrdd.Row, e.Output.NumOutputs())
funcInputChan := make(chan []lrdd.Row, e.Output.NumOutputs())
go pipeAndFlattenInputs(ctx, e.Input.C, funcInputChan)

// hard copy. TODO: a better way to do it!
@@ -109,7 +109,7 @@ func (e *TaskExecutor) reportStatus(ctx context.Context) {
e.Input = nil
}

func pipeAndFlattenInputs(ctx context.Context, in chan []*lrdd.Row, out chan []*lrdd.Row) {
func pipeAndFlattenInputs(ctx context.Context, in chan []lrdd.Row, out chan []lrdd.Row) {
defer close(out)

for rows := range in {
4 changes: 2 additions & 2 deletions input.go
@@ -17,12 +17,12 @@ func (l localInput) FeedInput(out output.Output) error {
if info.IsDir() {
return nil
}
return out.Write([]*lrdd.Row{{Value: lrdd.NewBytes(path)}})
return out.Write([]lrdd.Row{{Value: lrdd.NewBytes(path)}})
})
}

type parallelizedInput struct {
data []*lrdd.Row
data []lrdd.Row
}

func (p parallelizedInput) FeedInput(out output.Output) error {
12 changes: 6 additions & 6 deletions input/push_stream.go
@@ -37,21 +37,21 @@ func (p *PushStream) Dispatch() error {
return errors.Wrap(err, "stream dispatch")
}

rows := make([]*lrdd.Row, len(req.Data))
// The user should manually return the []lrdd.Row to the pool.
rows := lrdd.GetRows(len(req.Data))
for i, row := range req.Data {
value := lrdd.GetValue(p.reader.RowType())
_, err := value.UnmarshalMsg(row.Value)
if err != nil {
return err
}

rows[i] = &lrdd.Row{
Key: row.Key,
Value: value,
}
(*rows)[i].Key = row.Key
(*rows)[i].Value = value
}

p.reader.Write(rows)
p.reader.Write(*rows)

// lrdd.RawRow will be used in PushDataRequest.UnmarshalVT later
for _, row := range req.Data {
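Because Dispatch now borrows its row slice from the pool before writing it to the reader, whoever drains Reader.C owns returning it, per the comment above. A sketch of that consumer side (the Reader type is shown in input/reader.go below; handle is a hypothetical per-row callback):

```go
package example

import (
	"github.com/ab180/lrmr/input"
	"github.com/ab180/lrmr/lrdd"
)

// drain consumes chunks from an input.Reader. Each chunk's backing array came
// from lrdd.GetRows in PushStream.Dispatch, so it goes back via PutRows once
// every row has been processed.
func drain(reader *input.Reader, handle func(lrdd.Row)) {
	for chunk := range reader.C {
		for _, row := range chunk {
			handle(row)
		}
		// Copy the slice header into its own variable so the pointer handed
		// to the pool is not rewritten by the next loop iteration.
		c := chunk
		lrdd.PutRows(&c)
	}
}
```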
6 changes: 3 additions & 3 deletions input/reader.go
@@ -6,7 +6,7 @@ import (
)

type Reader struct {
C chan []*lrdd.Row
C chan []lrdd.Row
rowType lrdd.RowType

activeCnt atomic.Int64
@@ -15,7 +15,7 @@ type Reader struct {

func NewReader(queueLen int, rowType lrdd.RowType) *Reader {
return &Reader{
C: make(chan []*lrdd.Row, queueLen),
C: make(chan []lrdd.Row, queueLen),
rowType: rowType,
}
}
@@ -24,7 +24,7 @@ func (p *Reader) Add() {
p.activeCnt.Inc()
}

func (p *Reader) Write(chunk []*lrdd.Row) {
func (p *Reader) Write(chunk []lrdd.Row) {
if p.closed.Load() {
return
}
38 changes: 19 additions & 19 deletions lrdd/from.go
@@ -10,77 +10,77 @@ import (
const durationSize = unsafe.Sizeof(time.Duration(0)) // #nosec G103

// FromStrings converts string values to a Row slice.
func FromStrings(vals ...string) []*Row {
rows := make([]*Row, len(vals))
func FromStrings(vals ...string) []Row {
rows := make([]Row, len(vals))
for i, val := range vals {
rows[i] = &Row{Value: NewBytes(val)}
rows[i] = Row{Value: NewBytes(val)}
}

return rows
}

// FromStringMap converts a map of string values to a Row slice.
func FromStringMap(vals map[string]string) []*Row {
var rows []*Row
func FromStringMap(vals map[string]string) []Row {
var rows []Row
for key, val := range vals {
rows = append(rows, &Row{Key: key, Value: NewBytes(val)})
rows = append(rows, Row{Key: key, Value: NewBytes(val)})
}

return rows
}

// FromStringSliceMap converts a map of string slices to a Row slice.
func FromStringSliceMap(vals map[string][]string) []*Row {
var rows []*Row
func FromStringSliceMap(vals map[string][]string) []Row {
var rows []Row
for key, slice := range vals {
for _, val := range slice {
rows = append(rows, &Row{Key: key, Value: NewBytes(val)})
rows = append(rows, Row{Key: key, Value: NewBytes(val)})
}
}

return rows
}

// FromInts converts int values to a Row slice.
func FromInts(vals ...int) []*Row {
rows := make([]*Row, len(vals))
func FromInts(vals ...int) []Row {
rows := make([]Row, len(vals))
for i, val := range vals {
intVal := Uint64(val)
rows[i] = &Row{Value: &intVal}
rows[i] = Row{Value: &intVal}
}

return rows
}

// FromIntSliceMap converts a map of int slices to a Row slice.
func FromIntSliceMap(vals map[string][]int) []*Row {
var rows []*Row
func FromIntSliceMap(vals map[string][]int) []Row {
var rows []Row
for key, slice := range vals {
for _, val := range slice {
rows = append(rows, &Row{Key: key, Value: NewBytes(fmt.Sprintf("%d", val))})
rows = append(rows, Row{Key: key, Value: NewBytes(fmt.Sprintf("%d", val))})
}
}

return rows
}

// FromDurations converts duration values to a Row slice.
func FromDurations(vals ...time.Duration) []*Row {
rows := make([]*Row, len(vals))
func FromDurations(vals ...time.Duration) []Row {
rows := make([]Row, len(vals))
for i, val := range vals {
bs := make([]byte, durationSize)
binary.LittleEndian.PutUint64(bs, uint64(val))
lrddBs := Bytes(bs)

rows[i] = &Row{
rows[i] = Row{
Value: &lrddBs,
}
}
return rows
}

// ToDuration converts a Row to a duration.
func ToDuration(row *Row) time.Duration {
func ToDuration(row Row) time.Duration {
bs := row.Value.(*Bytes)

return time.Duration(binary.LittleEndian.Uint64([]byte(*bs)))
41 changes: 35 additions & 6 deletions lrdd/pool.go
@@ -3,18 +3,47 @@ package lrdd
import sync "sync"

func GetRawRow() *RawRow {
return pool.Get().(*RawRow)
return rawRowPool.Get().(*RawRow)
}

func PutRawRow(row *RawRow) {
value := row.Value[:0]
row.Reset()
row.Value = value
pool.Put(row)

rawRowPool.Put(row)
}

func GetRows(size int) *[]Row {
rows := rowsPool.Get().(*[]Row)
if size <= cap(*rows) {
*rows = (*rows)[:size]
} else {
*rows = make([]Row, size)
}

return rows
}

var pool = sync.Pool{
New: func() any {
return &RawRow{}
},
func PutRows(rows *[]Row) {
	for i := range *rows {
		// Clear each element in place; ranging by value would only reset a
		// copy and leave stale Value references in the pooled backing array.
		(*rows)[i].Key = (*rows)[i].Key[:0]
		(*rows)[i].Value = nil
	}
*rows = (*rows)[:0]

rowsPool.Put(rows)
}

var (
rawRowPool = sync.Pool{
New: func() any {
return &RawRow{}
},
}
rowsPool = sync.Pool{
New: func() any {
return &[]Row{}
},
}
)
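The pools hold pointers (*RawRow, *[]Row) rather than values: storing a bare slice header in a sync.Pool boxes it into an interface and allocates on every Put (the pattern staticcheck flags as SA6002), whereas a pointer round-trips allocation-free. A hypothetical test sketching the GetRows/PutRows contract:

```go
package lrdd

import "testing"

// TestRowsPoolRoundTrip is a hypothetical test: a slice obtained after a
// round trip has the requested length and carries no stale values.
func TestRowsPoolRoundTrip(t *testing.T) {
	rows := GetRows(4)
	if len(*rows) != 4 {
		t.Fatalf("want len 4, got %d", len(*rows))
	}
	(*rows)[0] = Row{Key: "k", Value: NewBytes("v")}
	PutRows(rows)

	again := GetRows(2)
	for _, row := range *again {
		if row.Value != nil {
			t.Fatal("pooled row still holds a stale value")
		}
	}
	PutRows(again)
}
```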
2 changes: 1 addition & 1 deletion lrmr.go
@@ -14,7 +14,7 @@ var (
)

// Parallelize creates new Pipeline with given value as an input.
func Parallelize(data []*lrdd.Row, options ...PipelineOption) *Pipeline {
func Parallelize(data []lrdd.Row, options ...PipelineOption) *Pipeline {
return NewPipeline(&parallelizedInput{data: data}, options...)
}

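With Parallelize taking []lrdd.Row directly, the lrdd.From* helpers shown earlier feed it without conversion; a sketch (cluster setup and execution elided):

```go
package example

import (
	"github.com/ab180/lrmr"
	"github.com/ab180/lrmr/lrdd"
)

// newPipeline is a hypothetical helper: lrdd.FromStrings returns []lrdd.Row,
// which now matches Parallelize's parameter type with no conversion.
func newPipeline() *lrmr.Pipeline {
	return lrmr.Parallelize(lrdd.FromStrings("foo", "bar", "baz"))
}
```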
6 changes: 3 additions & 3 deletions output/buffered_output.go
@@ -7,7 +7,7 @@ import (

// BufferedOutput wraps Output with buffering.
type BufferedOutput struct {
buf []*lrdd.Row
buf []lrdd.Row
offset int
output Output
}
@@ -19,11 +19,11 @@ func NewBufferedOutput(output Output, size int) *BufferedOutput {

return &BufferedOutput{
output: output,
buf: make([]*lrdd.Row, size),
buf: make([]lrdd.Row, size),
}
}

func (b *BufferedOutput) Write(d []*lrdd.Row) error {
func (b *BufferedOutput) Write(d []lrdd.Row) error {
for len(d) > 0 {
writeLen := min(len(d), len(b.buf)-b.offset)
b.offset += copy((b.buf)[b.offset:], d[:writeLen])
6 changes: 3 additions & 3 deletions output/buffered_output_test.go
@@ -54,7 +54,7 @@ func TestBufferedOutput_Write(t *testing.T) {
})

Convey("When writing items wrapped to the buffer size", func() {
var it []*lrdd.Row
var it []lrdd.Row

So(o.Write(items(bufSize/2)), ShouldBeNil)
it = append(it, items(bufSize/2)...)
@@ -108,9 +108,9 @@ func TestBufferedOutput_Flush(t *testing.T) {
})
}

func items(length int) (rr []*lrdd.Row) {
func items(length int) (rr []lrdd.Row) {
for i := 0; i < length; i++ {
rr = append(rr, &lrdd.Row{Value: lrdd.NewBytes(strconv.Itoa(i))})
rr = append(rr, lrdd.Row{Value: lrdd.NewBytes(strconv.Itoa(i))})
}
return
}