Skip to content

Code for review #3

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
124 changes: 59 additions & 65 deletions 11_lesson_live_coding_practise/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,94 +2,88 @@ package main

import (
"errors"
"fmt"
"sync"
"time"
)

type Batcher struct {
mutex sync.Mutex
messages []string
items []string
size int
timeout time.Duration
mutex sync.Mutex
cond *sync.Cond
counter int
}

size int
action func([]string)
messagesCh chan []string
func NewBatcher(size int, timeout time.Duration) (*Batcher, error) {
if size <= 0 {
return nil, errors.New("invalid argument")
}

closeCh chan struct{}
closeDoneCh chan struct{}
bt := &Batcher{
items: make([]string, 0, size),
size: size,
timeout: timeout,
}

bt.cond = sync.NewCond(&bt.mutex)
go bt.runBatcher()
return bt, nil
}

func NewBatcher(action func([]string), size int) (*Batcher, error) {
if action == nil || size <= 0 {
return nil, errors.New("invalid arguments")
}
func (b *Batcher) runBatcher() {
ticker := time.NewTicker(b.timeout)
defer ticker.Stop()

return &Batcher{
action: action,
size: size,
messagesCh: make(chan []string, 1),
closeCh: make(chan struct{}),
closeDoneCh: make(chan struct{}),
}, nil
for {
b.mutex.Lock()
for len(b.items) < b.size {
b.cond.Wait()
}

batch := b.items[:b.size]
b.items = b.items[b.size:]

<-ticker.C
b.counter++
fmt.Printf("Batch %d: %s\n", b.counter, batch)
b.mutex.Unlock()
}
}

// Append add message to batch
func (b *Batcher) Append(message string) {
func (b *Batcher) append(item string) {
b.mutex.Lock()
defer b.mutex.Unlock()

b.messages = append(b.messages, message)
if b.size == len(b.messages) {
b.messagesCh <- b.messages
b.messages = nil
b.items = append(b.items, item)
if len(b.items) >= b.size {
b.cond.Signal()
}
}

// Run start worker for periodic flushing
func (b *Batcher) Run(interval time.Duration) {
go func() {
ticker := time.NewTicker(interval)
defer func() {
ticker.Stop()
close(b.closeDoneCh)
}()

for {
select {
case <-b.closeCh:
b.flush()
default:
}

select {
case <-b.closeCh:
b.flush()
case messages := <-b.messagesCh:
b.flushMessages(messages)
ticker.Reset(interval)
case <-ticker.C:
b.flush()
}
}
}()
}

func (b *Batcher) flush() {
b.mutex.Lock()
messages := b.messages
b.messages = nil
b.mutex.Unlock()
defer b.mutex.Unlock()

b.flushMessages(messages)
if len(b.items) > 0 {
batch := b.items
b.items = nil
b.counter++
fmt.Printf("Batch %d: %s\n", b.counter, batch)
}
}

func (b *Batcher) flushMessages(messages []string) {
if len(messages) != 0 {
b.action(messages)
func main() {
batcher, err := NewBatcher(4, 2*time.Second)
if err != nil {
fmt.Printf("Error: %v", err)
}

for i := 1; i <= 10; i++ {
batcher.append(fmt.Sprintf("Item %d", i))
time.Sleep(100 * time.Millisecond)
}
}

// Shutdown wait worker and flush buffer before closing
func (b *Batcher) Shutdown() {
close(b.closeCh)
<-b.closeDoneCh
batcher.flush()
}
Loading