diff --git a/docs/schema.yaml b/docs/schema.yaml new file mode 100644 index 0000000..9e76f64 --- /dev/null +++ b/docs/schema.yaml @@ -0,0 +1,172 @@ +openapi: 3.0.3 +info: + title: Aggr-server API + version: 1.0.0 +servers: + - url: https://api.aggr.trade/ + description: AGGR Trade server + - url: http://localhost:3000/ + description: Default local server url +paths: + /alert: + post: + summary: Create or update an alert + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/AlertPayload' + responses: + '201': + description: Alert created or updated successfully + content: + application/json: + schema: + $ref: '#/components/schemas/AlertResponse' + '400': + description: Invalid alert payload + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + /products: + get: + summary: Get the list of products + responses: + '200': + description: Successful response + content: + application/json: + schema: + $ref: '#/components/schemas/ProductsResponse' + + /historical/{from}/{to}/{timeframe}/{markets}: + get: + summary: Get historical data + parameters: + - in: path + name: from + schema: + type: integer + required: true + description: Start timestamp of the historical data + - in: path + name: to + schema: + type: integer + required: true + description: End timestamp of the historical data + - in: path + name: timeframe + schema: + type: string + required: true + description: Timeframe of the historical data + - in: path + name: markets + schema: + type: string + description: Markets to fetch historical data for + required: true + responses: + '200': + description: Successful response + content: + application/json: + schema: + $ref: '#/components/schemas/HistoricalDataResponse' + '400': + description: Invalid request or missing interval + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '404': + description: No results found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' +components: + schemas: + AlertPayload: + type: object + properties: + endpoint: + type: string + keys: + type: array + items: + type: string + market: + type: string + price: + type: number + required: + - endpoint + - keys + - market + - price + AlertResponse: + type: object + + Error: + type: object + properties: + error: + type: string + ProductsResponse: + type: array + items: + type: string + HistoricalDataResponse: + type: object + properties: + format: + type: string + columns: + type: object + properties: + time: + type: integer + cbuy: + type: integer + close: + type: integer + csell: + type: integer + high: + type: integer + lbuy: + type: integer + low: + type: integer + lsell: + type: integer + market: + type: string + open: + type: integer + vbuy: + type: number + vsell: + type: number + nullable: true + results: + type: array + items: + type: array + items: + oneOf: + - type: integer + - type: number + - type: string + nullable: true + \ No newline at end of file diff --git a/lefthook.yml b/lefthook.yml new file mode 100644 index 0000000..6e48c30 --- /dev/null +++ b/lefthook.yml @@ -0,0 +1,12 @@ +# EXAMPLE USAGE +# Refer for explanation to following link: +# https://github.com/evilmartians/lefthook/blob/master/docs/full_guide.md +# + +pre-commit: + parallel: true + commands: + prettier: + glob: '*.{js,jsx,ts,tsx,css,scss,md,html,json,yml}' + run: npx prettier --write "{staged_files}" + stage_fixed: true diff --git a/src/config.js b/src/config.js index c97ec28..33d2b0a 100644 --- a/src/config.js +++ b/src/config.js @@ -232,15 +232,11 @@ if (process.argv.length > 2) { let userSettings = {} -const specificConfigFile = commandSettings.config - ? commandSettings.config - : commandSettings.configFile - ? commandSettings.configFile - : commandSettings.configPath - ? commandSettings.configPath - : null - -let configPath = specificConfigFile || 'config.json' +let configPath = + commandSettings?.config || + commandSettings?.configFile || + commandSettings?.configPath || + 'config.json' try { console.log('[init] using config file ' + configPath) diff --git a/src/exchange.js b/src/exchange.js index 3025411..349db07 100644 --- a/src/exchange.js +++ b/src/exchange.js @@ -140,7 +140,8 @@ class Exchange extends EventEmitter { /** * Link exchange to a pair - * @param {*} pair + * @param {string} pair + * @param {boolean} returnConnectedEvent * @returns {Promise} */ async link(pair, returnConnectedEvent) { @@ -932,10 +933,11 @@ class Exchange extends EventEmitter { } /** - * Unsub + * Unsubscribe * @param {WebSocket} api * @param {string} pair * @param {boolean} skipSending skip sending unsusbribe message + * @returns {boolean} */ async unsubscribe(api, pair, skipSending) { if (!this.markPairAsDisconnected(api, pair)) { diff --git a/src/exchanges/gateio.js b/src/exchanges/gateio.js new file mode 100644 index 0000000..ccca73b --- /dev/null +++ b/src/exchanges/gateio.js @@ -0,0 +1,192 @@ +const Exchange = require('../exchange') +const WebSocket = require('websocket').w3cwebsocket +const axios = require('axios') + +class GateIO extends Exchange { + constructor() { + super() + + this.id = 'GATEIO' + this.endpoints = { + PRODUCTS: [ + 'https://api.gateio.ws/api/v4/spot/currency_pairs', + 'https://api.gateio.ws/api/v4/futures/usdt/contracts' + ] + } + this.types = {} + this.multipliers = {} + } + + async getUrl(pair) { + // https://www.gate.io/docs/developers/apiv4/ws/en/ + const type = this.types[pair] + if (type === 'spot') return 'wss://ws.gate.io/v3/' + if (type === 'futures') return 'wss://fx-ws.gateio.ws/v4/ws/usdt' + } + + formatProducts(response) { + const products = [] + const multipliers = {} + const types = {} + + response.forEach((data, index) => { + const type = ['spot', 'futures'][index] + + data.forEach(product => { + let pair + switch (type) { + case 'spot': + pair = product.id + '_SPOT' + multipliers[pair] = parseFloat(product.min_base_amount) + break + case 'futures': + pair = product.name + '_FUTURES' + multipliers[pair] = parseFloat(product.quanto_multiplier) + break + } + + if (products.find(a => a.toLowerCase() === pair.toLowerCase())) { + throw new Error( + 'Duplicate pair detected on gateio exchange (' + pair + ')' + ) + } + + types[pair] = type + products.push(pair) + }) + }) + return { + products, + multipliers, + types + } + } + + /** + * Sub + * @param {WebSocket} api + * @param {string} pair + */ + async subscribe(api, pair) { + // Public Trades Channel + // https://www.gate.io/docs/developers/apiv4/ws/en/#public-trades-channel + if (!(await super.subscribe(api, pair))) { + return + } + + if (this.types[pair] === 'spot') { + api.send( + JSON.stringify({ + method: `trades.subscribe`, + params: [pair.split('_').slice(0, 2).join('_')] + }) + ) + } + if (this.types[pair] === 'futures') { + api.send( + JSON.stringify({ + time: Date.now(), + channel: `${this.types[pair]}.trades`, + event: 'subscribe', + payload: [pair.split('_').slice(0, 2).join('_')] + }) + ) + } + + return true + } + + /** + * Sub + * @param {WebSocket} api + * @param {string} pair + */ + async unsubscribe(api, pair) { + if (!(await super.unsubscribe(api, pair))) { + return + } + + if (this.types[pair] === 'spot') { + api.send( + JSON.stringify({ + method: `trades.unsubscribe`, + params: [pair.split('_').slice(0, 2).join('_')] + }) + ) + } else if (this.types[pair] === 'futures') { + api.send( + JSON.stringify({ + time: Date.now(), + channel: `${this.types[pair]}.trades`, + event: 'unsubscribe', + payload: [pair.split('_').slice(0, 2).join('_')] + }) + ) + } + + return true + } + + formatPerpTrade(trade, channel) { + return { + exchange: this.id, + pair: trade.contract + '_' + channel, + timestamp: +new Date(trade.create_time_ms), + price: +trade.price, + size: +( + Math.abs(trade.size) * this.multipliers[trade.contract + '_' + channel] + ), + side: trade.size > 0 ? 'buy' : 'sell' + } + } + + formatSpotTrade() + + onMessage(event, api) { + const json = JSON.parse(event.data) + + if (!json) { + return + } + + let trades = [] + + const { channel, event, result, method, params } = json + + isPerpChannel = () => ( + channel && channel?.endsWith('trades') && + event && event === 'update' && + result && result?.length + ) + + + isSpotChannel = () => + method === 'trades.update' && + Array.isArray(params) + + if (isPerpChannel()) { + trades = result.map(trade => this.formatPerpTrade( + trade, + channel.split('.')[0].toUpperCase() + ) + ) + } else if ( isSpotChannel() ) { + const [contract, tradesData] = params + trades = tradesData.map(trade => { + return { + exchange: this.id, + pair: contract + '_SPOT', + timestamp: +new Date( + parseInt((trade.time * 1000).toString().split('.')[0]) + ), + price: +trade.price, + size: +trade.amount, + side: trade.type + } + }) + } + return this.emitTrades(api.id, trades) + } +} + +module.exports = GateIO diff --git a/src/models/index.js b/src/models/index.js new file mode 100644 index 0000000..25c8e6c --- /dev/null +++ b/src/models/index.js @@ -0,0 +1,5 @@ +const Alert = require('./alert.model.js') + +module.exports = { + Alert +} diff --git a/src/server.js b/src/server.js index 6fc9f85..25335d7 100644 --- a/src/server.js +++ b/src/server.js @@ -249,13 +249,10 @@ class Server extends EventEmitter { this.dispatchAggregateTrade.bind(this, exchange.id) ) } else { - exchange.on('trades', this.dispatchRawTrades.bind(this, exchange.id)) + exchange.on('trades', this.dispatchRawTrades.bind(this)) } - exchange.on( - 'liquidations', - this.dispatchRawTrades.bind(this, exchange.id) - ) + exchange.on('liquidations', this.dispatchRawTrades.bind(this)) exchange.on('disconnected', (pair, apiId, apiLength) => { const id = exchange.id + ':' + pair @@ -991,12 +988,16 @@ class Server extends EventEmitter { }) } - dispatchRawTrades(exchange, data) { - for (let i = 0; i < data.length; i++) { - const trade = data[i] + /** + * @param {Trade[]} trades + */ + + dispatchRawTrades(trades) { + for (let i = 0; i < trades.length; i++) { + const trade = trades[i] if (!trade.liquidation) { - const identifier = exchange + ':' + trade.pair + const identifier = trade.exchange + ':' + trade.pair // ping connection connections[identifier].hit++ @@ -1011,9 +1012,9 @@ class Server extends EventEmitter { if (config.broadcast) { if (!config.broadcastAggr && !config.broadcastDebounce) { - this.broadcastTrades(data) + this.broadcastTrades(trades) } else { - Array.prototype.push.apply(this.delayedForBroadcast, data) + Array.prototype.push.apply(this.delayedForBroadcast, trades) } } } diff --git a/src/services/socket.js b/src/services/socket.js index 81ccf07..cacb85a 100644 --- a/src/services/socket.js +++ b/src/services/socket.js @@ -21,7 +21,6 @@ class SocketService extends EventEmitter { */ this.serverSocket = null - this.clusterSocket = null this.clusteredCollectors = [] if (config.influxCollectors) { diff --git a/src/typedef.js b/src/typedef.js index 2e69c63..a6a7667 100644 --- a/src/typedef.js +++ b/src/typedef.js @@ -1,7 +1,13 @@ /** - * A trade - * @typedef Trade - * @type {{exchange: string, pair: string, timestamp: number, price: number, size: number, side: number, liquidation: boolean?}} + * Represents a trade. + * @typedef {Object} Trade + * @property {string} exchange - The exchange where the trade occurred. + * @property {string} pair - The trading pair involved in the trade. + * @property {number} timestamp - The timestamp of the trade. + * @property {number} price - The price at which the trade occurred. + * @property {number} size - The size or quantity of the trade. + * @property {number} side - The side of the trade (1 for buy, 2 for sell, for example). + * @property {boolean} [liquidation] - Optional. Indicates whether the trade was a liquidation. */ /** @@ -49,10 +55,10 @@ * Keep track of a single connection (exchange + pair) * @typedef Connection * @type {{ - * exchange: string, - * pair: string, - * apiId: string, - * hit: number, + * exchange: string, + * pair: string, + * apiId: string, + * hit: number, * ping: number, * bar?: Bar * }}