Skip to content

Commit

Permalink
all: move trade collector to pkg/core
Browse files Browse the repository at this point in the history
  • Loading branch information
c9s committed Jul 5, 2023
1 parent ff727ae commit 1ad10a9
Show file tree
Hide file tree
Showing 14 changed files with 328 additions and 325 deletions.
15 changes: 8 additions & 7 deletions pkg/bbgo/mocks/mock_order_executor_extended.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pkg/bbgo/order_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
log "github.com/sirupsen/logrus"
"go.uber.org/multierr"

"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
"github.com/c9s/bbgo/pkg/util"
Expand All @@ -32,7 +33,7 @@ type OrderExecutor interface {
type OrderExecutorExtended interface {
SubmitOrders(ctx context.Context, orders ...types.SubmitOrder) (createdOrders types.OrderSlice, err error)
CancelOrders(ctx context.Context, orders ...types.Order) error
TradeCollector() *TradeCollector
TradeCollector() *core.TradeCollector
Position() *types.Position
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/bbgo/order_executor_general.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type GeneralOrderExecutor struct {
position *types.Position
activeMakerOrders *ActiveOrderBook
orderStore *core.OrderStore
tradeCollector *TradeCollector
tradeCollector *core.TradeCollector

logger log.FieldLogger

Expand All @@ -60,7 +60,7 @@ func NewGeneralOrderExecutor(session *ExchangeSession, symbol, strategy, strateg
position: position,
activeMakerOrders: NewActiveOrderBook(symbol),
orderStore: orderStore,
tradeCollector: NewTradeCollector(symbol, position, orderStore),
tradeCollector: core.NewTradeCollector(symbol, position, orderStore),
}

if session != nil && session.Margin {
Expand Down Expand Up @@ -517,7 +517,7 @@ func (e *GeneralOrderExecutor) ClosePosition(ctx context.Context, percentage fix
return nil
}

func (e *GeneralOrderExecutor) TradeCollector() *TradeCollector {
func (e *GeneralOrderExecutor) TradeCollector() *core.TradeCollector {
return e.tradeCollector
}

Expand Down
237 changes: 0 additions & 237 deletions pkg/bbgo/tradecollector.go
Original file line number Diff line number Diff line change
@@ -1,238 +1 @@
package bbgo

import (
"context"
"sync"
"time"

log "github.com/sirupsen/logrus"

"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/sigchan"
"github.com/c9s/bbgo/pkg/types"
)

//go:generate callbackgen -type TradeCollector
type TradeCollector struct {
Symbol string
orderSig sigchan.Chan

tradeStore *core.TradeStore
tradeC chan types.Trade
position *types.Position
orderStore *core.OrderStore
doneTrades map[types.TradeKey]struct{}

mu sync.Mutex

recoverCallbacks []func(trade types.Trade)

tradeCallbacks []func(trade types.Trade, profit, netProfit fixedpoint.Value)

positionUpdateCallbacks []func(position *types.Position)
profitCallbacks []func(trade types.Trade, profit *types.Profit)
}

func NewTradeCollector(symbol string, position *types.Position, orderStore *core.OrderStore) *TradeCollector {
return &TradeCollector{
Symbol: symbol,
orderSig: sigchan.New(1),

tradeC: make(chan types.Trade, 100),
tradeStore: core.NewTradeStore(),
doneTrades: make(map[types.TradeKey]struct{}),
position: position,
orderStore: orderStore,
}
}

// OrderStore returns the order store used by the trade collector
func (c *TradeCollector) OrderStore() *core.OrderStore {
return c.orderStore
}

// Position returns the position used by the trade collector
func (c *TradeCollector) Position() *types.Position {
return c.position
}

func (c *TradeCollector) TradeStore() *core.TradeStore {
return c.tradeStore
}

func (c *TradeCollector) SetPosition(position *types.Position) {
c.position = position
}

// QueueTrade sends the trade object to the trade channel,
// so that the goroutine can receive the trade and process in the background.
func (c *TradeCollector) QueueTrade(trade types.Trade) {
c.tradeC <- trade
}

// BindStreamForBackground bind the stream callback for background processing
func (c *TradeCollector) BindStreamForBackground(stream types.Stream) {
stream.OnTradeUpdate(c.QueueTrade)
}

func (c *TradeCollector) BindStream(stream types.Stream) {
stream.OnTradeUpdate(func(trade types.Trade) {
c.ProcessTrade(trade)
})
}

// Emit triggers the trade processing (position update)
// If you sent order, and the order store is updated, you can call this method
// so that trades will be processed in the next round of the goroutine loop
func (c *TradeCollector) Emit() {
c.orderSig.Emit()
}

func (c *TradeCollector) Recover(ctx context.Context, ex types.ExchangeTradeHistoryService, symbol string, from time.Time) error {
trades, err := ex.QueryTrades(ctx, symbol, &types.TradeQueryOptions{
StartTime: &from,
})

if err != nil {
return err
}

for _, td := range trades {
log.Debugf("processing trade: %s", td.String())
if c.ProcessTrade(td) {
log.Infof("recovered trade: %s", td.String())
c.EmitRecover(td)
}
}
return nil
}

func (c *TradeCollector) setDone(key types.TradeKey) {
c.mu.Lock()
c.doneTrades[key] = struct{}{}
c.mu.Unlock()
}

// Process filters the received trades and see if there are orders matching the trades
// if we have the order in the order store, then the trade will be considered for the position.
// profit will also be calculated.
func (c *TradeCollector) Process() bool {
positionChanged := false

c.tradeStore.Filter(func(trade types.Trade) bool {
key := trade.Key()

c.mu.Lock()
defer c.mu.Unlock()

// if it's already done, remove the trade from the trade store
if _, done := c.doneTrades[key]; done {
return true
}

if c.orderStore.Exists(trade.OrderID) {
if c.position != nil {
profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit {
p := c.position.NewProfit(trade, profit, netProfit)
c.EmitTrade(trade, profit, netProfit)
c.EmitProfit(trade, &p)
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
c.EmitProfit(trade, nil)
}
positionChanged = true
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
}

c.doneTrades[key] = struct{}{}
return true
}
return false
})

if positionChanged && c.position != nil {
c.EmitPositionUpdate(c.position)
}

return positionChanged
}

// processTrade takes a trade and see if there is a matched order
// if the order is found, then we add the trade to the position
// return true when the given trade is added
// return false when the given trade is not added
func (c *TradeCollector) processTrade(trade types.Trade) bool {
c.mu.Lock()
defer c.mu.Unlock()

key := trade.Key()

// if it's already done, remove the trade from the trade store
if _, done := c.doneTrades[key]; done {
return false
}

if c.orderStore.Exists(trade.OrderID) {
if c.position != nil {
profit, netProfit, madeProfit := c.position.AddTrade(trade)
if madeProfit {
p := c.position.NewProfit(trade, profit, netProfit)
c.EmitTrade(trade, profit, netProfit)
c.EmitProfit(trade, &p)
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
c.EmitProfit(trade, nil)
}
c.EmitPositionUpdate(c.position)
} else {
c.EmitTrade(trade, fixedpoint.Zero, fixedpoint.Zero)
}

c.doneTrades[key] = struct{}{}
return true
}
return false
}

// return true when the given trade is added
// return false when the given trade is not added
func (c *TradeCollector) ProcessTrade(trade types.Trade) bool {
key := trade.Key()
// if it's already done, remove the trade from the trade store
c.mu.Lock()
if _, done := c.doneTrades[key]; done {
return false
}
c.mu.Unlock()

if c.processTrade(trade) {
return true
}

c.tradeStore.Add(trade)
return false
}

// Run is a goroutine executed in the background
// Do not use this function if you need back-testing
func (c *TradeCollector) Run(ctx context.Context) {
var ticker = time.NewTicker(3 * time.Second)
for {
select {
case <-ctx.Done():
return

case <-ticker.C:
c.Process()

case <-c.orderSig:
c.Process()

case trade := <-c.tradeC:
c.ProcessTrade(trade)
}
}
}
65 changes: 0 additions & 65 deletions pkg/bbgo/tradecollector_test.go
Original file line number Diff line number Diff line change
@@ -1,66 +1 @@
package bbgo

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/c9s/bbgo/pkg/core"
"github.com/c9s/bbgo/pkg/fixedpoint"
"github.com/c9s/bbgo/pkg/types"
)

func TestTradeCollector_ShouldNotCountDuplicatedTrade(t *testing.T) {
symbol := "BTCUSDT"
position := types.NewPosition(symbol, "BTC", "USDT")
orderStore := core.NewOrderStore(symbol)
collector := NewTradeCollector(symbol, position, orderStore)
assert.NotNil(t, collector)

matched := collector.ProcessTrade(types.Trade{
ID: 1,
OrderID: 399,
Exchange: types.ExchangeBinance,
Price: fixedpoint.NewFromInt(40000),
Quantity: fixedpoint.One,
QuoteQuantity: fixedpoint.NewFromInt(40000),
Symbol: "BTCUSDT",
Side: types.SideTypeBuy,
IsBuyer: true,
})
assert.False(t, matched, "should be added to the trade store")
assert.Equal(t, 1, len(collector.tradeStore.Trades()), "should have one trade in the trade store")

orderStore.Add(types.Order{
SubmitOrder: types.SubmitOrder{
Symbol: "BTCUSDT",
Side: types.SideTypeBuy,
Type: types.OrderTypeLimit,
Quantity: fixedpoint.One,
Price: fixedpoint.NewFromInt(40000),
},
Exchange: types.ExchangeBinance,
OrderID: 399,
Status: types.OrderStatusFilled,
ExecutedQuantity: fixedpoint.One,
IsWorking: false,
})

matched = collector.Process()
assert.True(t, matched)
assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the found trade should be removed from the trade store")

matched = collector.ProcessTrade(types.Trade{
ID: 1,
OrderID: 399,
Exchange: types.ExchangeBinance,
Price: fixedpoint.NewFromInt(40000),
Quantity: fixedpoint.One,
QuoteQuantity: fixedpoint.NewFromInt(40000),
Symbol: "BTCUSDT",
Side: types.SideTypeBuy,
IsBuyer: true,
})
assert.False(t, matched, "the same trade should not match")
assert.Equal(t, 0, len(collector.tradeStore.Trades()), "the same trade should not be added to the trade store")
}
4 changes: 2 additions & 2 deletions pkg/cmd/backtest.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ var BacktestCmd = &cobra.Command{
var reportDir = outputDirectory
var sessionTradeStats = make(map[string]map[string]*types.TradeStats)

var tradeCollectorList []*bbgo.TradeCollector
var tradeCollectorList []*core.TradeCollector
for _, exSource := range exchangeSources {
sessionName := exSource.Session.Name
tradeStatsMap := make(map[string]*types.TradeStats)
Expand All @@ -317,7 +317,7 @@ var BacktestCmd = &cobra.Command{
position := types.NewPositionFromMarket(market)
orderStore := core.NewOrderStore(usedSymbol)
orderStore.AddOrderUpdate = true
tradeCollector := bbgo.NewTradeCollector(usedSymbol, position, orderStore)
tradeCollector := core.NewTradeCollector(usedSymbol, position, orderStore)

tradeStats := types.NewTradeStats(usedSymbol)
tradeStats.SetIntervalProfitCollector(types.NewIntervalProfitCollector(types.Interval1d, startTime))
Expand Down
Loading

0 comments on commit 1ad10a9

Please sign in to comment.