diff --git a/pkg/exchange/coinbase/exchage.go b/pkg/exchange/coinbase/exchage.go index 4e13a6527..68f65481d 100644 --- a/pkg/exchange/coinbase/exchage.go +++ b/pkg/exchange/coinbase/exchage.go @@ -39,12 +39,21 @@ var log = logrus.WithField("exchange", ID) type Exchange struct { client *api.RestAPIClient + + // api keys + apiKey string + apiSecret string + apiPassphrase string } func New(key, secret, passphrase string, timeout time.Duration) *Exchange { client := api.NewClient(key, secret, passphrase, timeout) return &Exchange{ client: client, + + apiKey: key, + apiSecret: secret, + apiPassphrase: passphrase, } } @@ -261,8 +270,7 @@ func (e *Exchange) CancelOrders(ctx context.Context, orders ...types.Order) erro // ExchangeMarketDataService func (e *Exchange) NewStream() types.Stream { - // TODO: implement stream - return nil + return NewStream(e, e.apiKey, e.apiPassphrase, e.apiSecret) } func (e *Exchange) QueryMarkets(ctx context.Context) (types.MarketMap, error) { diff --git a/pkg/exchange/coinbase/parse.go b/pkg/exchange/coinbase/parse.go index 89db94452..f48c78b20 100644 --- a/pkg/exchange/coinbase/parse.go +++ b/pkg/exchange/coinbase/parse.go @@ -6,7 +6,7 @@ import ( ) // See https://docs.cdp.coinbase.com/exchange/docs/websocket-channels for message types -func (s *Stream) parseMessage(data []byte) (interface{}, error) { +func parseMessage(data []byte) (interface{}, error) { var baseMsg messageBaseType err := json.Unmarshal(data, &baseMsg) if err != nil { @@ -91,6 +91,27 @@ func (s *Stream) parseMessage(data []byte) (interface{}, error) { return nil, err } return &activeMsg, nil + case "balance": + var balanceMsg BalanceMessage + err = json.Unmarshal(data, &balanceMsg) + if err != nil { + return nil, err + } + return &balanceMsg, nil + case "snapshot": + var snapshotMsg OrderBookSnapshotMessage + err = json.Unmarshal(data, &snapshotMsg) + if err != nil { + return nil, err + } + return &snapshotMsg, nil + case "l2update": + var updateMsg OrderBookUpdateMessage + err = json.Unmarshal(data, &updateMsg) + if err != nil { + return nil, err + } + return &updateMsg, nil } return nil, fmt.Errorf("unknown message type: %s", baseMsg.Type) } diff --git a/pkg/exchange/coinbase/parse_test.go b/pkg/exchange/coinbase/parse_test.go new file mode 100644 index 000000000..8bd1fe5c9 --- /dev/null +++ b/pkg/exchange/coinbase/parse_test.go @@ -0,0 +1,444 @@ +package coinbase + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_ParseHeartbeat(t *testing.T) { + data := []byte(` +{ + "type": "heartbeat", + "sequence": 90, + "last_trade_id": 20, + "product_id": "BTC-USD", + "time": "2014-11-07T08:19:28.464459Z" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &HeartbeatMessage{}, msg) +} + +func Test_ParseStatus(t *testing.T) { + data := []byte(` +{ + "type": "status", + "products": [ + { + "id": "BTC-USD", + "base_currency": "BTC", + "quote_currency": "USD", + "base_increment": "0.00000001", + "quote_increment": "0.01", + "display_name": "BTC-USD", + "status": "online", + "status_message": null, + "min_market_funds": "10", + "post_only": false, + "limit_only": false, + "cancel_only": false, + "fx_stablecoin": false + } + ], + "currencies": [ + { + "id": "USD", + "name": "United States Dollar", + "display_name": "USD", + "min_size": "0.01000000", + "status": "online", + "status_message": null, + "max_precision": "0.01", + "convertible_to": ["USDC"], + "details": {}, + "default_network": "", + "supported_networks": [] + }, + { + "id": "BTC", + "name": "Bitcoin", + "display_name": "BTC", + "min_size": "0.00000001", + "status": "online", + "status_message": null, + "max_precision": "0.00000001", + "convertible_to": [], + "details": {}, + "default_network": "bitcoin", + "supported_networks": [ + { + "id": "bitcoin", + "name": "Bitcoin", + "status": "online", + "contract_address": "", + "crypto_address_link": "https://live.blockcypher.com/btc/address/{{address}}", + "crypto_transaction_link": "https://live.blockcypher.com/btc/tx/{{txId}}", + "min_withdrawal_amount": 0.0001, + "max_withdrawal_amount": 2400, + "network_confirmations": 2, + "processing_time_seconds": 0, + "destination_tag_regex": "" + } + ] + } + ] +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &StatusMessage{}, msg) +} + +func Test_ParseAuction(t *testing.T) { + data := []byte(` +{ + "type": "auction", + "product_id": "LTC-USD", + "sequence": 3262786978, + "auction_state": "collection", + "best_bid_price": "333.98", + "best_bid_size": "4.39088265", + "best_ask_price": "333.99", + "best_ask_size": "25.23542881", + "open_price": "333.99", + "open_size": "0.193", + "can_open": "yes", + "timestamp": "2015-11-14T20:46:03.511254Z" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &AuctionMessage{}, msg) +} + +func Test_ParseRfq(t *testing.T) { + data := []byte(` +{ + "type": "rfq_match", + "maker_order_id": "ac928c66-ca53-498f-9c13-a110027a60e8", + "taker_order_id": "132fb6ae-456b-4654-b4e0-d681ac05cea1", + "time": "2014-11-07T08:19:27.028459Z", + "trade_id": 30, + "product_id": "BTC-USD", + "size": "5.23512", + "price": "400.23", + "side": "sell" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &RfqMessage{}, msg) +} + +func Test_ParseTicker(t *testing.T) { + data := []byte(` +{ + "type": "ticker", + "sequence": 37475248783, + "product_id": "ETH-USD", + "price": "1285.22", + "open_24h": "1310.79", + "volume_24h": "245532.79269678", + "low_24h": "1280.52", + "high_24h": "1313.8", + "volume_30d": "9788783.60117027", + "best_bid": "1285.04", + "best_bid_size": "0.46688654", + "best_ask": "1285.27", + "best_ask_size": "1.56637040", + "side": "buy", + "time": "2022-10-19T23:28:22.061769Z", + "trade_id": 370843401, + "last_size": "11.4396987" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &TickerMessage{}, msg) +} + +func Test_ReceivedMessage(t *testing.T) { + // limit order + data := []byte(` +{ + "type": "received", + "time": "2014-11-07T08:19:27.028459Z", + "product_id": "BTC-USD", + "sequence": 10, + "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b", + "size": "1.34", + "price": "502.1", + "side": "buy", + "order_type": "limit", + "client-oid": "d50ec974-76a2-454b-66f135b1ea8c" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &ReceivedMessage{}, msg) + + // market order + data = []byte(` +{ + "type": "received", + "time": "2014-11-09T08:19:27.028459Z", + "product_id": "BTC-USD", + "sequence": 12, + "order_id": "dddec984-77a8-460a-b958-66f114b0de9b", + "funds": "3000.234", + "side": "buy", + "order_type": "market", + "client-oid": "d50ec974-76a2-454b-66f135b1ea8c" +} +`) + msg, err = parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &ReceivedMessage{}, msg) +} + +func Test_ParseOpen(t *testing.T) { + data := []byte(` +{ + "type": "open", + "time": "2014-11-07T08:19:27.028459Z", + "product_id": "BTC-USD", + "sequence": 10, + "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b", + "price": "200.2", + "remaining_size": "1.00", + "side": "sell" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &OpenMessage{}, msg) +} + +func Test_ParseDone(t *testing.T) { + // filled + data := []byte(` +{ + "type": "done", + "time": "2014-11-07T08:19:27.028459Z", + "product_id": "BTC-USD", + "sequence": 10, + "price": "200.2", + "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b", + "reason": "filled", + "side": "sell", + "remaining_size": "0" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &DoneMessage{}, msg) + + // canceled + data = []byte(` +{ + "type": "done", + "time": "2014-11-07T08:19:27.028459Z", + "product_id": "BTC-USD", + "sequence": 10, + "price": "200.2", + "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b", + "reason": "canceled", + "side": "sell", + "remaining_size": "8", + "cancel_reason": "102:Self Trade Prevention" +} +`) + msg, err = parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &DoneMessage{}, msg) +} + +func Test_ParseMatch(t *testing.T) { + data := []byte(` +{ + "type": "match", + "trade_id": 10, + "sequence": 50, + "maker_order_id": "ac928c66-ca53-498f-9c13-a110027a60e8", + "taker_order_id": "132fb6ae-456b-4654-b4e0-d681ac05cea1", + "time": "2014-11-07T08:19:27.028459Z", + "product_id": "BTC-USD", + "size": "5.23512", + "price": "400.23", + "side": "sell" +} +`) + msg, err := parseMessage(data) + matchMsg, ok := msg.(*MatchMessage) + assert.NoError(t, err) + assert.True(t, ok) + assert.False(t, matchMsg.IsAuthMaker()) + + data = []byte(` +{ + "type": "match", + "trade_id": 10, + "sequence": 50, + "maker_order_id": "ac928c66-ca53-498f-9c13-a110027a60e8", + "taker_order_id": "132fb6ae-456b-4654-b4e0-d681ac05cea1", + "time": "2014-11-07T08:19:27.028459Z", + "product_id": "BTC-USD", + "size": "5.23512", + "price": "400.23", + "side": "sell", + "taker_user_id": "5844eceecf7e803e259d0365", + "user_id": "5844eceecf7e803e259d0365", + "taker_profile_id": "765d1549-9660-4be2-97d4-fa2d65fa3352", + "profile_id": "765d1549-9660-4be2-97d4-fa2d65fa3352", + "taker_fee_rate": "0.005" +} +`) + msg, err = parseMessage(data) + matchMsg, ok = msg.(*MatchMessage) + assert.NoError(t, err) + assert.True(t, ok) + assert.False(t, matchMsg.IsAuthMaker()) + + data = []byte(` +{ + "type": "match", + "trade_id": 10, + "sequence": 50, + "maker_order_id": "ac928c66-ca53-498f-9c13-a110027a60e8", + "taker_order_id": "132fb6ae-456b-4654-b4e0-d681ac05cea1", + "time": "2014-11-07T08:19:27.028459Z", + "product_id": "BTC-USD", + "size": "5.23512", + "price": "400.23", + "side": "sell", + "maker_user_id": "5f8a07f17b7a102330be40a3", + "user_id": "5f8a07f17b7a102330be40a3", + "maker_profile_id": "7aa6b75c-0ff1-11eb-adc1-0242ac120002", + "profile_id": "7aa6b75c-0ff1-11eb-adc1-0242ac120002", + "maker_fee_rate": "0.001" +} +`) + msg, err = parseMessage(data) + matchMsg, ok = msg.(*MatchMessage) + assert.NoError(t, err) + assert.True(t, ok) + assert.True(t, matchMsg.IsAuthMaker()) +} + +func Test_ParseChange(t *testing.T) { + data := []byte(` +{ + "type": "change", + "reason":"STP", + "time": "2014-11-07T08:19:27.028459Z", + "sequence": 80, + "order_id": "ac928c66-ca53-498f-9c13-a110027a60e8", + "side": "sell", + "product_id": "BTC-USD", + "old_size": "12.234412", + "new_size": "5.23512", + "price": "400.23" +} +`) + msg, err := parseMessage(data) + changeMsg, ok := msg.(*ChangeMessage) + assert.NoError(t, err) + assert.True(t, ok) + assert.True(t, changeMsg.IsStp()) + + data = []byte(` +{ + "type": "change", + "reason":"modify_order", + "time": "2022-06-06T22:55:43.433114Z", + "sequence": 24753, + "order_id": "c3f16063-77b1-408f-a743-88b7bc20cdcd", + "side": "buy", + "product_id": "ETH-USD", + "old_size": "80", + "new_size": "80", + "old_price": "7", + "new_price": "6" +} +`) + msg, err = parseMessage(data) + changeMsg, ok = msg.(*ChangeMessage) + assert.NoError(t, err) + assert.True(t, ok) + assert.True(t, changeMsg.IsModifyOrder()) +} + +func Test_ParseActive(t *testing.T) { + data := []byte(` +{ + "type": "active", + "time": "2014-11-07T08:19:27.028459Z", + "product_id": "BTC-USD", + "sequence": 10, + "order_id": "d50ec984-77a8-460a-b958-66f114b0de9b", + "side": "sell", + "remaining_size": "1.00" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &ActiveMessage{}, msg) +} + +func Test_ParseBalance(t *testing.T) { + data := []byte(` +{ + "type": "balance", + "account_id": "d50ec984-77a8-460a-b958-66f114b0de9b", + "currency": "USD", + "holds": "1000.23", + "available": "102030.99", + "updated": "2023-10-10T20:42:27.265Z", + "timestamp": "2023-10-10T20:42:29.265Z" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &BalanceMessage{}, msg) +} + +func Test_ParseOrderBookSnapshot(t *testing.T) { + data := []byte(` +{ + "type": "snapshot", + "product_id": "BTC-USD", + "bids": [["10101.10", "0.45054140"]], + "asks": [["10102.55", "0.57753524"]] +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &OrderBookSnapshotMessage{}, msg) +} + +func Test_ParseOrderBookUpdate(t *testing.T) { + data := []byte(` +{ + "type": "l2update", + "product_id": "BTC-USD", + "changes": [ + [ + "buy", + "22356.270000", + "0.00000000" + ], + [ + "buy", + "22356.300000", + "1.00000000" + ] + ], + "time": "2022-08-04T15:25:05.010758Z" +} +`) + msg, err := parseMessage(data) + assert.NoError(t, err) + assert.IsType(t, &OrderBookUpdateMessage{}, msg) +} diff --git a/pkg/exchange/coinbase/stream.go b/pkg/exchange/coinbase/stream.go index cf167eec2..6481b81fa 100644 --- a/pkg/exchange/coinbase/stream.go +++ b/pkg/exchange/coinbase/stream.go @@ -6,6 +6,7 @@ import ( "crypto/sha256" "encoding/base64" "strconv" + "sync" "time" "github.com/c9s/bbgo/pkg/types" @@ -17,41 +18,50 @@ const rfqMatchChannel = "rfq_matches" //go:generate callbackgen -type Stream type Stream struct { types.StandardStream + + exchange *Exchange apiKey string passphrase string secretKey string // callbacks - statusMessageCallbacks []func(m *StatusMessage) - auctionMessageCallbacks []func(m *AuctionMessage) - rfqMessageCallbacks []func(m *RfqMessage) - tickerMessageCallbacks []func(m *TickerMessage) - receivedMessageCallbacks []func(m *ReceivedMessage) - openMessageCallbacks []func(m *OpenMessage) - doneMessageCallbacks []func(m *DoneMessage) - matchMessageCallbacks []func(m *MatchMessage) - changeMessageCallbacks []func(m *ChangeMessage) - activeMessageCallbacks []func(m *ActiveMessage) + statusMessageCallbacks []func(m *StatusMessage) + auctionMessageCallbacks []func(m *AuctionMessage) + rfqMessageCallbacks []func(m *RfqMessage) + tickerMessageCallbacks []func(m *TickerMessage) + receivedMessageCallbacks []func(m *ReceivedMessage) + openMessageCallbacks []func(m *OpenMessage) + doneMessageCallbacks []func(m *DoneMessage) + matchMessageCallbacks []func(m *MatchMessage) + changeMessageCallbacks []func(m *ChangeMessage) + activeMessageCallbacks []func(m *ActiveMessage) + balanceMessageCallbacks []func(m *BalanceMessage) + orderbookSnapshotMessageCallbacks []func(m *OrderBookSnapshotMessage) + orderbookUpdateMessageCallbacks []func(m *OrderBookUpdateMessage) + + lock sync.Mutex // lock to protect lastSequenceMsgMap + lastSequenceMsgMap map[MessageType]SequenceNumberType } func NewStream( + exchange *Exchange, apiKey string, passphrase string, secretKey string, ) *Stream { s := Stream{ StandardStream: types.NewStandardStream(), + exchange: exchange, apiKey: apiKey, passphrase: passphrase, secretKey: secretKey, } - s.SetParser(s.parseMessage) + s.SetParser(parseMessage) s.SetDispatcher(s.dispatchEvent) s.SetEndpointCreator(createEndpoint) // public handlers s.OnConnect(s.handleConnect) - return &s } @@ -77,6 +87,12 @@ func (s *Stream) dispatchEvent(e interface{}) { s.EmitChangeMessage(e) case *ActiveMessage: s.EmitActiveMessage(e) + case *BalanceMessage: + s.EmitBalanceMessage(e) + case *OrderBookSnapshotMessage: + s.EmitOrderbookSnapshotMessage(e) + case *OrderBookUpdateMessage: + s.EmitOrderbookUpdateMessage(e) default: log.Warnf("skip dispatching msg due to unknown message type: %T", e) } @@ -86,75 +102,14 @@ func createEndpoint(ctx context.Context) (string, error) { return wsFeedURL, nil } -type channelType struct { - Name string `json:"name"` - ProductIDs []string `json:"product_ids,omitempty"` -} - -type websocketCommand struct { - Type string `json:"type"` - Channels []channelType `json:"channels"` - Signature *string `json:"signature,omitempty"` - Key *string `json:"key,omitempty"` - Passphrase *string `json:"passphrase,omitempty"` - Timestamp *string `json:"timestamp,omitempty"` -} - -func (s *Stream) handleConnect() { - // subscribe to channels - if len(s.Subscriptions) == 0 { - return - } - - subProductsMap := make(map[string][]string) - for _, sub := range s.Subscriptions { - strChannel := string(sub.Channel) - // rfqMatchChannel allow empty symbol - if sub.Channel != rfqMatchChannel && len(sub.Symbol) == 0 { - continue - } - subProductsMap[strChannel] = append(subProductsMap[strChannel], sub.Symbol) - } - subCmds := []websocketCommand{} - signature, ts := s.generateSignature() - for channel, productIDs := range subProductsMap { - var subType string - switch channel { - case "rfq_matches": - subType = "subscriptions" - default: - subType = "subscribe" - } - subCmd := websocketCommand{ - Type: subType, - Channels: []channelType{ - { - Name: channel, - ProductIDs: productIDs, - }, - }, - } - if s.authEnabled() { - subCmd.Signature = &signature - subCmd.Key = &s.apiKey - subCmd.Passphrase = &s.passphrase - subCmd.Timestamp = &ts - } - subCmds = append(subCmds, subCmd) - } - for _, subCmd := range subCmds { - err := s.Conn.WriteJSON(subCmd) - if err != nil { - log.WithError(err).Errorf("subscription error: %v", subCmd) - } - } -} - -func (s *Stream) authEnabled() bool { +func (s *Stream) AuthEnabled() bool { return !s.PublicOnly && len(s.apiKey) > 0 && len(s.passphrase) > 0 && len(s.secretKey) > 0 } func (s *Stream) generateSignature() (string, string) { + if len(s.apiKey) == 0 || len(s.passphrase) == 0 || len(s.secretKey) == 0 { + return "", "" + } // Convert current time to string timestamp ts := strconv.FormatInt(time.Now().Unix(), 10) @@ -177,3 +132,7 @@ func (s *Stream) generateSignature() (string, string) { return signature, ts } + +func (s *Stream) handleConnect() { + // TODO: dummy, will add connection logic later +} diff --git a/pkg/exchange/coinbase/stream_callbacks.go b/pkg/exchange/coinbase/stream_callbacks.go index a311ea2ef..0dd08536b 100644 --- a/pkg/exchange/coinbase/stream_callbacks.go +++ b/pkg/exchange/coinbase/stream_callbacks.go @@ -4,102 +4,132 @@ package coinbase import () -func (S *Stream) OnStatusMessage(cb func(m *StatusMessage)) { - S.statusMessageCallbacks = append(S.statusMessageCallbacks, cb) +func (s *Stream) OnStatusMessage(cb func(m *StatusMessage)) { + s.statusMessageCallbacks = append(s.statusMessageCallbacks, cb) } -func (S *Stream) EmitStatusMessage(m *StatusMessage) { - for _, cb := range S.statusMessageCallbacks { +func (s *Stream) EmitStatusMessage(m *StatusMessage) { + for _, cb := range s.statusMessageCallbacks { cb(m) } } -func (S *Stream) OnAuctionMessage(cb func(m *AuctionMessage)) { - S.auctionMessageCallbacks = append(S.auctionMessageCallbacks, cb) +func (s *Stream) OnAuctionMessage(cb func(m *AuctionMessage)) { + s.auctionMessageCallbacks = append(s.auctionMessageCallbacks, cb) } -func (S *Stream) EmitAuctionMessage(m *AuctionMessage) { - for _, cb := range S.auctionMessageCallbacks { +func (s *Stream) EmitAuctionMessage(m *AuctionMessage) { + for _, cb := range s.auctionMessageCallbacks { cb(m) } } -func (S *Stream) OnRfqMessage(cb func(m *RfqMessage)) { - S.rfqMessageCallbacks = append(S.rfqMessageCallbacks, cb) +func (s *Stream) OnRfqMessage(cb func(m *RfqMessage)) { + s.rfqMessageCallbacks = append(s.rfqMessageCallbacks, cb) } -func (S *Stream) EmitRfqMessage(m *RfqMessage) { - for _, cb := range S.rfqMessageCallbacks { +func (s *Stream) EmitRfqMessage(m *RfqMessage) { + for _, cb := range s.rfqMessageCallbacks { cb(m) } } -func (S *Stream) OnTickerMessage(cb func(m *TickerMessage)) { - S.tickerMessageCallbacks = append(S.tickerMessageCallbacks, cb) +func (s *Stream) OnTickerMessage(cb func(m *TickerMessage)) { + s.tickerMessageCallbacks = append(s.tickerMessageCallbacks, cb) } -func (S *Stream) EmitTickerMessage(m *TickerMessage) { - for _, cb := range S.tickerMessageCallbacks { +func (s *Stream) EmitTickerMessage(m *TickerMessage) { + for _, cb := range s.tickerMessageCallbacks { cb(m) } } -func (S *Stream) OnReceivedMessage(cb func(m *ReceivedMessage)) { - S.receivedMessageCallbacks = append(S.receivedMessageCallbacks, cb) +func (s *Stream) OnReceivedMessage(cb func(m *ReceivedMessage)) { + s.receivedMessageCallbacks = append(s.receivedMessageCallbacks, cb) } -func (S *Stream) EmitReceivedMessage(m *ReceivedMessage) { - for _, cb := range S.receivedMessageCallbacks { +func (s *Stream) EmitReceivedMessage(m *ReceivedMessage) { + for _, cb := range s.receivedMessageCallbacks { cb(m) } } -func (S *Stream) OnOpenMessage(cb func(m *OpenMessage)) { - S.openMessageCallbacks = append(S.openMessageCallbacks, cb) +func (s *Stream) OnOpenMessage(cb func(m *OpenMessage)) { + s.openMessageCallbacks = append(s.openMessageCallbacks, cb) } -func (S *Stream) EmitOpenMessage(m *OpenMessage) { - for _, cb := range S.openMessageCallbacks { +func (s *Stream) EmitOpenMessage(m *OpenMessage) { + for _, cb := range s.openMessageCallbacks { cb(m) } } -func (S *Stream) OnDoneMessage(cb func(m *DoneMessage)) { - S.doneMessageCallbacks = append(S.doneMessageCallbacks, cb) +func (s *Stream) OnDoneMessage(cb func(m *DoneMessage)) { + s.doneMessageCallbacks = append(s.doneMessageCallbacks, cb) } -func (S *Stream) EmitDoneMessage(m *DoneMessage) { - for _, cb := range S.doneMessageCallbacks { +func (s *Stream) EmitDoneMessage(m *DoneMessage) { + for _, cb := range s.doneMessageCallbacks { cb(m) } } -func (S *Stream) OnMatchMessage(cb func(m *MatchMessage)) { - S.matchMessageCallbacks = append(S.matchMessageCallbacks, cb) +func (s *Stream) OnMatchMessage(cb func(m *MatchMessage)) { + s.matchMessageCallbacks = append(s.matchMessageCallbacks, cb) } -func (S *Stream) EmitMatchMessage(m *MatchMessage) { - for _, cb := range S.matchMessageCallbacks { +func (s *Stream) EmitMatchMessage(m *MatchMessage) { + for _, cb := range s.matchMessageCallbacks { cb(m) } } -func (S *Stream) OnChangeMessage(cb func(m *ChangeMessage)) { - S.changeMessageCallbacks = append(S.changeMessageCallbacks, cb) +func (s *Stream) OnChangeMessage(cb func(m *ChangeMessage)) { + s.changeMessageCallbacks = append(s.changeMessageCallbacks, cb) } -func (S *Stream) EmitChangeMessage(m *ChangeMessage) { - for _, cb := range S.changeMessageCallbacks { +func (s *Stream) EmitChangeMessage(m *ChangeMessage) { + for _, cb := range s.changeMessageCallbacks { cb(m) } } -func (S *Stream) OnActiveMessage(cb func(m *ActiveMessage)) { - S.activeMessageCallbacks = append(S.activeMessageCallbacks, cb) +func (s *Stream) OnActiveMessage(cb func(m *ActiveMessage)) { + s.activeMessageCallbacks = append(s.activeMessageCallbacks, cb) } -func (S *Stream) EmitActiveMessage(m *ActiveMessage) { - for _, cb := range S.activeMessageCallbacks { +func (s *Stream) EmitActiveMessage(m *ActiveMessage) { + for _, cb := range s.activeMessageCallbacks { + cb(m) + } +} + +func (s *Stream) OnBalanceMessage(cb func(m *BalanceMessage)) { + s.balanceMessageCallbacks = append(s.balanceMessageCallbacks, cb) +} + +func (s *Stream) EmitBalanceMessage(m *BalanceMessage) { + for _, cb := range s.balanceMessageCallbacks { + cb(m) + } +} + +func (s *Stream) OnOrderbookSnapshotMessage(cb func(m *OrderBookSnapshotMessage)) { + s.orderbookSnapshotMessageCallbacks = append(s.orderbookSnapshotMessageCallbacks, cb) +} + +func (s *Stream) EmitOrderbookSnapshotMessage(m *OrderBookSnapshotMessage) { + for _, cb := range s.orderbookSnapshotMessageCallbacks { + cb(m) + } +} + +func (s *Stream) OnOrderbookUpdateMessage(cb func(m *OrderBookUpdateMessage)) { + s.orderbookUpdateMessageCallbacks = append(s.orderbookUpdateMessageCallbacks, cb) +} + +func (s *Stream) EmitOrderbookUpdateMessage(m *OrderBookUpdateMessage) { + for _, cb := range s.orderbookUpdateMessageCallbacks { cb(m) } } diff --git a/pkg/exchange/coinbase/stream_messages.go b/pkg/exchange/coinbase/stream_messages.go index 28b207cd7..088372e18 100644 --- a/pkg/exchange/coinbase/stream_messages.go +++ b/pkg/exchange/coinbase/stream_messages.go @@ -2,28 +2,40 @@ package coinbase // https://docs.cdp.coinbase.com/exchange/docs/websocket-channels import ( + "strings" "time" api "github.com/c9s/bbgo/pkg/exchange/coinbase/api/v1" "github.com/c9s/bbgo/pkg/fixedpoint" + "github.com/c9s/bbgo/pkg/types" ) -// TODOs: -// - Level2 channels -// - Level3 channels +// TODO: Level3 channels + +type MessageType string +type SequenceNumberType uint64 type messageBaseType struct { - Type string `json:"type"` + Type MessageType `json:"type"` } type seqenceMessageType struct { messageBaseType - ProductID string `json:"product_id"` - Sequence int `json:"sequence"` + ProductID string `json:"product_id"` + Sequence SequenceNumberType `json:"sequence"` +} + +func (s *seqenceMessageType) QuoteCurrency() string { + splits := strings.Split(s.ProductID, "-") + if len(splits) == 2 { + return splits[1] + } + return "" } // Websocket message types +// heartbeat channel type HeartbeatMessage struct { seqenceMessageType @@ -31,6 +43,7 @@ type HeartbeatMessage struct { Time time.Time `json:"time"` } +// status channel type StatusMessage struct { messageBaseType Products []struct { @@ -75,6 +88,7 @@ type StatusMessage struct { } `json:"currencies"` } +// auction channel type AuctionMessage struct { seqenceMessageType @@ -89,6 +103,7 @@ type AuctionMessage struct { Timestamp time.Time `json:"timestamp"` // ex: "2015-11-14T20:46:03.511254Z" } +// rfq_matches channel type RfqMessage struct { messageBaseType @@ -102,6 +117,7 @@ type RfqMessage struct { Side api.SideType `json:"side"` } +// ticker channel type TickerMessage struct { seqenceMessageType @@ -121,6 +137,34 @@ type TickerMessage struct { LastSize fixedpoint.Value `json:"last_size"` } +func (msg *TickerMessage) Trade() types.Trade { + var side types.SideType + switch msg.Side { + case "buy": + side = types.SideTypeBuy + case "sell": + side = types.SideTypeSell + default: + side = types.SideType(msg.Side) + } + isBuyer := side == types.SideTypeBuy + quoteQuantity := msg.Price.Mul(msg.LastSize) + return types.Trade{ + Exchange: types.ExchangeCoinBase, + Price: msg.Price, + Quantity: msg.LastSize, + QuoteQuantity: quoteQuantity, + Side: side, + Symbol: toGlobalSymbol(msg.ProductID), + IsBuyer: isBuyer, + IsMaker: !isBuyer, + Time: types.Time(msg.Time), + FeeCurrency: msg.QuoteCurrency(), + Fee: fixedpoint.Zero, // not available + } +} + +// full channel type ReceivedMessage struct { seqenceMessageType @@ -142,6 +186,16 @@ func (m *ReceivedMessage) IsMarketOrder() bool { return !m.Funds.IsZero() } +func (m *ReceivedMessage) ToGlobalOrder() types.Order { + return types.Order{ + SubmitOrder: types.SubmitOrder{ + ClientOrderID: m.ClientOid, + }, + Exchange: types.ExchangeCoinBase, + IsWorking: false, + } +} + type OpenMessage struct { seqenceMessageType @@ -179,18 +233,64 @@ type MatchMessage struct { ProfileID string `json:"profile_id"` // extra fields for taker - TakerUserID string `json:"taker_user_id,omitempty"` - TakerProfileID string `json:"taker_profile_id,omitempty"` - TakerFeeRate string `json:"taker_fee_rate,omitempty"` + TakerUserID string `json:"taker_user_id,omitempty"` + TakerProfileID string `json:"taker_profile_id,omitempty"` + TakerFeeRate fixedpoint.Value `json:"taker_fee_rate,omitempty"` // extra fields for maker - MakerUserID string `json:"maker_user_id,omitempty"` - MakerProfileID string `json:"maker_profile_id,omitempty"` - MakerFeeRate string `json:"maker_fee_rate,omitempty"` + MakerUserID string `json:"maker_user_id,omitempty"` + MakerProfileID string `json:"maker_profile_id,omitempty"` + MakerFeeRate fixedpoint.Value `json:"maker_fee_rate,omitempty"` +} + +func (msg *MatchMessage) Trade() types.Trade { + var side types.SideType + switch msg.Side { + case "buy": + side = types.SideTypeBuy + case "sell": + side = types.SideTypeSell + default: + side = types.SideType(msg.Side) + } + quoteQuantity := msg.Size.Mul(msg.Price) + return types.Trade{ + Exchange: types.ExchangeCoinBase, + Price: msg.Price, + Quantity: msg.Size, + QuoteQuantity: quoteQuantity, + Side: side, + Symbol: toGlobalSymbol(msg.ProductID), + IsBuyer: side == types.SideTypeBuy, + IsMaker: msg.IsAuthMaker(), + Time: types.Time(msg.Time), + FeeCurrency: msg.QuoteCurrency(), + Fee: quoteQuantity.Mul(msg.FeeRate()), + } +} + +func (m *MatchMessage) isAuth() bool { + return len(m.TakerUserID) > 0 || len(m.MakerUserID) > 0 +} + +func (m *MatchMessage) IsAuthMaker() bool { + // not authenticated + if !m.isAuth() { + return false + } + return len(m.MakerUserID) > 0 } -func (m *MatchMessage) IsAuthTaker() bool { - return len(m.TakerUserID) > 0 +// https://help.coinbase.com/en/exchange/trading-and-funding/exchange-fees +func (m *MatchMessage) FeeRate() fixedpoint.Value { + if !m.isAuth() { + // not available + return fixedpoint.Zero + } + if m.IsAuthMaker() { + return m.MakerFeeRate + } + return m.TakerFeeRate } type ChangeMessage struct { @@ -234,3 +334,32 @@ type ActiveMessage struct { Funds fixedpoint.Value `json:"funds"` Private bool `json:"private"` } + +// balance channel +type BalanceMessage struct { + messageBaseType + + AccountID string `json:"account_id"` + Currency string `json:"currency"` + Holds fixedpoint.Value `json:"holds"` + Available fixedpoint.Value `json:"available"` + Updated types.Time `json:"updated"` + Timestamp types.Time `json:"timestamp"` +} + +// level2 channel +type OrderBookSnapshotMessage struct { + messageBaseType + + ProductID string `json:"product_id"` + Bids [][]fixedpoint.Value `json:"bids"` // [["price", "size"], ...] + Asks [][]fixedpoint.Value `json:"asks"` // [["price", "size"], ...] +} + +type OrderBookUpdateMessage struct { + messageBaseType + + ProductID string `json:"product_id"` + Time time.Time `json:"time"` + Changes [][]string `json:"changes"` // [["side", "price", "size"], ...] +}