Skip to content

Commit 97d4459

Browse files
committed
refactor: allow parralel broadcasting
1 parent 1a0edd1 commit 97d4459

File tree

1 file changed

+160
-115
lines changed

1 file changed

+160
-115
lines changed

Diff for: apps/price_pusher/src/injective/injective.ts

+160-115
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
import { HexString, HermesClient } from "@pythnetwork/hermes-client";
22
import {
3-
IPricePusher,
3+
PriceItem,
44
PriceInfo,
5+
IPricePusher,
56
ChainPriceListener,
6-
PriceItem,
77
} from "../interface";
88
import { DurationInSeconds } from "../utils";
99
import {
@@ -37,9 +37,11 @@ type PriceQueryResponse = {
3737
};
3838
};
3939

40-
type UpdateFeeResponse = {
41-
denom: string;
42-
amount: string;
40+
type InjectiveConfig = {
41+
chainId: string;
42+
gasMultiplier: number;
43+
gasPrice: number;
44+
priceIdsProcessChunkSize: number;
4345
};
4446

4547
// this use price without leading 0x
@@ -88,16 +90,11 @@ export class InjectivePriceListener extends ChainPriceListener {
8890
}
8991
}
9092

91-
type InjectiveConfig = {
92-
chainId: string;
93-
gasMultiplier: number;
94-
gasPrice: number;
95-
priceIdsProcessChunkSize: number;
96-
};
9793
export class InjectivePricePusher implements IPricePusher {
98-
private wallet: PrivateKey;
94+
private mnemonic: string;
9995
private chainConfig: InjectiveConfig;
100-
private account: Account | null = null;
96+
private accounts: Record<string, Account | undefined> =
97+
{}; /** { address: Account } */
10198

10299
constructor(
103100
private hermesClient: HermesClient,
@@ -107,8 +104,7 @@ export class InjectivePricePusher implements IPricePusher {
107104
mnemonic: string,
108105
chainConfig?: Partial<InjectiveConfig>,
109106
) {
110-
this.wallet = PrivateKey.fromMnemonic(mnemonic);
111-
107+
this.mnemonic = mnemonic;
112108
this.chainConfig = {
113109
chainId: chainConfig?.chainId ?? INJECTIVE_TESTNET_CHAIN_ID,
114110
gasMultiplier: chainConfig?.gasMultiplier ?? DEFAULT_GAS_MULTIPLIER,
@@ -119,90 +115,64 @@ export class InjectivePricePusher implements IPricePusher {
119115
};
120116
}
121117

122-
private injectiveAddress(): string {
123-
return this.wallet.toBech32();
118+
private getWallet(index: number) {
119+
if (
120+
this.chainConfig.priceIdsProcessChunkSize === -1 ||
121+
this.chainConfig.priceIdsProcessChunkSize === undefined
122+
) {
123+
return PrivateKey.fromMnemonic(this.mnemonic);
124+
}
125+
126+
return PrivateKey.fromMnemonic(this.mnemonic, `m/44'/60'/0'/0/${index}`);
124127
}
125128

126-
private async signAndBroadcastMsg(msg: Msgs): Promise<TxResponse> {
129+
private async signAndBroadcastMsg(
130+
msg: Msgs,
131+
index: number,
132+
): Promise<TxResponse> {
127133
const chainGrpcAuthApi = new ChainGrpcAuthApi(this.grpcEndpoint);
134+
const wallet = this.getWallet(index);
135+
const injectiveAddress = wallet.toAddress().toBech32();
128136

129137
// Fetch the latest account details only if it's not stored.
130-
this.account ??= await chainGrpcAuthApi.fetchAccount(
131-
this.injectiveAddress(),
132-
);
138+
this.accounts[injectiveAddress] ??=
139+
await chainGrpcAuthApi.fetchAccount(injectiveAddress);
133140

134-
const { txRaw: simulateTxRaw } = createTransactionFromMsg({
135-
sequence: this.account.baseAccount.sequence,
136-
accountNumber: this.account.baseAccount.accountNumber,
137-
message: msg,
138-
chainId: this.chainConfig.chainId,
139-
pubKey: this.wallet.toPublicKey().toBase64(),
140-
});
141+
const account = this.accounts[injectiveAddress];
141142

142-
const txService = new TxGrpcApi(this.grpcEndpoint);
143-
// simulation
144143
try {
145-
const {
146-
gasInfo: { gasUsed },
147-
} = await txService.simulate(simulateTxRaw);
148-
149-
// simulation returns us the approximate gas used
150-
// gas passed with the transaction should be more than that
151-
// in order for it to be successfully executed
152-
// this multiplier takes care of that
153-
const gas = (gasUsed * this.chainConfig.gasMultiplier).toFixed();
154-
const fee = {
155-
amount: [
156-
{
157-
denom: "inj",
158-
amount: (Number(gas) * this.chainConfig.gasPrice).toFixed(),
159-
},
160-
],
161-
gas,
162-
};
163-
164144
const { signBytes, txRaw } = createTransactionFromMsg({
165-
sequence: this.account.baseAccount.sequence,
166-
accountNumber: this.account.baseAccount.accountNumber,
145+
sequence: account.baseAccount.sequence,
146+
accountNumber: account.baseAccount.accountNumber,
167147
message: msg,
168148
chainId: this.chainConfig.chainId,
169-
fee,
170-
pubKey: this.wallet.toPublicKey().toBase64(),
149+
fee: await this.getStdFee(msg, index),
150+
pubKey: wallet.toPublicKey().toBase64(),
171151
});
172152

173-
const sig = await this.wallet.sign(Buffer.from(signBytes));
174-
175-
this.account.baseAccount.sequence++;
153+
const sig = await wallet.sign(Buffer.from(signBytes));
176154

177155
/** Append Signatures */
178156
txRaw.signatures = [sig];
157+
179158
// this takes approx 5 seconds
180-
const txResponse = await txService.broadcast(txRaw);
159+
const txResponse = await new TxGrpcApi(this.grpcEndpoint).broadcast(
160+
txRaw,
161+
);
162+
163+
account.baseAccount.sequence++;
181164

182165
return txResponse;
183166
} catch (e: any) {
184-
// The sequence number was invalid and hence we will have to fetch it again.
167+
// The sequence number was invalid and hence we will have to fetch it again
185168
if (JSON.stringify(e).match(/account sequence mismatch/) !== null) {
186-
// We need to fetch the account details again.
187-
this.account = null;
169+
this.accounts[injectiveAddress] = undefined;
188170
}
171+
189172
throw e;
190173
}
191174
}
192175

193-
async getPriceFeedUpdateObject(priceIds: string[]): Promise<any> {
194-
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
195-
encoding: "base64",
196-
});
197-
const vaas = response.binary.data;
198-
199-
return {
200-
update_price_feeds: {
201-
data: vaas,
202-
},
203-
};
204-
}
205-
206176
async updatePriceFeed(
207177
priceIds: string[],
208178
pubTimesToPush: number[],
@@ -222,69 +192,42 @@ export class InjectivePricePusher implements IPricePusher {
222192
chunkSize: Number(this.chainConfig.priceIdsProcessChunkSize),
223193
});
224194

225-
for (const [chunkIndex, priceIdChunk] of priceIdChunks.entries()) {
226-
await this.updatePriceFeedChunk(priceIdChunk, chunkIndex);
227-
}
195+
await Promise.all(
196+
priceIdChunks.map((priceIdChunk, chunkIndex) =>
197+
this.updatePriceFeedChunk(priceIdChunk, chunkIndex),
198+
),
199+
);
228200
}
229201

230202
private async updatePriceFeedChunk(
231203
priceIds: string[],
232204
chunkIndex: number,
233205
): Promise<void> {
234-
let priceFeedUpdateObject;
235-
236206
try {
237-
// get the latest VAAs for updatePriceFeed and then push them
238-
priceFeedUpdateObject = await this.getPriceFeedUpdateObject(priceIds);
239-
} catch (err) {
240-
this.logger.error(
241-
err,
242-
`Error fetching the latest vaas to push for chunk ${chunkIndex}`,
243-
);
244-
return;
245-
}
246-
247-
let updateFeeQueryResponse: UpdateFeeResponse;
248-
try {
249-
const api = new ChainGrpcWasmApi(this.grpcEndpoint);
250-
const { data } = await api.fetchSmartContractState(
251-
this.pythContractAddress,
252-
Buffer.from(
253-
JSON.stringify({
254-
get_update_fee: {
255-
vaas: priceFeedUpdateObject.update_price_feeds.data,
256-
},
257-
}),
258-
).toString("base64"),
207+
const priceFeedUpdateObject =
208+
await this.getPriceFeedUpdateObject(priceIds);
209+
const updateFeeQueryResponse = await this.getUpdateFee(
210+
priceFeedUpdateObject.update_price_feeds.data,
259211
);
212+
const wallet = this.getWallet(chunkIndex);
260213

261-
const json = Buffer.from(data).toString();
262-
updateFeeQueryResponse = JSON.parse(json);
263-
} catch (err) {
264-
this.logger.error(
265-
err,
266-
`Error fetching update fee for chunk ${chunkIndex}`,
267-
);
268-
// Throwing an error because it is likely an RPC issue
269-
throw err;
270-
}
271-
272-
try {
273-
const executeMsg = MsgExecuteContract.fromJSON({
274-
sender: this.injectiveAddress(),
214+
const msg = MsgExecuteContract.fromJSON({
215+
sender: wallet.toAddress().toBech32(),
275216
contractAddress: this.pythContractAddress,
276217
msg: priceFeedUpdateObject,
277218
funds: [updateFeeQueryResponse],
278219
});
279220

280-
const rs = await this.signAndBroadcastMsg(executeMsg);
221+
const rs = await this.signAndBroadcastMsg(msg, chunkIndex);
222+
281223
this.logger.info(
282224
{ hash: rs.txHash },
283225
`Successfully broadcasted txHash for chunk ${chunkIndex}`,
284226
);
285227
} catch (err: any) {
286228
if (err.message.match(/account inj[a-zA-Z0-9]+ not found/) !== null) {
287229
this.logger.error(err, `Account not found for chunk ${chunkIndex}`);
230+
288231
throw new Error("Please check the mnemonic");
289232
}
290233

@@ -295,10 +238,112 @@ export class InjectivePricePusher implements IPricePusher {
295238
this.logger.error(err, `Insufficient funds for chunk ${chunkIndex}`);
296239
throw new Error("Insufficient funds");
297240
}
241+
298242
this.logger.error(
299243
err,
300244
`Error executing messages for chunk ${chunkIndex}`,
301245
);
302246
}
303247
}
248+
249+
/**
250+
* Get the fee for the transaction (using simulation).
251+
*
252+
* We also apply a multiplier to the gas used to apply a small
253+
* buffer to the gas that'll be used.
254+
*/
255+
private async getStdFee(msg: Msgs, index: number) {
256+
const wallet = this.getWallet(index);
257+
const injectiveAddress = wallet.toAddress().toBech32();
258+
const account = this.accounts[injectiveAddress];
259+
260+
if (!account) {
261+
throw new Error("Account not found");
262+
}
263+
264+
const { txRaw: simulateTxRaw } = createTransactionFromMsg({
265+
sequence: account.baseAccount.sequence,
266+
accountNumber: account.baseAccount.accountNumber,
267+
message: msg,
268+
chainId: this.chainConfig.chainId,
269+
pubKey: wallet.toPublicKey().toBase64(),
270+
});
271+
272+
try {
273+
const result = await new TxGrpcApi(this.grpcEndpoint).simulate(
274+
simulateTxRaw,
275+
);
276+
277+
const gas = (
278+
result.gasInfo.gasUsed * this.chainConfig.gasMultiplier
279+
).toFixed();
280+
const fee = {
281+
amount: [
282+
{
283+
denom: "inj",
284+
amount: (Number(gas) * this.chainConfig.gasPrice).toFixed(),
285+
},
286+
],
287+
gas,
288+
};
289+
290+
return fee;
291+
} catch (err) {
292+
this.logger.error(err, `Error getting std fee`);
293+
throw err;
294+
}
295+
}
296+
297+
/**
298+
* Get the latest VAAs for updatePriceFeed and then push them
299+
*/
300+
private async getPriceFeedUpdateObject(priceIds: string[]) {
301+
try {
302+
const response = await this.hermesClient.getLatestPriceUpdates(priceIds, {
303+
encoding: "base64",
304+
});
305+
const vaas = response.binary.data;
306+
307+
return {
308+
update_price_feeds: {
309+
data: vaas,
310+
},
311+
} as {
312+
update_price_feeds: {
313+
data: string[];
314+
};
315+
};
316+
} catch (err) {
317+
this.logger.error(err, `Error fetching the latest vaas to push`);
318+
throw err;
319+
}
320+
}
321+
322+
/**
323+
* Get the update fee for the given VAAs (i.e the fee that is paid to the pyth contract)
324+
*/
325+
private async getUpdateFee(vaas: string[]) {
326+
try {
327+
const api = new ChainGrpcWasmApi(this.grpcEndpoint);
328+
const { data } = await api.fetchSmartContractState(
329+
this.pythContractAddress,
330+
Buffer.from(
331+
JSON.stringify({
332+
get_update_fee: {
333+
vaas,
334+
},
335+
}),
336+
).toString("base64"),
337+
);
338+
339+
const json = Buffer.from(data).toString();
340+
341+
return JSON.parse(json);
342+
} catch (err) {
343+
this.logger.error(err, `Error fetching update fee.`);
344+
345+
// Throwing an error because it is likely an RPC issue
346+
throw err;
347+
}
348+
}
304349
}

0 commit comments

Comments
 (0)