From 924e53b83ebeff2adfe86658fd565d66ad5b3bde Mon Sep 17 00:00:00 2001 From: "evgenii.ilichev" Date: Tue, 11 Feb 2020 10:46:48 +0300 Subject: [PATCH 1/5] introduce authorization --- .../dsx/DsxStreamingExchange.java | 31 ++++++-- .../dsx/DsxStreamingService.java | 73 ++++++++++++++++++- .../dsx/DsxStreamingTradeService.java | 52 +++++++++++++ .../dsx/DsxSubscriptionHelper.java | 4 +- .../dsx/dto/enums/DsxEventType.java | 1 + .../dsx/dto/messages/DsxAuthMessage.java | 35 +++++++++ .../DsxWebSocketBookSubscriptionMessage.java | 2 +- ...ebSocketInstrumentSubscriptionMessage.java | 26 +++++++ .../DsxWebSocketSubscriptionMessage.java | 9 +-- .../DsxWebSocketTradeSubscriptionMessage.java | 2 +- .../xchangestream/dsx/DsxManualExample.java | 8 +- 11 files changed, 225 insertions(+), 18 deletions(-) create mode 100644 xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java create mode 100644 xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthMessage.java create mode 100644 xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketInstrumentSubscriptionMessage.java diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java index 6215684f9..0aec173ad 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java @@ -3,8 +3,10 @@ import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingExchange; import info.bitrich.xchangestream.core.StreamingMarketDataService; +import info.bitrich.xchangestream.core.StreamingTradeService; import io.reactivex.Completable; import io.reactivex.Observable; +import org.apache.commons.lang3.StringUtils; import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.hitbtc.v2.HitbtcExchange; @@ -14,17 +16,26 @@ public class DsxStreamingExchange extends HitbtcExchange implements StreamingExchange { private static final String API_URI = "ws://localhost:8080/stream"; - private final DsxStreamingService streamingService; + private DsxStreamingService streamingService; private DsxStreamingMarketDataService streamingMarketDataService; - - public DsxStreamingExchange() { - this.streamingService = new DsxStreamingService(API_URI); - } + private DsxStreamingTradeService streamingTradeService; @Override protected void initServices() { super.initServices(); - streamingMarketDataService = new DsxStreamingMarketDataService(streamingService); + this.streamingService = createStreamingService(); + this.streamingMarketDataService = new DsxStreamingMarketDataService(streamingService); + this.streamingTradeService = new DsxStreamingTradeService(streamingService); + } + + private DsxStreamingService createStreamingService() { + DsxStreamingService streamingService = new DsxStreamingService(API_URI, getNonceFactory()); + applyStreamingSpecification(getExchangeSpecification(), streamingService); + if (StringUtils.isNotEmpty(exchangeSpecification.getApiKey())) { + streamingService.setApiKey(exchangeSpecification.getApiKey()); + streamingService.setApiSecret(exchangeSpecification.getSecretKey()); + } + return streamingService; } @Override @@ -64,6 +75,14 @@ public ExchangeSpecification getDefaultExchangeSpecification() { public StreamingMarketDataService getStreamingMarketDataService() { return streamingMarketDataService; } + + @Override + public StreamingTradeService getStreamingTradeService() { + if (streamingService.isAuthDataProvided()) { + streamingService.authorize(); + } + return streamingTradeService; + } @Override public void useCompressedMessages(boolean compressedMessages) { streamingService.useCompressedMessages(compressedMessages); } diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java index 54306e248..5ed5a0cac 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java @@ -3,17 +3,30 @@ import com.fasterxml.jackson.databind.JsonNode; import info.bitrich.xchangestream.dsx.dto.DsxChannelInfo; import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; +import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthMessage; +import info.bitrich.xchangestream.dsx.dto.messages.DsxTradeMessage; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketSubscriptionMessage; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; +import io.reactivex.Observable; +import io.reactivex.subjects.PublishSubject; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import si.mazi.rescu.SynchronizedValueFactory; +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.xml.bind.DatatypeConverter; import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.InvalidKeyException; +import java.security.NoSuchAlgorithmException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import static info.bitrich.xchangestream.dsx.DsxSubscriptionHelper.CHANNEL_DELIMITER; +import static org.knowm.xchange.service.BaseParamsDigest.HMAC_SHA_512; /** * @author rimalon @@ -25,12 +38,16 @@ public class DsxStreamingService extends JsonNettyStreamingService { private static final String JSON_CHANNEL = "channel"; private static final String JSON_INSTRUMENT = "instrument"; private static final String JSON_INSTRUMENT_TYPE = "instrumentType"; + private static final String JSON_ERROR_CODE = "errorCode"; private final Map requests = new ConcurrentHashMap<>(); private final Map lastTradeIds = new ConcurrentHashMap<>(); - public DsxStreamingService(String apiUrl) { + private final PublishSubject subjectTrade = PublishSubject.create(); + + public DsxStreamingService(String apiUrl, SynchronizedValueFactory nonceFactory) { super(apiUrl, Integer.MAX_VALUE); + this.nonceFactory = nonceFactory; } @Override @@ -59,6 +76,10 @@ protected void handleMessage(JsonNode message) { DsxEventType dsxEvent = DsxEventType.getEvent(eventName); if (dsxEvent != null) { switch (dsxEvent) { + case authorize: + LOG.debug("Message received: {}", message.toString()); + processAuthorizeResponse(message); + break; case snapshot: case update: LOG.debug("Message received: {}", message.toString()); @@ -93,6 +114,13 @@ private void processRequestId(JsonNode message, DsxEventType event) { LOG.warn("Unknown request has been successfully processed. Result: '{}'. Message: {}", event, message.toString()); } + private void processAuthorizeResponse(JsonNode message) { + LOG.info("Process authorize msg {}", message); + if (message.get(JSON_ERROR_CODE) == null) { + isAuthorized = true; + } + } + @Override public String getSubscribeMessage(String channelName, Object... args) throws IOException { DsxChannelInfo channelInfo = DsxSubscriptionHelper.parseChannelName(channelName); @@ -111,4 +139,47 @@ public String getUnsubscribeMessage(String channelName) throws IOException { LOG.info("Unsubscription message for channel {} has been generated. RequestId {}", channelName, message.getRid()); return objectMapper.writeValueAsString(message); } + + public Observable getAuthenticatedTrades() { + return subjectTrade.share(); + } + + boolean isAuthDataProvided() { + return StringUtils.isNotEmpty(apiKey) && StringUtils.isNotEmpty(apiSecret); + } + + private String apiKey; + private String apiSecret; + + private boolean isAuthorized = false; + + void setApiKey(String apiKey) { + this.apiKey = apiKey; + } + + void setApiSecret(String apiSecret) { + this.apiSecret = apiSecret; + } + + private final SynchronizedValueFactory nonceFactory; + + public void authorize() { + long nonce = nonceFactory.createValue(); + String payload = "AUTH" + nonce; + String signature; + try { + Mac macEncoder = Mac.getInstance(HMAC_SHA_512); + SecretKeySpec secretKeySpec = new SecretKeySpec(apiSecret.getBytes(StandardCharsets.UTF_8), HMAC_SHA_512); + macEncoder.init(secretKeySpec); + byte[] result = macEncoder.doFinal(payload.getBytes(StandardCharsets.UTF_8)); + signature = DatatypeConverter.printHexBinary(result); + + DsxAuthMessage message = new DsxAuthMessage( + DsxEventType.authorize, apiKey, String.valueOf(nonce), signature.toLowerCase()); + sendObjectMessage(message); + } catch (NoSuchAlgorithmException | InvalidKeyException e) { + LOG.error("auth. Sign failed error={}", e.getMessage()); + } + } + } diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java new file mode 100644 index 000000000..ea3e06c15 --- /dev/null +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java @@ -0,0 +1,52 @@ +package info.bitrich.xchangestream.dsx; + +import info.bitrich.xchangestream.core.StreamingTradeService; +import info.bitrich.xchangestream.dsx.dto.messages.DsxTradeMessage; +import io.reactivex.Observable; +import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.dto.Order; +import org.knowm.xchange.dto.trade.UserTrade; +import org.knowm.xchange.exceptions.ExchangeSecurityException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.function.Function; + +public class DsxStreamingTradeService implements StreamingTradeService { + + private static final Logger LOG = LoggerFactory.getLogger(DsxStreamingTradeService.class); + + private final DsxStreamingService service; + + public DsxStreamingTradeService(DsxStreamingService service) { + this.service = service; + } + + @Override + public Observable getOrderChanges(CurrencyPair currencyPair, Object... args) { + return null; + } + + + public Observable getUserTrades() { + return getAuthenticatedTrades() + .map(t -> new UserTrade.Builder().build()); + } + + @Override + public Observable getUserTrades(CurrencyPair currencyPair, Object... args) { + return getUserTrades() + .filter(t -> currencyPair.equals(t.getCurrencyPair())); + } + + public Observable getAuthenticatedTrades() { + return withAuthenticatedService(DsxStreamingService::getAuthenticatedTrades); + } + + private Observable withAuthenticatedService(Function> serviceConsumer) { + if (!service.isAuthDataProvided()) { + throw new ExchangeSecurityException("Not authenticated"); + } + return serviceConsumer.apply(service); + } +} diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java index 214539d4f..ece40598e 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java @@ -5,6 +5,7 @@ import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; import info.bitrich.xchangestream.dsx.dto.enums.DsxInstrumentType; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketBookSubscriptionMessage; +import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketInstrumentSubscriptionMessage; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketSubscriptionMessage; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketTradeSubscriptionMessage; import org.apache.commons.lang3.StringUtils; @@ -46,8 +47,7 @@ public static DsxWebSocketTradeSubscriptionMessage createTradeSubscriptionMessag } public static DsxWebSocketSubscriptionMessage createBaseSubscriptionMessage(DsxChannelInfo channelInfo, DsxEventType eventType) { - return new DsxWebSocketSubscriptionMessage(generateRequestId(), eventType, channelInfo.getInstrument(), channelInfo.getInstrumentType() - ); + return new DsxWebSocketInstrumentSubscriptionMessage(generateRequestId(), eventType, channelInfo.getInstrument(), channelInfo.getInstrumentType()); } public static DsxChannelInfo parseChannelName(String channelName) { diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxEventType.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxEventType.java index dc78952b3..4863c3959 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxEventType.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxEventType.java @@ -24,6 +24,7 @@ public enum DsxEventType { unsubscribed, unsubscriptionFailed, + authorize, snapshot, update; diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthMessage.java new file mode 100644 index 000000000..a92f983cd --- /dev/null +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthMessage.java @@ -0,0 +1,35 @@ +package info.bitrich.xchangestream.dsx.dto.messages; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; + +public class DsxAuthMessage extends DsxEventMessage { + + private final String apiKey; + private final String authNonce; + private final String authSig; + + @JsonCreator + public DsxAuthMessage(@JsonProperty("event") DsxEventType event, + @JsonProperty("apiKey") String apiKey, + @JsonProperty("authNonce") String authNonce, + @JsonProperty("authSig") String authSig) { + super(event); + this.apiKey = apiKey; + this.authNonce = authNonce; + this.authSig = authSig; + } + + public String getApiKey() { + return apiKey; + } + + public String getAuthNonce() { + return authNonce; + } + + public String getAuthSig() { + return authSig; + } +} diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketBookSubscriptionMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketBookSubscriptionMessage.java index 6e1207034..9b4446d09 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketBookSubscriptionMessage.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketBookSubscriptionMessage.java @@ -8,7 +8,7 @@ /** * @author rimalon */ -public class DsxWebSocketBookSubscriptionMessage extends DsxWebSocketSubscriptionMessage { +public class DsxWebSocketBookSubscriptionMessage extends DsxWebSocketInstrumentSubscriptionMessage { private final Integer limit; @ConstructorProperties({"rid", "event", "instrument", "instrumentType", "limit"}) diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketInstrumentSubscriptionMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketInstrumentSubscriptionMessage.java new file mode 100644 index 000000000..6ec09519a --- /dev/null +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketInstrumentSubscriptionMessage.java @@ -0,0 +1,26 @@ +package info.bitrich.xchangestream.dsx.dto.messages; + +import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; +import info.bitrich.xchangestream.dsx.dto.enums.DsxInstrumentType; + +import java.beans.ConstructorProperties; + +public class DsxWebSocketInstrumentSubscriptionMessage extends DsxWebSocketSubscriptionMessage { + private final DsxInstrumentType instrumentType; + private final String instrument; + + @ConstructorProperties({"rid", "event", "instrument", "instrumentType"}) + public DsxWebSocketInstrumentSubscriptionMessage(long rid, DsxEventType event, String instrument, DsxInstrumentType instrumentType) { + super(rid, event); + this.instrument = instrument; + this.instrumentType = instrumentType; + } + + public DsxInstrumentType getInstrumentType() { + return instrumentType; + } + + public String getInstrument() { + return instrument; + } +} diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketSubscriptionMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketSubscriptionMessage.java index ed47f8cd3..5873d16a2 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketSubscriptionMessage.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketSubscriptionMessage.java @@ -1,19 +1,18 @@ package info.bitrich.xchangestream.dsx.dto.messages; import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; -import info.bitrich.xchangestream.dsx.dto.enums.DsxInstrumentType; import java.beans.ConstructorProperties; /** * @author rimalon */ -public class DsxWebSocketSubscriptionMessage extends InstrumentMessage { +public class DsxWebSocketSubscriptionMessage extends DsxEventMessage { private final long rid; - @ConstructorProperties({"rid", "event", "instrument", "instrumentType"}) - public DsxWebSocketSubscriptionMessage(long rid, DsxEventType event, String instrument, DsxInstrumentType instrumentType) { - super(event, instrument, instrumentType); + @ConstructorProperties({"rid", "event"}) + public DsxWebSocketSubscriptionMessage(long rid, DsxEventType event) { + super(event); this.rid = rid; } diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketTradeSubscriptionMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketTradeSubscriptionMessage.java index 8b19d2a3a..aab7ca4cc 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketTradeSubscriptionMessage.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketTradeSubscriptionMessage.java @@ -8,7 +8,7 @@ /** * @author rimalon */ -public class DsxWebSocketTradeSubscriptionMessage extends DsxWebSocketSubscriptionMessage { +public class DsxWebSocketTradeSubscriptionMessage extends DsxWebSocketInstrumentSubscriptionMessage { private final Long prevDealId; @ConstructorProperties({"rid", "event", "instrument", "instrumentType", "prevDealId"}) diff --git a/xchange-dsx/src/test/java/info/bitrich/xchangestream/dsx/DsxManualExample.java b/xchange-dsx/src/test/java/info/bitrich/xchangestream/dsx/DsxManualExample.java index d5ab9dd81..bf038fb7a 100644 --- a/xchange-dsx/src/test/java/info/bitrich/xchangestream/dsx/DsxManualExample.java +++ b/xchange-dsx/src/test/java/info/bitrich/xchangestream/dsx/DsxManualExample.java @@ -4,6 +4,7 @@ import info.bitrich.xchangestream.core.StreamingExchangeFactory; import info.bitrich.xchangestream.dsx.dto.enums.DsxInstrumentType; import io.reactivex.disposables.Disposable; +import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.currency.CurrencyPair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,8 +16,11 @@ public class DsxManualExample { private static final Logger LOG = LoggerFactory.getLogger(DsxManualExample.class); public static void main(String[] args) throws InterruptedException { - StreamingExchange exchange = StreamingExchangeFactory.INSTANCE.createExchange(DsxStreamingExchange.class - .getName()); + ExchangeSpecification spec = StreamingExchangeFactory.INSTANCE.createExchange( + DsxStreamingExchange.class.getName()).getDefaultExchangeSpecification(); + spec.setApiKey("dsx-apiKey"); + spec.setSecretKey("dsx-apiSecret"); + DsxStreamingExchange exchange = (DsxStreamingExchange) StreamingExchangeFactory.INSTANCE.createExchange(spec); exchange.connect().blockingAwait(); Disposable orderBookObserver = exchange.getStreamingMarketDataService().getOrderBook(CurrencyPair.BTC_USD, DsxInstrumentType.LIVE, 1).subscribe(orderBook -> { From e46320bfa8d36c3b1812baa9dff5d51d14a97285 Mon Sep 17 00:00:00 2001 From: "evgenii.ilichev" Date: Wed, 12 Feb 2020 17:04:21 +0300 Subject: [PATCH 2/5] fix conflicts --- .../xchangestream/dsx/DsxStreamingExchange.java | 10 ++++------ .../xchangestream/dsx/DsxSubscriptionHelper.java | 2 +- .../DsxWebSocketInstrumentSubscriptionMessage.java | 11 +++++++++-- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java index c88b11a7d..5ef0b2d79 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java @@ -30,16 +30,14 @@ public DsxStreamingExchange() { protected void initServices() { super.initServices(); Object apiURI = getExchangeSpecification().getExchangeSpecificParametersItem(DSX_SPEC_PARAMS_API_URI); - streamingService = new DsxStreamingService(apiURI == null ? DEFAULT_API_URI : (String) apiURI); + streamingService = createStreamingService(apiURI == null ? DEFAULT_API_URI : (String) apiURI); streamingService.setBeforeConnectionHandler((Runnable) getExchangeSpecification().getExchangeSpecificParametersItem(BEFORE_CONNECTION_HANDLER)); streamingMarketDataService = new DsxStreamingMarketDataService(streamingService); - this.streamingService = createStreamingService(); - this.streamingMarketDataService = new DsxStreamingMarketDataService(streamingService); - this.streamingTradeService = new DsxStreamingTradeService(streamingService); + streamingTradeService = new DsxStreamingTradeService(streamingService); } - private DsxStreamingService createStreamingService() { - DsxStreamingService streamingService = new DsxStreamingService(API_URI, getNonceFactory()); + private DsxStreamingService createStreamingService(String apiURI) { + DsxStreamingService streamingService = new DsxStreamingService(apiURI, getNonceFactory()); applyStreamingSpecification(getExchangeSpecification(), streamingService); if (StringUtils.isNotEmpty(exchangeSpecification.getApiKey())) { streamingService.setApiKey(exchangeSpecification.getApiKey()); diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java index e6f5892b8..a3ae8e9a2 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java @@ -48,7 +48,7 @@ public static DsxWebSocketTradeSubscriptionMessage createTradeSubscriptionMessag } public static DsxWebSocketSubscriptionMessage createBaseSubscriptionMessage(DsxChannelInfo channelInfo, DsxEventType eventType) { - return new DsxWebSocketInstrumentSubscriptionMessage(generateRequestId(), eventType, channelInfo.getInstrument(), channelInfo.getInstrumentType()); + return new DsxWebSocketInstrumentSubscriptionMessage(generateRequestId(), eventType, channelInfo.getChannel(), channelInfo.getInstrument(), channelInfo.getInstrumentType()); } public static DsxChannelInfo parseChannelName(String channelName) { diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketInstrumentSubscriptionMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketInstrumentSubscriptionMessage.java index 6ec09519a..84579c827 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketInstrumentSubscriptionMessage.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxWebSocketInstrumentSubscriptionMessage.java @@ -1,17 +1,20 @@ package info.bitrich.xchangestream.dsx.dto.messages; +import info.bitrich.xchangestream.dsx.dto.enums.DsxChannel; import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; import info.bitrich.xchangestream.dsx.dto.enums.DsxInstrumentType; import java.beans.ConstructorProperties; public class DsxWebSocketInstrumentSubscriptionMessage extends DsxWebSocketSubscriptionMessage { + private final DsxChannel channel; private final DsxInstrumentType instrumentType; private final String instrument; - @ConstructorProperties({"rid", "event", "instrument", "instrumentType"}) - public DsxWebSocketInstrumentSubscriptionMessage(long rid, DsxEventType event, String instrument, DsxInstrumentType instrumentType) { + @ConstructorProperties({"rid", "event", "channel", "instrument", "instrumentType"}) + public DsxWebSocketInstrumentSubscriptionMessage(long rid, DsxEventType event, DsxChannel channel, String instrument, DsxInstrumentType instrumentType) { super(rid, event); + this.channel = channel; this.instrument = instrument; this.instrumentType = instrumentType; } @@ -23,4 +26,8 @@ public DsxInstrumentType getInstrumentType() { public String getInstrument() { return instrument; } + + public DsxChannel getChannel() { + return channel; + } } From 26f4b95ca948edb95c80fc611e8ee5a293a3fe73 Mon Sep 17 00:00:00 2001 From: "evgenii.ilichev" Date: Tue, 10 Mar 2020 16:28:29 +0300 Subject: [PATCH 3/5] support authorized channels --- .../netty/StreamingObjectMapperHelper.java | 1 + .../dsx/DsxStreamingAccountService.java | 42 ++++ .../dsx/DsxStreamingExchange.java | 27 ++- .../dsx/DsxStreamingMarketDataService.java | 6 +- .../dsx/DsxStreamingMessageAdapter.java | 172 ++++++++++++++++ .../dsx/DsxStreamingService.java | 183 ++++++++++++++---- .../dsx/DsxStreamingTradeService.java | 14 +- .../dsx/DsxSubscriptionHelper.java | 13 ++ .../xchangestream/dsx/dto/DsxOrderBook.java | 4 +- .../dsx/dto/enums/DsxChannel.java | 17 +- .../dsx/dto/enums/DsxEventType.java | 26 +-- .../dto/messages/DsxAuthBalanceMessage.java | 61 ++++++ .../dsx/dto/messages/DsxAuthMessage.java | 24 ++- .../dsx/dto/messages/DsxOrderbookMessage.java | 2 +- .../DsxStreamingMarketDataServiceTest.java | 2 +- 15 files changed, 525 insertions(+), 69 deletions(-) create mode 100644 xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingAccountService.java create mode 100644 xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMessageAdapter.java create mode 100644 xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthBalanceMessage.java diff --git a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/StreamingObjectMapperHelper.java b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/StreamingObjectMapperHelper.java index 10ce1e5ac..678663dc8 100644 --- a/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/StreamingObjectMapperHelper.java +++ b/service-netty/src/main/java/info/bitrich/xchangestream/service/netty/StreamingObjectMapperHelper.java @@ -19,6 +19,7 @@ public class StreamingObjectMapperHelper { objectMapper.enable(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS); objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); objectMapper.setSerializationInclusion(JsonInclude.Include.NON_DEFAULT); + objectMapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY); } private StreamingObjectMapperHelper() { diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingAccountService.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingAccountService.java new file mode 100644 index 000000000..7966c9f4f --- /dev/null +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingAccountService.java @@ -0,0 +1,42 @@ +package info.bitrich.xchangestream.dsx; + +import info.bitrich.xchangestream.core.StreamingAccountService; +import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthBalanceMessage; +import io.reactivex.Observable; +import org.knowm.xchange.currency.Currency; +import org.knowm.xchange.dto.account.Balance; +import org.knowm.xchange.exceptions.ExchangeException; +import org.knowm.xchange.exceptions.ExchangeSecurityException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DsxStreamingAccountService implements StreamingAccountService { + + private static final Logger LOG = LoggerFactory.getLogger(DsxStreamingAccountService.class); + + private final DsxStreamingService service; + + public DsxStreamingAccountService(DsxStreamingService service) { + this.service = service; + } + + @Override + public Observable getBalanceChanges(Currency currency, Object... args) { + if (args.length == 0 || !Integer.class.isInstance(args[0])) { + throw new ExchangeException("Specify faId to monitor balance stream"); + } + Integer faId = (Integer) args[0]; + return getRawAuthenticatedBalances() + .filter(b -> b.getFaId().equals(faId)) + .filter(b -> currency.getCurrencyCode().equals(b.getCurrency())) + .map(DsxStreamingMessageAdapter::adaptBalance); + } + + public Observable getRawAuthenticatedBalances() { + if (!service.isAuthDataProvided()) { + throw new ExchangeSecurityException("Not authenticated"); + } + return service.getAuthenticatedBalances(); + } +} diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java index 5ef0b2d79..de969f3c6 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingExchange.java @@ -3,12 +3,12 @@ import info.bitrich.xchangestream.core.ProductSubscription; import info.bitrich.xchangestream.core.StreamingExchange; import info.bitrich.xchangestream.core.StreamingMarketDataService; -import info.bitrich.xchangestream.core.StreamingTradeService; import io.reactivex.Completable; import io.reactivex.Observable; import org.apache.commons.lang3.StringUtils; import org.knowm.xchange.ExchangeSpecification; import org.knowm.xchange.dsx.DSXExchange; +import org.knowm.xchange.exceptions.ExchangeSecurityException; import static info.bitrich.xchangestream.service.ConnectableService.BEFORE_CONNECTION_HANDLER; @@ -22,6 +22,7 @@ public class DsxStreamingExchange extends DSXExchange implements StreamingExchan private DsxStreamingService streamingService; private DsxStreamingMarketDataService streamingMarketDataService; private DsxStreamingTradeService streamingTradeService; + private DsxStreamingAccountService streamingAccountService; public DsxStreamingExchange() { } @@ -34,6 +35,7 @@ protected void initServices() { streamingService.setBeforeConnectionHandler((Runnable) getExchangeSpecification().getExchangeSpecificParametersItem(BEFORE_CONNECTION_HANDLER)); streamingMarketDataService = new DsxStreamingMarketDataService(streamingService); streamingTradeService = new DsxStreamingTradeService(streamingService); + streamingAccountService = new DsxStreamingAccountService(streamingService); } private DsxStreamingService createStreamingService(String apiURI) { @@ -83,14 +85,31 @@ public StreamingMarketDataService getStreamingMarketDataService() { return streamingMarketDataService; } - @Override - public StreamingTradeService getStreamingTradeService() { - if (streamingService.isAuthDataProvided()) { + public void authorize() { + if (!streamingService.isAuthDataProvided()) { + throw new ExchangeSecurityException("authenticated data not provided"); + } + if (!streamingService.isAuthorized()) { streamingService.authorize(); } + } + + @Override + public DsxStreamingTradeService getStreamingTradeService() { + if (!streamingService.isAuthDataProvided()) { + throw new ExchangeSecurityException("No authentication data provided"); + } return streamingTradeService; } + @Override + public DsxStreamingAccountService getStreamingAccountService() { + if (!streamingService.isAuthDataProvided()) { + throw new ExchangeSecurityException("No authentication data provided"); + } + return streamingAccountService; + } + @Override public void useCompressedMessages(boolean compressedMessages) { streamingService.useCompressedMessages(compressedMessages); diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMarketDataService.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMarketDataService.java index 38d131816..8204a96c3 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMarketDataService.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMarketDataService.java @@ -46,7 +46,7 @@ public DsxStreamingMarketDataService(DsxStreamingService service) { @Override public Observable getOrderBook(CurrencyPair currencyPair, Object... args) { - String channelName = createChannelName(DsxChannel.book, currencyPair, args); + String channelName = createChannelName(DsxChannel.BOOK, currencyPair, args); final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); Observable jsonNodeObservable = service.subscribeChannel(channelName, args); @@ -63,7 +63,7 @@ public Observable getOrderBook(CurrencyPair currencyPair, Object... a @Override public Observable getTrades(CurrencyPair currencyPair, Object... args) { - String channelName = createChannelName(DsxChannel.trade, currencyPair, args); + String channelName = createChannelName(DsxChannel.TRADE, currencyPair, args); final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); Observable jsonNodeObservable = service.subscribeChannel(channelName, args); return jsonNodeObservable @@ -82,7 +82,7 @@ public Observable getTrades(CurrencyPair currencyPair, Object... args) { @Override public Observable getTicker(CurrencyPair currencyPair, Object... args) { - String channelName = createChannelName(DsxChannel.ticker, currencyPair, args); + String channelName = createChannelName(DsxChannel.TICKER, currencyPair, args); final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper(); Observable jsonNodeObservable = service.subscribeChannel(channelName); return jsonNodeObservable diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMessageAdapter.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMessageAdapter.java new file mode 100644 index 000000000..1cc464b75 --- /dev/null +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMessageAdapter.java @@ -0,0 +1,172 @@ +package info.bitrich.xchangestream.dsx; + +import com.fasterxml.jackson.databind.JsonNode; +import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthBalanceMessage; +import io.reactivex.annotations.Nullable; +import org.knowm.xchange.currency.Currency; +import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.dsx.dto.trade.ClientDeal; +import org.knowm.xchange.dsx.dto.trade.DSXOrder; +import org.knowm.xchange.dto.Order; +import org.knowm.xchange.dto.account.Balance; +import org.knowm.xchange.dto.trade.LimitOrder; +import org.knowm.xchange.dto.trade.MarketOrder; +import org.knowm.xchange.dto.trade.UserTrade; +import org.knowm.xchange.utils.DateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.util.Iterator; +import java.util.Map; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +import static java.util.stream.StreamSupport.stream; +import static org.knowm.xchange.dto.Order.OrderType.ASK; +import static org.knowm.xchange.dto.Order.OrderType.BID; + +public class DsxStreamingMessageAdapter { + private static final Logger LOG = LoggerFactory.getLogger(DsxStreamingMessageAdapter.class); + + static Stream adaptBalances(JsonNode balances) { + Iterator> iterator = balances.fields(); + return Stream.of(StreamSupport.stream( + Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .map(e -> { + Iterable faBalances = () -> e.getValue().iterator(); + return stream(faBalances.spliterator(), false) + .map(b -> createBalanceMessage(Integer.parseInt(e.getKey()), b)) + .peek(o -> LOG.debug("Balance: {}", o)); + } + ) + ).flatMap(x -> x.flatMap(z -> z)); + } + + @Nullable + static DsxAuthBalanceMessage adaptBalance(JsonNode balance) { + Map.Entry balanceObject = balance.fields().next(); + if (balanceObject == null) { + return null; + } + DsxAuthBalanceMessage balanceMessage = createBalanceMessage(Integer.parseInt(balanceObject.getKey()), balanceObject.getValue().get(0)); + LOG.debug("Balance: {}", balanceMessage); + return balanceMessage; + } + + static Balance adaptBalance(DsxAuthBalanceMessage authBalance) { + return new Balance( + Currency.getInstance(authBalance.getCurrency()), + authBalance.getTotal(), + authBalance.getAvailable() + ); + } + + + static Stream adaptOrders(JsonNode orders) { + Iterable iterator = () -> orders.iterator(); + return stream(iterator.spliterator(), false) + .map(DsxStreamingMessageAdapter::createOrderMessage) + .peek(o -> LOG.debug("New order: {}", o)); + } + + @Nullable + static DSXOrder adaptOrder(JsonNode order) { + JsonNode orderObject = order.get(0); + if (orderObject == null) { + return null; + } + DSXOrder aom = createOrderMessage(orderObject); + LOG.debug("New order: {}", aom); + return aom; + } + + static Order adaptOrder(DSXOrder authOrder) { + + return authOrder.getOrderType().equals(DSXOrder.OrderType.limit) ? + new LimitOrder( + authOrder.getType().equals(DSXOrder.Type.buy) ? Order.OrderType.ASK : Order.OrderType.BID, + authOrder.getVolume(), new CurrencyPair(authOrder.getPair()), null, DateUtils.fromMillisUtc(Long.parseLong(authOrder.getTimestampCreated())), + authOrder.getRate()) : + new MarketOrder(authOrder.getType().equals(DSXOrder.Type.buy) ? Order.OrderType.ASK : Order.OrderType.BID, + authOrder.getVolume(), new CurrencyPair(authOrder.getPair())); + } + + //trades + static Stream adaptTrades(JsonNode trades) { + Iterable iterator = () -> trades.iterator(); + return stream(iterator.spliterator(), false) + .map(DsxStreamingMessageAdapter::createTradeMessage) + .peek(o -> LOG.debug("New trade: {}", o)); + } + + @Nullable + static ClientDeal adaptTrade(JsonNode order) { + JsonNode tradeObject = order.get(0); + if (tradeObject == null) { + return null; + } + ClientDeal atm = createTradeMessage(tradeObject); + LOG.debug("New trade: {}", atm); + return atm; + } + + static UserTrade adaptTrade(ClientDeal tradeMessage) { + return new UserTrade.Builder() + .currencyPair(new CurrencyPair(tradeMessage.getPair())) + .feeAmount(tradeMessage.getCommission()) + .feeCurrency(new Currency(tradeMessage.getCommissionCurrency())) + .id(Long.toString(tradeMessage.getNumber())) + .orderId(Long.toString(tradeMessage.getOrderId())) + .originalAmount(tradeMessage.getVolume()) + .price(tradeMessage.getRate()) + .timestamp(DateUtils.fromMillisUtc(tradeMessage.getTimestamp())) + .type(tradeMessage.getType().equals(DSXOrder.Type.buy) ? ASK : BID) + .build(); + } + + private static DsxAuthBalanceMessage createBalanceMessage(Integer faId, JsonNode balance) { + if (balance.size() < 5) { + LOG.error("createBalanceObject unexpected record size={}, record={}", balance.size(), balance.toString()); + return null; + } + String currency = balance.get("currency").asText(); + BigDecimal total = balance.get("total").decimalValue(); + BigDecimal held = balance.get("held").decimalValue(); + BigDecimal locked = balance.get("locked").decimalValue(); + BigDecimal available = balance.get("available").asText().equals("null") ? total.subtract(held).subtract(locked) : balance.get("available").decimalValue(); + + return new DsxAuthBalanceMessage(faId, currency, total, held, locked, available); + } + + private static ClientDeal createTradeMessage(JsonNode trade) { + + long number = trade.get("number").asLong(); + String pair = trade.get("pair").asText(); + DSXOrder.Type type = DSXOrder.Type.valueOf(trade.get("type").asText().toLowerCase()); + BigDecimal volume = trade.get("volume").decimalValue(); + BigDecimal rate = trade.get("rate").decimalValue(); + long orderId = trade.get("orderId").asLong(); + long timestamp = trade.get("timestamp").asLong(); + BigDecimal commission = trade.get("commission").decimalValue(); + String commissionCurrency = trade.get("commissionCurrency").asText(); + + return new ClientDeal(number, pair, type.toString(), volume, rate, orderId, timestamp, commission, commissionCurrency); + } + + private static DSXOrder createOrderMessage(JsonNode order) { + + String pair = order.get("pair").asText(); + DSXOrder.Type type = DSXOrder.Type.valueOf(order.get("type").asText().toLowerCase()); + BigDecimal volume = order.get("volume").decimalValue(); + BigDecimal remainingVolume = order.get("remainingVolume").decimalValue(); + BigDecimal rate = order.get("rate").decimalValue(); + int status = order.get("status").asInt(); + DSXOrder.OrderType orderType = DSXOrder.OrderType.valueOf(order.get("orderType").asText().toLowerCase()); + String timestampCreated = order.get("timestampCreated").asText(); + + return new DSXOrder(pair, type, volume, remainingVolume, rate, status, orderType, timestampCreated); + } +} diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java index 4844da88e..679f76967 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java @@ -2,15 +2,18 @@ import com.fasterxml.jackson.databind.JsonNode; import info.bitrich.xchangestream.dsx.dto.DsxChannelInfo; +import info.bitrich.xchangestream.dsx.dto.enums.DsxChannel; import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthMessage; -import info.bitrich.xchangestream.dsx.dto.messages.DsxTradeMessage; +import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthBalanceMessage; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketSubscriptionMessage; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; import io.reactivex.Observable; import io.reactivex.subjects.PublishSubject; import org.apache.commons.lang3.StringUtils; +import org.knowm.xchange.dsx.dto.trade.ClientDeal; +import org.knowm.xchange.dsx.dto.trade.DSXOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import si.mazi.rescu.SynchronizedValueFactory; @@ -22,9 +25,12 @@ import java.nio.charset.StandardCharsets; import java.security.InvalidKeyException; import java.security.NoSuchAlgorithmException; +import java.util.Arrays; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import static info.bitrich.xchangestream.dsx.DsxStreamingMessageAdapter.adaptBalance; +import static info.bitrich.xchangestream.dsx.DsxStreamingMessageAdapter.adaptBalances; import static info.bitrich.xchangestream.dsx.DsxSubscriptionHelper.CHANNEL_DELIMITER; import static org.knowm.xchange.service.BaseParamsDigest.HMAC_SHA_512; @@ -43,7 +49,27 @@ public class DsxStreamingService extends JsonNettyStreamingService { private final Map requests = new ConcurrentHashMap<>(); private final Map lastTradeIds = new ConcurrentHashMap<>(); - private final PublishSubject subjectTrade = PublishSubject.create(); + private final PublishSubject subjectTrade = PublishSubject.create(); + private final PublishSubject subjectBalance = PublishSubject.create(); + private final PublishSubject subjectOrder = PublishSubject.create(); + + private final SynchronizedValueFactory nonceFactory; + + private String apiKey; + private String apiSecret; + private boolean isAuthorized = false; + + public Observable getAuthenticatedTrades() { + return subjectTrade.share(); + } + + public Observable getAuthenticatedBalances() { + return subjectBalance.share(); + } + + public Observable getAuthenticatedOrders() { + return subjectOrder.share(); + } public DsxStreamingService(String apiUrl, SynchronizedValueFactory nonceFactory) { super(apiUrl, Integer.MAX_VALUE); @@ -59,6 +85,22 @@ public void setLastTradeId(String channelName, long lastTradeId) { this.lastTradeIds.put(channelName, lastTradeId); } + void setApiKey(String apiKey) { + this.apiKey = apiKey; + } + + void setApiSecret(String apiSecret) { + this.apiSecret = apiSecret; + } + + public boolean isAuthorized() { + return isAuthorized; + } + + boolean isAuthDataProvided() { + return StringUtils.isNotEmpty(apiKey) && StringUtils.isNotEmpty(apiSecret); + } + @Override protected String getChannelNameFromMessage(JsonNode message) { if (message.has(JSON_CHANNEL) && message.has(JSON_INSTRUMENT) && message.has(JSON_INSTRUMENT_TYPE)) { @@ -76,26 +118,34 @@ protected void handleMessage(JsonNode message) { DsxEventType dsxEvent = DsxEventType.getEvent(eventName); if (dsxEvent != null) { switch (dsxEvent) { - case authorize: + case AUTHORIZE: LOG.debug("Message received: {}", message.toString()); processAuthorizeResponse(message); break; - case snapshot: - case update: + case SNAPSHOT: + LOG.debug("Snapshot message received: {}", message.toString()); + break; + case UPDATE: LOG.debug("Message received: {}", message.toString()); super.handleMessage(message); break; - case heartbeat: + case HEARTBEAT: LOG.debug("Heartbeat has been received"); break; - case subscribed: - case unsubscribed: - case unsubscriptionFailed: - case subscriptionFailed: + case SUBSCRIBED: + case UNSUBSCRIBED: + case UNSUBSCRIPTION_FAILED: + case SUBSCRIPTION_FAILED: processRequestId(message, dsxEvent); break; default: } + } else { + String channelName = message.get(JSON_CHANNEL).asText(); + DsxChannel channel = DsxChannel.getChannel(channelName); + if (DsxChannel.AUTHORIZED == channel) { + processAuthorizedMessage(message); + } } } else { LOG.debug("Unknown message received: {}", message.toString()); @@ -114,18 +164,11 @@ private void processRequestId(JsonNode message, DsxEventType event) { LOG.warn("Unknown request has been successfully processed. Result: '{}'. Message: {}", event, message.toString()); } - private void processAuthorizeResponse(JsonNode message) { - LOG.info("Process authorize msg {}", message); - if (message.get(JSON_ERROR_CODE) == null) { - isAuthorized = true; - } - } - @Override public String getSubscribeMessage(String channelName, Object... args) throws IOException { DsxChannelInfo channelInfo = DsxSubscriptionHelper.parseChannelName(channelName); channelInfo.setLastTradeId(lastTradeIds.computeIfAbsent(channelName, chName -> 0L)); - DsxWebSocketSubscriptionMessage message = channelInfo.getChannel().subscriptionMessageCreator.apply(channelInfo, DsxEventType.subscribe, args); + DsxWebSocketSubscriptionMessage message = channelInfo.getChannel().subscriptionMessageCreator.apply(channelInfo, DsxEventType.SUBSCRIBE, args); requests.put(message.getRid(), channelInfo); LOG.info("Subscription message for channel {} has been generated. RequestId {}", channelName, message.getRid()); return objectMapper.writeValueAsString(message); @@ -134,34 +177,107 @@ public String getSubscribeMessage(String channelName, Object... args) throws IOE @Override public String getUnsubscribeMessage(String channelName) throws IOException { DsxChannelInfo channelInfo = DsxSubscriptionHelper.parseChannelName(channelName); - DsxWebSocketSubscriptionMessage message = DsxSubscriptionHelper.createBaseSubscriptionMessage(channelInfo, DsxEventType.unsubscribe); + DsxWebSocketSubscriptionMessage message = DsxSubscriptionHelper.createBaseSubscriptionMessage(channelInfo, DsxEventType.UNSUBSCRIBE); requests.put(message.getRid(), channelInfo); LOG.info("Unsubscription message for channel {} has been generated. RequestId {}", channelName, message.getRid()); return objectMapper.writeValueAsString(message); } - public Observable getAuthenticatedTrades() { - return subjectTrade.share(); - } + private enum AuthorizedChannelDataType { + BALANCE, + ORDER, + TRADE; - boolean isAuthDataProvided() { - return StringUtils.isNotEmpty(apiKey) && StringUtils.isNotEmpty(apiSecret); + public static AuthorizedChannelDataType getEvent(String type) { + return Arrays.stream(AuthorizedChannelDataType.values()) + .filter(e -> StringUtils.equalsIgnoreCase(type, e.name())) + .findFirst() + .orElse(null); + } } - private String apiKey; - private String apiSecret; + private void processAuthorizedMessage(JsonNode message) { + AuthorizedChannelDataType infoType = AuthorizedChannelDataType.getEvent(message.get("infoType").asText()); + JsonNode payload = message.get("rawPayload"); - private boolean isAuthorized = false; + switch (infoType) { + case BALANCE: + processAuthorizedMessageBalance(payload); + break; + case ORDER: + processAuthorizedMessageOrder(payload); + break; + case TRADE: + processAuthorizedMessageTrade(payload); + break; + default: + break; + } + } - void setApiKey(String apiKey) { - this.apiKey = apiKey; + private void processAuthorizedMessageBalance(JsonNode rawPayload) { + String eventName = rawPayload.get(JSON_EVENT).asText(); + DsxEventType dsxEvent = DsxEventType.getEvent(eventName); + if (dsxEvent != null) { + switch (dsxEvent) { + case SNAPSHOT: + adaptBalances(rawPayload.get("balances")).forEach(subjectBalance::onNext); + break; + case UPDATE: + DsxAuthBalanceMessage balance = adaptBalance(rawPayload.get("balances")); + if (balance != null) + subjectBalance.onNext(balance); + break; + default: + break; + } + } } - void setApiSecret(String apiSecret) { - this.apiSecret = apiSecret; + private void processAuthorizedMessageOrder(JsonNode rawPayload) { + String eventName = rawPayload.get(JSON_EVENT).asText(); + DsxEventType dsxEvent = DsxEventType.getEvent(eventName); + if (dsxEvent != null) { + switch (dsxEvent) { + case SNAPSHOT: + DsxStreamingMessageAdapter.adaptOrders(rawPayload.get("orders")).forEach(subjectOrder::onNext); + break; + case UPDATE: + DSXOrder order = DsxStreamingMessageAdapter.adaptOrder(rawPayload.get("orders")); + if (order != null) + subjectOrder.onNext(order); + break; + default: + break; + } + } } - private final SynchronizedValueFactory nonceFactory; + private void processAuthorizedMessageTrade(JsonNode rawPayload) { + String eventName = rawPayload.get(JSON_EVENT).asText(); + DsxEventType dsxEvent = DsxEventType.getEvent(eventName); + if (dsxEvent != null) { + switch (dsxEvent) { + case SNAPSHOT: + DsxStreamingMessageAdapter.adaptTrades(rawPayload.get("trades")).forEach(subjectTrade::onNext); + break; + case UPDATE: + ClientDeal trade = DsxStreamingMessageAdapter.adaptTrade(rawPayload.get("trades")); + if (trade != null) + subjectTrade.onNext(trade); + break; + default: + break; + } + } + } + + private void processAuthorizeResponse(JsonNode message) { + LOG.debug("Process authorize msg {}", message); + if (message.get(JSON_ERROR_CODE) == null) { + isAuthorized = true; + } + } public void authorize() { long nonce = nonceFactory.createValue(); @@ -174,8 +290,7 @@ public void authorize() { byte[] result = macEncoder.doFinal(payload.getBytes(StandardCharsets.UTF_8)); signature = DatatypeConverter.printHexBinary(result); - DsxAuthMessage message = new DsxAuthMessage( - DsxEventType.authorize, apiKey, String.valueOf(nonce), signature.toLowerCase()); + DsxAuthMessage message = DsxSubscriptionHelper.createAuthMessage(0L, apiKey, String.valueOf(nonce), signature.toLowerCase()); sendObjectMessage(message); } catch (NoSuchAlgorithmException | InvalidKeyException e) { LOG.error("auth. Sign failed error={}", e.getMessage()); diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java index ea3e06c15..6fc29c55c 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java @@ -1,9 +1,10 @@ package info.bitrich.xchangestream.dsx; import info.bitrich.xchangestream.core.StreamingTradeService; -import info.bitrich.xchangestream.dsx.dto.messages.DsxTradeMessage; import io.reactivex.Observable; import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.dsx.dto.trade.ClientDeal; +import org.knowm.xchange.dsx.dto.trade.DSXOrder; import org.knowm.xchange.dto.Order; import org.knowm.xchange.dto.trade.UserTrade; import org.knowm.xchange.exceptions.ExchangeSecurityException; @@ -24,13 +25,14 @@ public DsxStreamingTradeService(DsxStreamingService service) { @Override public Observable getOrderChanges(CurrencyPair currencyPair, Object... args) { - return null; + return getAuthenticatedOrders() + .map(DsxStreamingMessageAdapter::adaptOrder); } public Observable getUserTrades() { return getAuthenticatedTrades() - .map(t -> new UserTrade.Builder().build()); + .map(DsxStreamingMessageAdapter::adaptTrade); } @Override @@ -39,10 +41,14 @@ public Observable getUserTrades(CurrencyPair currencyPair, Object... .filter(t -> currencyPair.equals(t.getCurrencyPair())); } - public Observable getAuthenticatedTrades() { + public Observable getAuthenticatedTrades() { return withAuthenticatedService(DsxStreamingService::getAuthenticatedTrades); } + public Observable getAuthenticatedOrders() { + return withAuthenticatedService(DsxStreamingService::getAuthenticatedOrders); + } + private Observable withAuthenticatedService(Function> serviceConsumer) { if (!service.isAuthDataProvided()) { throw new ExchangeSecurityException("Not authenticated"); diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java index a3ae8e9a2..4d5724aa1 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxSubscriptionHelper.java @@ -4,6 +4,7 @@ import info.bitrich.xchangestream.dsx.dto.enums.DsxChannel; import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; import info.bitrich.xchangestream.dsx.dto.enums.DsxInstrumentType; +import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthMessage; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketBookSubscriptionMessage; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketInstrumentSubscriptionMessage; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketSubscriptionMessage; @@ -51,6 +52,18 @@ public static DsxWebSocketSubscriptionMessage createBaseSubscriptionMessage(DsxC return new DsxWebSocketInstrumentSubscriptionMessage(generateRequestId(), eventType, channelInfo.getChannel(), channelInfo.getInstrument(), channelInfo.getInstrumentType()); } + public static DsxAuthMessage createAuthMessage(Long dealsFrom, String apiKey, String authNonce, String authSig) { + return new DsxAuthMessage( + generateRequestId(), + DsxEventType.AUTHORIZE, + DsxChannel.AUTHORIZED, + dealsFrom, + apiKey, + authNonce, + authSig + ); + } + public static DsxChannelInfo parseChannelName(String channelName) { String[] channelNameParts = StringUtils.split(channelName, CHANNEL_DELIMITER); if (channelNameParts.length < 3) { diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/DsxOrderBook.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/DsxOrderBook.java index d82239550..35dd5e2df 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/DsxOrderBook.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/DsxOrderBook.java @@ -52,10 +52,10 @@ private void createFromMessage(DsxOrderbookMessage message) { public DsxOrderBook toDSXOrderBook(DsxOrderbookMessage message) { switch (message.getEvent()) { - case snapshot: { + case SNAPSHOT: { return message.getTimestamp() < this.getTimestamp() ? this : new DsxOrderBook(message); } - case update: { + case UPDATE: { return this.updateOrderBook(message); } } diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxChannel.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxChannel.java index 7fff82ec9..989d4cddb 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxChannel.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxChannel.java @@ -4,15 +4,26 @@ import info.bitrich.xchangestream.dsx.DsxSubscriptionHelper.TriFunction; import info.bitrich.xchangestream.dsx.dto.DsxChannelInfo; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketSubscriptionMessage; +import org.apache.commons.lang3.StringUtils; + +import java.util.Arrays; public enum DsxChannel { - book(DsxSubscriptionHelper::createBookSubscriptionMessage), - ticker((channelInfo, event, objects) -> DsxSubscriptionHelper.createBaseSubscriptionMessage(channelInfo, event)), - trade(DsxSubscriptionHelper::createTradeSubscriptionMessage); + BOOK(DsxSubscriptionHelper::createBookSubscriptionMessage), + TICKER((channelInfo, event, objects) -> DsxSubscriptionHelper.createBaseSubscriptionMessage(channelInfo, event)), + TRADE(DsxSubscriptionHelper::createTradeSubscriptionMessage), + AUTHORIZED(null); public final TriFunction subscriptionMessageCreator; DsxChannel(TriFunction subscriptionMessageCreator) { this.subscriptionMessageCreator = subscriptionMessageCreator; } + + public static DsxChannel getChannel(String channel) { + return Arrays.stream(DsxChannel.values()) + .filter(e -> StringUtils.equalsIgnoreCase(channel, e.name())) + .findFirst() + .orElse(null); + } } diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxEventType.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxEventType.java index 472a8ea01..eac270e5f 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxEventType.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/enums/DsxEventType.java @@ -8,23 +8,23 @@ * @author rimalon */ public enum DsxEventType { - heartbeat, + HEARTBEAT, - subscribe, - subscribed, - subscriptionFailed, + SUBSCRIBE, + SUBSCRIBED, + SUBSCRIPTION_FAILED, - unsubscribeBook, - unsubscribeTrade, - unsubscribeTicker, + UNSUBSCRIBE_BOOK, + UNSUBSCRIBE_TRADE, + UNSUBSCRIBE_TICKER, - unsubscribe, - unsubscribed, - unsubscriptionFailed, + UNSUBSCRIBE, + UNSUBSCRIBED, + UNSUBSCRIPTION_FAILED, - authorize, - snapshot, - update; + AUTHORIZE, + SNAPSHOT, + UPDATE; public static DsxEventType getEvent(String event) { return Arrays.stream(DsxEventType.values()) diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthBalanceMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthBalanceMessage.java new file mode 100644 index 000000000..bc9f07852 --- /dev/null +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthBalanceMessage.java @@ -0,0 +1,61 @@ +package info.bitrich.xchangestream.dsx.dto.messages; + +import java.beans.ConstructorProperties; +import java.math.BigDecimal; + +public class DsxAuthBalanceMessage { + + private final Integer faId; + private final String currency; + private final BigDecimal total; + private final BigDecimal held; + private final BigDecimal locked; + private final BigDecimal available; + + + @ConstructorProperties({"faId", "currency", "total", "held", "locked", "available"}) + public DsxAuthBalanceMessage(Integer faId, String currency, BigDecimal total, BigDecimal held, BigDecimal locked, BigDecimal available) { + this.faId = faId; + this.currency = currency; + this.total = total; + this.held = held; + this.locked = locked; + this.available = available; + } + + public String getCurrency() { + return currency; + } + + public BigDecimal getTotal() { + return total; + } + + public BigDecimal getHeld() { + return held; + } + + public BigDecimal getLocked() { + return locked; + } + + public BigDecimal getAvailable() { + return available; + } + + public Integer getFaId() { + return faId; + } + + @Override + public String toString() { + return "DsxBalanceMessage{" + + "faId='" + faId + '\'' + + ", currency='" + currency + '\'' + + ", total=" + total + + ", held=" + held + + ", locked=" + locked + + ", available=" + available + + '}'; + } +} diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthMessage.java index a92f983cd..b1e8f3853 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthMessage.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthMessage.java @@ -2,20 +2,28 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; +import info.bitrich.xchangestream.dsx.dto.enums.DsxChannel; import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; -public class DsxAuthMessage extends DsxEventMessage { +public class DsxAuthMessage extends DsxWebSocketSubscriptionMessage { private final String apiKey; private final String authNonce; private final String authSig; + private final DsxChannel channel; + private final long dealsFrom; - @JsonCreator - public DsxAuthMessage(@JsonProperty("event") DsxEventType event, + @JsonCreator(mode = JsonCreator.Mode.PROPERTIES) + public DsxAuthMessage(@JsonProperty("rid") long rid, + @JsonProperty("event") DsxEventType event, + @JsonProperty("channel") DsxChannel channel, + @JsonProperty(value = "dealsFrom", defaultValue = "0L") long dealsFrom, @JsonProperty("apiKey") String apiKey, @JsonProperty("authNonce") String authNonce, @JsonProperty("authSig") String authSig) { - super(event); + super(rid, event); + this.channel = channel; + this.dealsFrom = dealsFrom; this.apiKey = apiKey; this.authNonce = authNonce; this.authSig = authSig; @@ -32,4 +40,12 @@ public String getAuthNonce() { public String getAuthSig() { return authSig; } + + public DsxChannel getChannel() { + return channel; + } + + public long getDealsFrom() { + return dealsFrom; + } } diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxOrderbookMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxOrderbookMessage.java index 26c6fdbd1..ca63bda7c 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxOrderbookMessage.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxOrderbookMessage.java @@ -39,7 +39,7 @@ public long getTimestamp() { } public DsxOrderBook toDsxOrderBook(DsxOrderBook orderbook) { - if (getEvent() == DsxEventType.update) { + if (getEvent() == DsxEventType.UPDATE) { return orderbook.updateOrderBook(this); } return new DsxOrderBook(this); diff --git a/xchange-dsx/src/test/java/info/bitrich/xchangestream/dsx/DsxStreamingMarketDataServiceTest.java b/xchange-dsx/src/test/java/info/bitrich/xchangestream/dsx/DsxStreamingMarketDataServiceTest.java index ce19d044e..2ba952b37 100644 --- a/xchange-dsx/src/test/java/info/bitrich/xchangestream/dsx/DsxStreamingMarketDataServiceTest.java +++ b/xchange-dsx/src/test/java/info/bitrich/xchangestream/dsx/DsxStreamingMarketDataServiceTest.java @@ -32,7 +32,7 @@ public void getOrderBook() throws IOException { Assert.assertNotNull(jsonNode); DsxOrderbookMessage parsedSnapshot = mapper.readValue(jsonNode.toString(), DsxOrderbookMessage.class); DsxOrderBook orderBook = new DsxOrderBook(parsedSnapshot); - Assert.assertEquals(DsxEventType.snapshot, parsedSnapshot.getEvent()); + Assert.assertEquals(DsxEventType.SNAPSHOT, parsedSnapshot.getEvent()); List asks = new ArrayList<>(); LimitOrder askDeleteAfterUpdate = new LimitOrder(OrderType.ASK, BigDecimal.valueOf(15), CurrencyPair.BTC_USD, "", null,BigDecimal.valueOf(103.5)); asks.add(askDeleteAfterUpdate); From 737626e484b5eef66b39abaf083ac7d5a1d5b34f Mon Sep 17 00:00:00 2001 From: "evgenii.ilichev" Date: Mon, 16 Mar 2020 10:44:25 +0300 Subject: [PATCH 4/5] messages converting fixes --- .../dsx/DsxStreamingMessageAdapter.java | 41 +++++++++++-------- .../dsx/DsxStreamingService.java | 8 ++-- .../dsx/DsxStreamingTradeService.java | 4 +- .../dsx/dto/messages/DsxAuthOrderMessage.java | 29 +++++++++++++ 4 files changed, 58 insertions(+), 24 deletions(-) create mode 100644 xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthOrderMessage.java diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMessageAdapter.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMessageAdapter.java index 1cc464b75..c39df343a 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMessageAdapter.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingMessageAdapter.java @@ -2,9 +2,10 @@ import com.fasterxml.jackson.databind.JsonNode; import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthBalanceMessage; +import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthOrderMessage; import io.reactivex.annotations.Nullable; import org.knowm.xchange.currency.Currency; -import org.knowm.xchange.currency.CurrencyPair; +import org.knowm.xchange.dsx.DSXAdapters; import org.knowm.xchange.dsx.dto.trade.ClientDeal; import org.knowm.xchange.dsx.dto.trade.DSXOrder; import org.knowm.xchange.dto.Order; @@ -58,14 +59,14 @@ static DsxAuthBalanceMessage adaptBalance(JsonNode balance) { static Balance adaptBalance(DsxAuthBalanceMessage authBalance) { return new Balance( - Currency.getInstance(authBalance.getCurrency()), + DSXAdapters.adaptCurrency(authBalance.getCurrency()), authBalance.getTotal(), authBalance.getAvailable() ); } - static Stream adaptOrders(JsonNode orders) { + static Stream adaptOrders(JsonNode orders) { Iterable iterator = () -> orders.iterator(); return stream(iterator.spliterator(), false) .map(DsxStreamingMessageAdapter::createOrderMessage) @@ -73,25 +74,29 @@ static Stream adaptOrders(JsonNode orders) { } @Nullable - static DSXOrder adaptOrder(JsonNode order) { + static DsxAuthOrderMessage adaptOrder(JsonNode order) { JsonNode orderObject = order.get(0); if (orderObject == null) { return null; } - DSXOrder aom = createOrderMessage(orderObject); + DsxAuthOrderMessage aom = createOrderMessage(orderObject); LOG.debug("New order: {}", aom); return aom; } - static Order adaptOrder(DSXOrder authOrder) { - + static Order adaptOrder(DsxAuthOrderMessage authOrder) { return authOrder.getOrderType().equals(DSXOrder.OrderType.limit) ? new LimitOrder( - authOrder.getType().equals(DSXOrder.Type.buy) ? Order.OrderType.ASK : Order.OrderType.BID, - authOrder.getVolume(), new CurrencyPair(authOrder.getPair()), null, DateUtils.fromMillisUtc(Long.parseLong(authOrder.getTimestampCreated())), - authOrder.getRate()) : + authOrder.getType().equals(DSXOrder.Type.buy) ? Order.OrderType.ASK : Order.OrderType.BID, + authOrder.getVolume(), + DSXAdapters.adaptCurrencyPair(authOrder.getPair()), + Long.toString(authOrder.getNumber()), + DateUtils.fromMillisUtc(Long.parseLong(authOrder.getTimestampCreated())), + authOrder.getRate() + ) : new MarketOrder(authOrder.getType().equals(DSXOrder.Type.buy) ? Order.OrderType.ASK : Order.OrderType.BID, - authOrder.getVolume(), new CurrencyPair(authOrder.getPair())); + authOrder.getVolume(), + DSXAdapters.adaptCurrencyPair(authOrder.getPair())); } //trades @@ -115,7 +120,7 @@ static ClientDeal adaptTrade(JsonNode order) { static UserTrade adaptTrade(ClientDeal tradeMessage) { return new UserTrade.Builder() - .currencyPair(new CurrencyPair(tradeMessage.getPair())) + .currencyPair(DSXAdapters.adaptCurrencyPair(tradeMessage.getPair())) .feeAmount(tradeMessage.getCommission()) .feeCurrency(new Currency(tradeMessage.getCommissionCurrency())) .id(Long.toString(tradeMessage.getNumber())) @@ -123,7 +128,7 @@ static UserTrade adaptTrade(ClientDeal tradeMessage) { .originalAmount(tradeMessage.getVolume()) .price(tradeMessage.getRate()) .timestamp(DateUtils.fromMillisUtc(tradeMessage.getTimestamp())) - .type(tradeMessage.getType().equals(DSXOrder.Type.buy) ? ASK : BID) + .type(tradeMessage.getType().equals(DSXOrder.Type.buy.name()) ? ASK : BID) .build(); } @@ -149,15 +154,15 @@ private static ClientDeal createTradeMessage(JsonNode trade) { BigDecimal volume = trade.get("volume").decimalValue(); BigDecimal rate = trade.get("rate").decimalValue(); long orderId = trade.get("orderId").asLong(); - long timestamp = trade.get("timestamp").asLong(); + long timestamp = trade.get("timestamp").asLong() * 1000; BigDecimal commission = trade.get("commission").decimalValue(); String commissionCurrency = trade.get("commissionCurrency").asText(); return new ClientDeal(number, pair, type.toString(), volume, rate, orderId, timestamp, commission, commissionCurrency); } - private static DSXOrder createOrderMessage(JsonNode order) { - + private static DsxAuthOrderMessage createOrderMessage(JsonNode order) { + Long number = order.get("number").asLong(); String pair = order.get("pair").asText(); DSXOrder.Type type = DSXOrder.Type.valueOf(order.get("type").asText().toLowerCase()); BigDecimal volume = order.get("volume").decimalValue(); @@ -165,8 +170,8 @@ private static DSXOrder createOrderMessage(JsonNode order) { BigDecimal rate = order.get("rate").decimalValue(); int status = order.get("status").asInt(); DSXOrder.OrderType orderType = DSXOrder.OrderType.valueOf(order.get("orderType").asText().toLowerCase()); - String timestampCreated = order.get("timestampCreated").asText(); + long timestampCreated = order.get("timestampCreated").asLong() * 1000; - return new DSXOrder(pair, type, volume, remainingVolume, rate, status, orderType, timestampCreated); + return new DsxAuthOrderMessage(number, pair, type, volume, remainingVolume, rate, status, orderType, Long.toString(timestampCreated)); } } diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java index 679f76967..89a294791 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingService.java @@ -6,6 +6,7 @@ import info.bitrich.xchangestream.dsx.dto.enums.DsxEventType; import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthMessage; import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthBalanceMessage; +import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthOrderMessage; import info.bitrich.xchangestream.dsx.dto.messages.DsxWebSocketSubscriptionMessage; import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService; import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler; @@ -13,7 +14,6 @@ import io.reactivex.subjects.PublishSubject; import org.apache.commons.lang3.StringUtils; import org.knowm.xchange.dsx.dto.trade.ClientDeal; -import org.knowm.xchange.dsx.dto.trade.DSXOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import si.mazi.rescu.SynchronizedValueFactory; @@ -51,7 +51,7 @@ public class DsxStreamingService extends JsonNettyStreamingService { private final PublishSubject subjectTrade = PublishSubject.create(); private final PublishSubject subjectBalance = PublishSubject.create(); - private final PublishSubject subjectOrder = PublishSubject.create(); + private final PublishSubject subjectOrder = PublishSubject.create(); private final SynchronizedValueFactory nonceFactory; @@ -67,7 +67,7 @@ public Observable getAuthenticatedBalances() { return subjectBalance.share(); } - public Observable getAuthenticatedOrders() { + public Observable getAuthenticatedOrders() { return subjectOrder.share(); } @@ -243,7 +243,7 @@ private void processAuthorizedMessageOrder(JsonNode rawPayload) { DsxStreamingMessageAdapter.adaptOrders(rawPayload.get("orders")).forEach(subjectOrder::onNext); break; case UPDATE: - DSXOrder order = DsxStreamingMessageAdapter.adaptOrder(rawPayload.get("orders")); + DsxAuthOrderMessage order = DsxStreamingMessageAdapter.adaptOrder(rawPayload.get("orders")); if (order != null) subjectOrder.onNext(order); break; diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java index 6fc29c55c..124591ea0 100644 --- a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/DsxStreamingTradeService.java @@ -1,10 +1,10 @@ package info.bitrich.xchangestream.dsx; import info.bitrich.xchangestream.core.StreamingTradeService; +import info.bitrich.xchangestream.dsx.dto.messages.DsxAuthOrderMessage; import io.reactivex.Observable; import org.knowm.xchange.currency.CurrencyPair; import org.knowm.xchange.dsx.dto.trade.ClientDeal; -import org.knowm.xchange.dsx.dto.trade.DSXOrder; import org.knowm.xchange.dto.Order; import org.knowm.xchange.dto.trade.UserTrade; import org.knowm.xchange.exceptions.ExchangeSecurityException; @@ -45,7 +45,7 @@ public Observable getAuthenticatedTrades() { return withAuthenticatedService(DsxStreamingService::getAuthenticatedTrades); } - public Observable getAuthenticatedOrders() { + public Observable getAuthenticatedOrders() { return withAuthenticatedService(DsxStreamingService::getAuthenticatedOrders); } diff --git a/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthOrderMessage.java b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthOrderMessage.java new file mode 100644 index 000000000..15d8bd084 --- /dev/null +++ b/xchange-dsx/src/main/java/info/bitrich/xchangestream/dsx/dto/messages/DsxAuthOrderMessage.java @@ -0,0 +1,29 @@ +package info.bitrich.xchangestream.dsx.dto.messages; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.knowm.xchange.dsx.dto.trade.DSXOrder; + +import java.math.BigDecimal; + +public class DsxAuthOrderMessage extends DSXOrder { + + private final Long number; + + public DsxAuthOrderMessage( + @JsonProperty("number") Long number, + @JsonProperty("pair") String pair, + @JsonProperty("type") Type type, + @JsonProperty("volume") BigDecimal volume, + @JsonProperty("remainingVolume") BigDecimal remainingVolume, + @JsonProperty("rate") BigDecimal rate, + @JsonProperty("status") int status, + @JsonProperty("orderType") OrderType orderType, + @JsonProperty("timestampCreated") String timestampCreated) { + super(pair, type, volume, remainingVolume, rate, status, orderType, timestampCreated); + this.number = number; + } + + public Long getNumber() { + return number; + } +} From 19d20b1881291653061f8305fc87de157146a667 Mon Sep 17 00:00:00 2001 From: "evgenii.ilichev" Date: Mon, 16 Mar 2020 10:45:02 +0300 Subject: [PATCH 5/5] update version --- pom.xml | 2 +- service-core/pom.xml | 2 +- service-netty/pom.xml | 2 +- service-pubnub/pom.xml | 2 +- service-pusher/pom.xml | 2 +- service-wamp/pom.xml | 2 +- xchange-bankera/pom.xml | 2 +- xchange-binance/pom.xml | 2 +- xchange-bitfinex/pom.xml | 2 +- xchange-bitflyer/pom.xml | 2 +- xchange-bitmex/pom.xml | 2 +- xchange-bitstamp/pom.xml | 2 +- xchange-cexio/pom.xml | 2 +- xchange-coinbasepro/pom.xml | 2 +- xchange-coinmate/pom.xml | 2 +- xchange-dsx/pom.xml | 2 +- xchange-gemini/pom.xml | 2 +- xchange-hitbtc/pom.xml | 2 +- xchange-kraken/pom.xml | 2 +- xchange-okcoin/pom.xml | 2 +- xchange-poloniex/pom.xml | 2 +- xchange-poloniex2/pom.xml | 2 +- xchange-stream-core/pom.xml | 2 +- 23 files changed, 23 insertions(+), 23 deletions(-) diff --git a/pom.xml b/pom.xml index 48c9c5efa..bfca2420c 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ info.bitrich.xchange-stream xchange-stream-parent pom - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT xchange-stream-core diff --git a/service-core/pom.xml b/service-core/pom.xml index 7bdf0e5c1..32e7d232e 100644 --- a/service-core/pom.xml +++ b/service-core/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/service-netty/pom.xml b/service-netty/pom.xml index 1976bc351..ee8103a8f 100644 --- a/service-netty/pom.xml +++ b/service-netty/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/service-pubnub/pom.xml b/service-pubnub/pom.xml index 25501e302..30794160b 100644 --- a/service-pubnub/pom.xml +++ b/service-pubnub/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/service-pusher/pom.xml b/service-pusher/pom.xml index b9634d19a..6c471a6f4 100644 --- a/service-pusher/pom.xml +++ b/service-pusher/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/service-wamp/pom.xml b/service-wamp/pom.xml index 45dd6dc10..33730ade1 100644 --- a/service-wamp/pom.xml +++ b/service-wamp/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-bankera/pom.xml b/xchange-bankera/pom.xml index 74857e02b..15a68ebf7 100644 --- a/xchange-bankera/pom.xml +++ b/xchange-bankera/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-binance/pom.xml b/xchange-binance/pom.xml index c33eacdde..5fc89c340 100644 --- a/xchange-binance/pom.xml +++ b/xchange-binance/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-bitfinex/pom.xml b/xchange-bitfinex/pom.xml index da83bf782..400153908 100644 --- a/xchange-bitfinex/pom.xml +++ b/xchange-bitfinex/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-bitflyer/pom.xml b/xchange-bitflyer/pom.xml index 946d330c1..3729db997 100644 --- a/xchange-bitflyer/pom.xml +++ b/xchange-bitflyer/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-bitmex/pom.xml b/xchange-bitmex/pom.xml index 96bc3bdad..bd0eb0d7c 100644 --- a/xchange-bitmex/pom.xml +++ b/xchange-bitmex/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-bitstamp/pom.xml b/xchange-bitstamp/pom.xml index b440c69e5..84ab05985 100644 --- a/xchange-bitstamp/pom.xml +++ b/xchange-bitstamp/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-cexio/pom.xml b/xchange-cexio/pom.xml index 421b9da37..b2197fb57 100644 --- a/xchange-cexio/pom.xml +++ b/xchange-cexio/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-coinbasepro/pom.xml b/xchange-coinbasepro/pom.xml index 9b456beef..918a359c5 100644 --- a/xchange-coinbasepro/pom.xml +++ b/xchange-coinbasepro/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-coinmate/pom.xml b/xchange-coinmate/pom.xml index fddc445e3..e988796c1 100644 --- a/xchange-coinmate/pom.xml +++ b/xchange-coinmate/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-dsx/pom.xml b/xchange-dsx/pom.xml index 9a5eff105..3c0eb476b 100644 --- a/xchange-dsx/pom.xml +++ b/xchange-dsx/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-gemini/pom.xml b/xchange-gemini/pom.xml index 83d17a662..15c728474 100644 --- a/xchange-gemini/pom.xml +++ b/xchange-gemini/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-hitbtc/pom.xml b/xchange-hitbtc/pom.xml index 61d9cda2e..1a24446e7 100644 --- a/xchange-hitbtc/pom.xml +++ b/xchange-hitbtc/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-kraken/pom.xml b/xchange-kraken/pom.xml index b15e8106b..11ff0452f 100644 --- a/xchange-kraken/pom.xml +++ b/xchange-kraken/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-okcoin/pom.xml b/xchange-okcoin/pom.xml index 8aae47b5c..d87eb60af 100644 --- a/xchange-okcoin/pom.xml +++ b/xchange-okcoin/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-poloniex/pom.xml b/xchange-poloniex/pom.xml index 8b6d83b98..fc4b2c17d 100644 --- a/xchange-poloniex/pom.xml +++ b/xchange-poloniex/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-poloniex2/pom.xml b/xchange-poloniex2/pom.xml index 6a25e4401..e7536d444 100644 --- a/xchange-poloniex2/pom.xml +++ b/xchange-poloniex2/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0 diff --git a/xchange-stream-core/pom.xml b/xchange-stream-core/pom.xml index 1d9228b5b..233d8d9b7 100644 --- a/xchange-stream-core/pom.xml +++ b/xchange-stream-core/pom.xml @@ -3,7 +3,7 @@ xchange-stream-parent info.bitrich.xchange-stream - 4.4.0.3-dsx-SNAPSHOT + 4.4.0.4-dsx-SNAPSHOT 4.0.0