Skip to content

Commit b9def4c

Browse files
committed
Initial import
1 parent ebe54c6 commit b9def4c

17 files changed

+982
-0
lines changed

README.md

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
[![Build](https://github.com/elgopher/batch/actions/workflows/build.yml/badge.svg)](https://github.com/elgopher/batch/actions/workflows/build.yml)
2+
[![Go Reference](https://pkg.go.dev/badge/github.com/elgopher/batch.svg)](https://pkg.go.dev/github.com/elgopher/batch)
3+
[![Go Report Card](https://goreportcard.com/badge/github.com/elgopher/batch)](https://goreportcard.com/report/github.com/elgopher/batch)
4+
[![codecov](https://codecov.io/gh/elgopher/batch/branch/master/graph/badge.svg)](https://codecov.io/gh/elgopher/batch)
5+
[![Project Status: Active – The project has reached a stable, usable state and is being actively developed.](https://www.repostatus.org/badges/latest/active.svg)](https://www.repostatus.org/#active)
6+
7+
## What it can be used for?
8+
9+
To speed up application performance without sacrificing *data consistency* or *durability* and making source code or architecture complex.
10+
11+
The **batch** package simplifies writing Go applications that process incoming requests (HTTP, GRPC etc.) in a batch manner:
12+
instead of processing each request separately, group incoming requests to a batch and run whole group at once.
13+
This method of processing can significantly speed up the application and reduce the consumption of disk, network or CPU.
14+
15+
The **batch** package can be used to write any type of *servers* that handle thousands of requests per second.
16+
Thanks to this small library, you can create relatively simple code without the need to use low-level data structures.
17+
18+
## Why batch processing improves performance?
19+
20+
Normally a web application is using following pattern to modify data in the database:
21+
22+
1. **Load resource** from database. Resource is some portion of data
23+
such as record, document etc. Lock the entire resource pessimistically
24+
or optimistically (by reading version number).
25+
2. **Apply change** to data
26+
3. **Save resource** to database. Release the pessimistic lock. Or run
27+
atomic update with version check (optimistic lock).
28+
29+
But such architecture does not scale well if number of requests
30+
for a single resource is very high
31+
(meaning hundreds or thousands of requests per second).
32+
The lock contention in such case is very high and database is significantly
33+
overloaded. Practically, the number of concurrent requests is limited.
34+
35+
One solution to this problem is to reduce the number of costly operations.
36+
Because a single resource is loaded and saved thousands of times per second
37+
we can instead:
38+
39+
1. Load the resource **once** (let's say once per second)
40+
2. Execute all the requests from this period of time on an already loaded resource. Run them all sequentially.
41+
3. Save the resource and send responses to all clients if data was stored successfully.
42+
43+
Such solution could improve the performance by a factor of 1000. And resource is still stored in a consistent state.
44+
45+
The **batch** package does exactly that. You configure the duration of window, provide functions
46+
to load and save resource and once the request comes in - you run a function:
47+
48+
```go
49+
// set up the batch processor:
50+
processor := batch.StartProcessor(
51+
batch.Options[*YourResource]{ // YourResource is your Go struct
52+
MinDuration: 100 * time.Millisecond,
53+
LoadResource: ...,
54+
SaveResource: ...,
55+
},
56+
)
57+
58+
// following code is run from http/grpc handler
59+
// resourceKey uniquely identifies the resource
60+
err := s.BatchProcessor.Run(resourceKey, func(r *YourResource) {
61+
// here is the code which is executed inside batch
62+
})
63+
```
64+
65+
For real-life example see [example web application](_example).
66+
67+
## Installation
68+
69+
```sh
70+
# Add batch to your Go module:
71+
go get github.com/elgopher/batch
72+
```
73+
Please note that at least **Go 1.18** is required.
74+
75+
## Scaling out
76+
77+
Single Go http server is able to handle up to 10-50k of requests per second on a commodity hardware. This is a lot, but very often you also need:
78+
79+
* high availability (if one server goes down you want other to handle the traffic)
80+
* you want to handle hundred thousands or millions of requests per second
81+
82+
For both cases you need to deploy **multiple servers** and put a **load balancer** in front of them.
83+
Please note though, that you have to carefully configure the load balancing algorithm.
84+
Round-robin is not an option here, because sooner or later you will have problems with locking
85+
(multiple server instances will run batches on the same resource).
86+
Ideal solution is to route requests based on parameters or URL.
87+
For example some http parameter could be a resource key. You can instruct load balancer
88+
to calculate hash on this parameter and always route requests with this param value
89+
to the same backend (of course if all backends are still available).

_example/http/http.go

+60
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
// (c) 2022 Jacek Olszak
2+
// This code is licensed under MIT license (see LICENSE for details)
3+
4+
package http
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"net/http"
10+
"strconv"
11+
12+
"github.com/elgopher/batch/_example/train"
13+
)
14+
15+
type TrainService interface {
16+
Book(train string, seatNumber int, person string) error
17+
}
18+
19+
func ListenAndServe(trainService TrainService) error {
20+
mux := http.NewServeMux()
21+
mux.HandleFunc("/book", bookHandler(trainService))
22+
23+
server := &http.Server{Addr: ":8080", Handler: mux}
24+
return server.ListenAndServe()
25+
}
26+
27+
// example request: /book?train=batchy&person=Jacek&seat=3
28+
func bookHandler(trainService TrainService) func(http.ResponseWriter, *http.Request) {
29+
return func(writer http.ResponseWriter, request *http.Request) {
30+
if err := request.ParseForm(); err != nil {
31+
writer.WriteHeader(http.StatusBadRequest)
32+
return
33+
}
34+
35+
trainKey := request.Form.Get("train")
36+
person := request.Form.Get("person")
37+
seat, err := strconv.Atoi(request.Form.Get("seat"))
38+
if err != nil {
39+
writer.WriteHeader(http.StatusBadRequest)
40+
_, _ = writer.Write([]byte("invalid seat number"))
41+
return
42+
}
43+
44+
err = trainService.Book(trainKey, seat, person)
45+
46+
if errors.Is(err, train.ErrValidation("")) {
47+
writer.WriteHeader(http.StatusBadRequest)
48+
_, _ = writer.Write([]byte(err.Error()))
49+
return
50+
}
51+
52+
if err != nil {
53+
writer.WriteHeader(http.StatusInternalServerError)
54+
fmt.Println("internal server error", err)
55+
return
56+
}
57+
58+
writer.WriteHeader(http.StatusOK)
59+
}
60+
}

_example/main.go

+34
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
// (c) 2022 Jacek Olszak
2+
// This code is licensed under MIT license (see LICENSE for details)
3+
4+
package main
5+
6+
import (
7+
"time"
8+
9+
"github.com/elgopher/batch"
10+
"github.com/elgopher/batch/_example/http"
11+
"github.com/elgopher/batch/_example/store"
12+
"github.com/elgopher/batch/_example/train"
13+
)
14+
15+
func main() {
16+
db := store.File{Dir: "/tmp/"}
17+
18+
processor := batch.StartProcessor(
19+
batch.Options[*train.Train]{
20+
MinDuration: 100 * time.Millisecond,
21+
MaxDuration: 3 * time.Second,
22+
LoadResource: db.LoadTrain,
23+
SaveResource: db.SaveTrain,
24+
},
25+
)
26+
27+
trainService := train.Service{
28+
BatchProcessor: processor,
29+
}
30+
31+
if err := http.ListenAndServe(trainService); err != nil {
32+
panic(err)
33+
}
34+
}

_example/store/file.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// (c) 2022 Jacek Olszak
2+
// This code is licensed under MIT license (see LICENSE for details)
3+
4+
package store
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"io/ioutil"
10+
"os"
11+
"path"
12+
13+
"github.com/elgopher/batch/_example/train"
14+
)
15+
16+
// File is database implementation which stores data to a file. Real world app would use real database instead, which supports
17+
// atomic write with some kind of optimistic locking (version-check, compare and swap etc.) to always have data in consistent state.
18+
type File struct {
19+
Dir string
20+
}
21+
22+
func (d File) LoadTrain(_ context.Context, key string) (*train.Train, error) {
23+
jason, err := ioutil.ReadFile(d.filename(key))
24+
if os.IsNotExist(err) {
25+
return train.New(30), nil
26+
}
27+
28+
t := &train.Train{}
29+
err = json.Unmarshal(jason, t)
30+
if err != nil {
31+
return nil, err
32+
}
33+
34+
return t, nil
35+
}
36+
37+
func (d File) filename(key string) string {
38+
return path.Join(d.Dir, key+".json")
39+
}
40+
41+
func (d File) SaveTrain(_ context.Context, key string, t *train.Train) error {
42+
jason, err := json.Marshal(t)
43+
if err != nil {
44+
return err
45+
}
46+
47+
return ioutil.WriteFile(d.filename(key), jason, 0644)
48+
}

_example/train/error.go

+15
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
// (c) 2022 Jacek Olszak
2+
// This code is licensed under MIT license (see LICENSE for details)
3+
4+
package train
5+
6+
type ErrValidation string
7+
8+
func (e ErrValidation) Error() string {
9+
return string(e)
10+
}
11+
12+
func (e ErrValidation) Is(err error) bool {
13+
_, ok := err.(ErrValidation)
14+
return ok
15+
}

_example/train/service.go

+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// (c) 2022 Jacek Olszak
2+
// This code is licensed under MIT license (see LICENSE for details)
3+
4+
package train
5+
6+
// BatchProcessor is an optional interface to decouple your code from `batch` package.
7+
type BatchProcessor interface {
8+
Run(key string, operation func(*Train)) error
9+
}
10+
11+
type Service struct {
12+
BatchProcessor BatchProcessor
13+
}
14+
15+
func (s Service) Book(trainName string, seatNumber int, person string) error {
16+
var operationError error
17+
18+
batchError := s.BatchProcessor.Run(trainName, func(train *Train) {
19+
operationError = train.Book(seatNumber, person)
20+
})
21+
22+
if operationError != nil {
23+
return operationError
24+
}
25+
26+
return batchError
27+
}

_example/train/train.go

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
// (c) 2022 Jacek Olszak
2+
// This code is licensed under MIT license (see LICENSE for details)
3+
4+
package train
5+
6+
import (
7+
"fmt"
8+
)
9+
10+
type Train struct {
11+
Seats []string
12+
}
13+
14+
func New(maxSeats int) *Train {
15+
return &Train{
16+
Seats: make([]string, maxSeats),
17+
}
18+
}
19+
20+
func (t *Train) Book(seatNumber int, person string) error {
21+
// first validate if action is possible (`try phase` of a method)
22+
// such validation is needed to always keep Train in a consistent state
23+
// (data consistency is required to properly handle next request in a batch)
24+
if err := t.validateBooking(seatNumber, person); err != nil {
25+
return err
26+
}
27+
28+
// then mutate state (`do phase` of method)
29+
t.Seats[seatNumber] = person
30+
31+
return nil
32+
}
33+
34+
func (t *Train) validateBooking(seatNumber int, person string) error {
35+
if seatNumber < 0 || seatNumber >= len(t.Seats) {
36+
return ErrValidation(fmt.Sprintf("train does not have seat number %d", seatNumber))
37+
}
38+
39+
if person == "" {
40+
return ErrValidation("empty person name")
41+
}
42+
43+
if t.Seats[seatNumber] != "" && t.Seats[seatNumber] != person {
44+
return ErrValidation("seat number is already booked by another person")
45+
}
46+
47+
return nil
48+
}

async_test.go

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
// (c) 2022 Jacek Olszak
2+
// This code is licensed under MIT license (see LICENSE for details)
3+
4+
package batch_test
5+
6+
import (
7+
"testing"
8+
"time"
9+
10+
"github.com/stretchr/testify/assert"
11+
)
12+
13+
func FutureValue[V any]() Value[V] {
14+
return Value[V]{
15+
done: make(chan V, 1),
16+
}
17+
}
18+
19+
type Value[V any] struct {
20+
done chan V
21+
}
22+
23+
func (d Value[V]) Set(result V) {
24+
d.done <- result
25+
}
26+
27+
func (d Value[V]) Get(t *testing.T) V {
28+
select {
29+
case r, _ := <-d.done:
30+
return r
31+
case <-time.After(time.Second):
32+
assert.FailNow(t, "timeout waiting for value")
33+
var r V
34+
return r
35+
}
36+
}

0 commit comments

Comments
 (0)