Skip to content

Commit eeafb69

Browse files
feat: pub-sub for hashflow
1 parent 43ae44a commit eeafb69

File tree

4 files changed

+119
-78
lines changed

4 files changed

+119
-78
lines changed

src/dex/hashflow/constants.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@ export const HASHFLOW_BLACKLIST_TTL_S = 60 * 60 * 24 * 7; // 7 days
44

55
export const HASHFLOW_MM_RESTRICT_TTL_S = 60 * 60;
66

7-
export const HASHFLOW_PRICES_CACHES_TTL_S = 3;
7+
export const HASHFLOW_PRICES_CACHES_TTL_S = 5;
88

99
export const HASHFLOW_MARKET_MAKERS_CACHES_TTL_S = 30;
1010

11-
export const HASHFLOW_API_PRICES_POLLING_INTERVAL_MS = 1000;
11+
export const HASHFLOW_API_PRICES_POLLING_INTERVAL_MS = 2000;
1212

1313
export const HASHFLOW_API_MARKET_MAKERS_POLLING_INTERVAL_MS = 28 * 1000; // 28 secs
1414

src/dex/hashflow/hashflow.ts

Lines changed: 11 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
11
import { ChainId } from '@hashflow/sdk';
22
import { Chain, ChainType, HashflowApi } from '@hashflow/taker-js';
3-
import {
4-
MarketMakersResponse,
5-
PriceLevelsResponse,
6-
RfqResponse,
7-
} from '@hashflow/taker-js/dist/types/rest';
3+
import { RfqResponse } from '@hashflow/taker-js/dist/types/rest';
84
import BigNumber from 'bignumber.js';
95
import { Interface } from 'ethers/lib/utils';
106
import { assert } from 'ts-essentials';
@@ -145,7 +141,6 @@ export class Hashflow extends SimpleExchange implements IDex<HashflowData> {
145141
},
146142
headers: { Authorization: this.hashFlowAuthToken },
147143
},
148-
getCachedMarketMakers: this.getCachedMarketMakers.bind(this),
149144
filterMarketMakers: this.getFilteredMarketMakers.bind(this),
150145
pricesCacheKey: this.pricesCacheKey,
151146
pricesCacheTTLSecs: HASHFLOW_PRICES_CACHES_TTL_S,
@@ -157,9 +152,7 @@ export class Hashflow extends SimpleExchange implements IDex<HashflowData> {
157152
}
158153

159154
async initializePricing(blockNumber: number): Promise<void> {
160-
if (!this.dexHelper.config.isSlave) {
161-
this.rateFetcher.start();
162-
}
155+
this.rateFetcher.start();
163156

164157
return;
165158
}
@@ -198,7 +191,7 @@ export class Hashflow extends SimpleExchange implements IDex<HashflowData> {
198191
return [];
199192
}
200193

201-
const levels = (await this.getCachedLevels()) || {};
194+
const levels = (await this.rateFetcher.getCachedLevels()) || {};
202195
const makers = Object.keys(levels);
203196

204197
return makers
@@ -381,32 +374,6 @@ export class Hashflow extends SimpleExchange implements IDex<HashflowData> {
381374
return undefined;
382375
}
383376

384-
async getCachedMarketMakers(): Promise<
385-
MarketMakersResponse['marketMakers'] | null
386-
> {
387-
const cachedMarketMakers = await this.dexHelper.cache.rawget(
388-
this.marketMakersCacheKey,
389-
);
390-
391-
if (cachedMarketMakers) {
392-
return JSON.parse(
393-
cachedMarketMakers,
394-
) as MarketMakersResponse['marketMakers'];
395-
}
396-
397-
return null;
398-
}
399-
400-
async getCachedLevels(): Promise<PriceLevelsResponse['levels'] | null> {
401-
const cachedLevels = await this.dexHelper.cache.rawget(this.pricesCacheKey);
402-
403-
if (cachedLevels) {
404-
return JSON.parse(cachedLevels) as PriceLevelsResponse['levels'];
405-
}
406-
407-
return null;
408-
}
409-
410377
async getPricesVolume(
411378
srcToken: Token,
412379
destToken: Token,
@@ -434,7 +401,7 @@ export class Hashflow extends SimpleExchange implements IDex<HashflowData> {
434401

435402
const marketMakersToUse = pools.map(p => p.split(`${prefix}_`).pop());
436403

437-
const levelsMap = (await this.getCachedLevels()) || {};
404+
const levelsMap = (await this.rateFetcher.getCachedLevels()) || {};
438405

439406
Object.keys(levelsMap).forEach(mmKey => {
440407
if (!marketMakersToUse.includes(mmKey)) {
@@ -715,7 +682,7 @@ export class Hashflow extends SimpleExchange implements IDex<HashflowData> {
715682
this.logger.warn(
716683
`${this.dexKey}-${this.network}: Encountered restricted user=${options.userAddress}. Adding to local blacklist cache`,
717684
);
718-
await this.setBlacklist(options.userAddress);
685+
await this.rateFetcher.setBlacklist(options.userAddress);
719686
} else {
720687
if (e instanceof TooStrictSlippageCheckError) {
721688
this.logger.warn(
@@ -911,33 +878,6 @@ export class Hashflow extends SimpleExchange implements IDex<HashflowData> {
911878
};
912879
}
913880

914-
getBlackListKey(address: Address) {
915-
return `blacklist_${address}`.toLowerCase();
916-
}
917-
918-
async isBlacklisted(txOrigin: Address): Promise<boolean> {
919-
const result = await this.dexHelper.cache.get(
920-
this.dexKey,
921-
this.network,
922-
this.getBlackListKey(txOrigin),
923-
);
924-
return result === 'blacklisted';
925-
}
926-
927-
async setBlacklist(
928-
txOrigin: Address,
929-
ttl: number = HASHFLOW_BLACKLIST_TTL_S,
930-
) {
931-
await this.dexHelper.cache.setex(
932-
this.dexKey,
933-
this.network,
934-
this.getBlackListKey(txOrigin),
935-
ttl,
936-
'blacklisted',
937-
);
938-
return true;
939-
}
940-
941881
async getSimpleParam(
942882
srcToken: string,
943883
destToken: string,
@@ -1058,9 +998,9 @@ export class Hashflow extends SimpleExchange implements IDex<HashflowData> {
1058998
.wrapETH(tokenAddress)
1059999
.toLowerCase();
10601000

1061-
const makers = (await this.getCachedMarketMakers()) || [];
1001+
const makers = (await this.rateFetcher.getCachedMarketMakers()) || [];
10621002
const filteredMakers = await this.getFilteredMarketMakers(makers);
1063-
const pLevels = (await this.getCachedLevels()) || {};
1003+
const pLevels = (await this.rateFetcher.getCachedLevels()) || {};
10641004

10651005
let baseToken: Token | undefined = undefined;
10661006
// TODO: Improve efficiency of this part. Quite inefficient way to determine
@@ -1120,4 +1060,8 @@ export class Hashflow extends SimpleExchange implements IDex<HashflowData> {
11201060
this.rateFetcher.stop();
11211061
}
11221062
}
1063+
1064+
async isBlacklisted(txOrigin: Address): Promise<boolean> {
1065+
return this.rateFetcher.isBlacklisted(txOrigin);
1066+
}
11231067
}

src/dex/hashflow/rate-fetcher.ts

Lines changed: 106 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,33 @@
1+
import {
2+
MarketMakersResponse,
3+
PriceLevelsResponse,
4+
} from '@hashflow/taker-js/dist/types/rest';
15
import { Network } from '../../constants';
26
import { IDexHelper } from '../../dex-helper';
37
import { Fetcher, SkippingRequest } from '../../lib/fetcher/fetcher';
48
import { validateAndCast } from '../../lib/validators';
5-
import { Logger } from '../../types';
9+
import { Address, Logger } from '../../types';
610
import {
711
HashflowMarketMakersResponse,
812
HashflowRateFetcherConfig,
913
HashflowRatesResponse,
1014
} from './types';
1115
import { marketMakersValidator, pricesResponseValidator } from './validators';
16+
import { HASHFLOW_BLACKLIST_TTL_S } from './constants';
17+
import { ExpKeyValuePubSub, NonExpSetPubSub } from '../../lib/pub-sub';
1218

1319
export class RateFetcher {
1420
private rateFetcher: Fetcher<HashflowRatesResponse>;
21+
private ratePubSub: ExpKeyValuePubSub;
1522
private pricesCacheKey: string;
1623
private pricesCacheTTL: number;
1724

1825
private marketMakersFetcher: Fetcher<HashflowMarketMakersResponse>;
1926
private marketMakersCacheKey: string;
2027
private marketMakersCacheTTL: number;
2128

29+
private blacklistedPubSub: NonExpSetPubSub;
30+
2231
constructor(
2332
private dexHelper: IDexHelper,
2433
private dexKey: string,
@@ -48,16 +57,18 @@ export class RateFetcher {
4857
logger,
4958
);
5059

60+
this.ratePubSub = new ExpKeyValuePubSub(dexHelper, dexKey, 'rates');
61+
5162
this.rateFetcher = new Fetcher<HashflowRatesResponse>(
5263
dexHelper.httpRequest,
5364
{
5465
info: {
5566
requestOptions: config.rateConfig.pricesReqParams,
5667
requestFunc: async options => {
57-
const { filterMarketMakers, getCachedMarketMakers } =
58-
config.rateConfig;
68+
const { filterMarketMakers } = config.rateConfig;
5969

60-
const cachedMarketMakers = (await getCachedMarketMakers()) || [];
70+
const cachedMarketMakers =
71+
(await this.getCachedMarketMakers()) || [];
6172
const filteredMarketMakers = await filterMarketMakers(
6273
cachedMarketMakers,
6374
);
@@ -86,11 +97,24 @@ export class RateFetcher {
8697
config.rateConfig.pricesIntervalMs,
8798
logger,
8899
);
100+
101+
this.blacklistedPubSub = new NonExpSetPubSub(
102+
dexHelper,
103+
dexKey,
104+
'blacklisted',
105+
);
89106
}
90107

91-
start() {
92-
this.marketMakersFetcher.startPolling();
93-
this.rateFetcher.startPolling();
108+
async start() {
109+
if (!this.dexHelper.config.isSlave) {
110+
this.marketMakersFetcher.startPolling();
111+
this.rateFetcher.startPolling();
112+
} else {
113+
this.ratePubSub.subscribe();
114+
115+
const allBlacklisted = await this.getAllBlacklisted();
116+
this.blacklistedPubSub.initializeAndSubscribe(allBlacklisted);
117+
}
94118
}
95119

96120
stop() {
@@ -109,10 +133,84 @@ export class RateFetcher {
109133

110134
private handleRatesResponse(resp: HashflowRatesResponse): void {
111135
const { levels } = resp;
112-
this.dexHelper.cache.rawset(
136+
this.dexHelper.cache.setex(
137+
this.dexKey,
138+
this.network,
113139
this.pricesCacheKey,
140+
this.pricesCacheTTL,
114141
JSON.stringify(levels),
142+
);
143+
144+
this.ratePubSub.publish(
145+
{ [this.pricesCacheKey]: levels },
115146
this.pricesCacheTTL,
116147
);
117148
}
149+
150+
async getCachedMarketMakers(): Promise<
151+
MarketMakersResponse['marketMakers'] | null
152+
> {
153+
const cachedMarketMakers = await this.dexHelper.cache.rawget(
154+
this.marketMakersCacheKey,
155+
);
156+
157+
if (cachedMarketMakers) {
158+
return JSON.parse(
159+
cachedMarketMakers,
160+
) as MarketMakersResponse['marketMakers'];
161+
}
162+
163+
return null;
164+
}
165+
166+
async getCachedLevels(): Promise<PriceLevelsResponse['levels'] | null> {
167+
const cachedLevels = await this.ratePubSub.getAndCache(this.pricesCacheKey);
168+
169+
if (cachedLevels) {
170+
return cachedLevels as PriceLevelsResponse['levels'];
171+
}
172+
173+
return null;
174+
}
175+
176+
async isBlacklisted(txOrigin: Address): Promise<boolean> {
177+
return this.blacklistedPubSub.has(txOrigin.toLowerCase());
178+
}
179+
180+
async setBlacklist(
181+
txOrigin: Address,
182+
ttl: number = HASHFLOW_BLACKLIST_TTL_S,
183+
) {
184+
await this.dexHelper.cache.setex(
185+
this.dexKey,
186+
this.network,
187+
this.getBlackListKey(txOrigin),
188+
ttl,
189+
'blacklisted',
190+
);
191+
192+
this.blacklistedPubSub.publish([txOrigin.toLowerCase()]);
193+
194+
return true;
195+
}
196+
197+
async getAllBlacklisted(): Promise<Address[]> {
198+
const defaultKey = this.getBlackListKey('');
199+
const pattern = `${defaultKey}*`;
200+
const allBlacklisted = await this.dexHelper.cache.keys(
201+
this.dexKey,
202+
this.network,
203+
pattern,
204+
);
205+
206+
return allBlacklisted.map(t => this.getAddressFromBlackListKey(t));
207+
}
208+
209+
getBlackListKey(address: Address) {
210+
return `blacklist_${address}`.toLowerCase();
211+
}
212+
213+
getAddressFromBlackListKey(key: Address) {
214+
return (key.split('blacklist_')[1] ?? '').toLowerCase();
215+
}
118216
}

src/dex/hashflow/types.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ export type HashflowRateFetcherConfig = {
6464
};
6565
pricesIntervalMs: number;
6666
markerMakersIntervalMs: number;
67-
getCachedMarketMakers: () => Promise<string[] | null>;
6867
filterMarketMakers: (makers: string[]) => Promise<string[]>;
6968
pricesCacheKey: string;
7069
marketMakersCacheKey: string;

0 commit comments

Comments
 (0)