Skip to content

Commit f6e9ea2

Browse files
authored
Merge pull request #456 from EYBlockchain/lyd/eventListenerFixes
Create event listener for back up data
2 parents 5a4ad7a + 3008f7d commit f6e9ea2

11 files changed

Lines changed: 457 additions & 64 deletions

File tree

src/boilerplate/common/api.mjs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import { ServiceManager } from './api_services.mjs';
44
import { Router } from './api_routes.mjs';
55
import Web3 from './common/web3.mjs';
66
ENCRYPTEDLISTENER_IMPORT
7+
import BackupEncryptedDataEventListener from './common/backup-encrypted-data-listener.mjs';
78

89
function gracefulshutdown() {
910
console.log('Shutting down');
@@ -23,7 +24,8 @@ const web3 = Web3.connection();
2324
const serviceMgr = new ServiceManager(web3);
2425
serviceMgr.init().then(async () => {
2526
ENCRYPTEDLISTENER_CALL
26-
27+
const backupEventListener = new BackupEncryptedDataEventListener(web3);
28+
await backupEventListener.startBackupRecovery();
2729
const router = new Router(serviceMgr);
2830
const r = router.addRoutes();
2931
app.use('/', r);
Lines changed: 370 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,370 @@
1+
import fs from 'fs';
2+
import utils from 'zkp-utils';
3+
import config from 'config';
4+
import { generalise } from 'general-number';
5+
import { getContractAddress, getContractInstance, registerKey } from './contract.mjs';
6+
import { storeCommitment } from './commitment-storage.mjs';
7+
import { decompressStarlightKey, decrypt, poseidonHash, } from './number-theory.mjs';
8+
import { getLeafIndex } from "./timber.mjs";
9+
10+
const keyDb =
11+
process.env.KEY_DB_PATH || '/app/orchestration/common/db/key.json';
12+
13+
export default class BackupEncryptedDataEventListener {
14+
constructor(web3) {
15+
this.web3 = web3;
16+
this.ethAddress = generalise(config.web3.options.defaultAccount);
17+
this.contractMetadata = {};
18+
this.eventSubscription = null; // Store as class property to prevent garbage collection
19+
this.heartbeatInterval = null;
20+
this.lastEventReceived = Date.now();
21+
this.lastProcessedBlock = 0; // Track last block we processed
22+
}
23+
24+
async init() {
25+
try {
26+
this.instance = await getContractInstance('CONTRACT_NAME');
27+
const contractAddr = await getContractAddress('CONTRACT_NAME');
28+
console.log(
29+
'encrypted-data-listener',
30+
'init',
31+
'Contract Address --------->',
32+
contractAddr,
33+
);
34+
35+
if (!fs.existsSync(keyDb)) await registerKey(utils.randomHex(31), 'CONTRACT_NAME', true);
36+
37+
const keys = JSON.parse(fs.readFileSync(keyDb, 'utf-8'));
38+
this.secretKey = generalise(keys.secretKey);
39+
this.publicKey = generalise(keys.publicKey);
40+
this.sharedPublicKey = generalise(keys.sharedPublicKey);
41+
this.sharedSecretKey = generalise(keys.sharedSecretKey);
42+
43+
if (!keys.secretKey || !keys.publicKey) {
44+
throw new Error('Invalid key file: missing required keys');
45+
}
46+
} catch (error) {
47+
console.error(
48+
'encrypted-data-listener',
49+
'init',
50+
'Initialization failed:',
51+
error,
52+
);
53+
throw error;
54+
}
55+
}
56+
57+
async startBackupRecovery() {
58+
try {
59+
await this.init();
60+
61+
const eventName = 'EncryptedBackupData';
62+
const eventJsonInterface = this.instance._jsonInterface.find(
63+
o => o.name === eventName && o.type === 'event',
64+
);
65+
66+
// Determine starting block: use metadata if set, otherwise last processed + 1, otherwise start from block 1
67+
let startBlock;
68+
if (this.contractMetadata.blockNumber) {
69+
startBlock = this.contractMetadata.blockNumber;
70+
} else if (this.lastProcessedBlock) {
71+
startBlock = this.lastProcessedBlock + 1;
72+
} else {
73+
startBlock = 1;
74+
}
75+
76+
console.log(`[BACKUP] Starting backup event listener from block ${startBlock}`);
77+
78+
// Store as class property to prevent garbage collection
79+
this.eventSubscription = this.instance.events[eventName]({
80+
fromBlock: startBlock,
81+
topics: [eventJsonInterface.signature],
82+
});
83+
84+
// Track last event received for health monitoring
85+
this.lastEventReceived = Date.now();
86+
87+
this.eventSubscription.on('connected', subscriptionId => {
88+
console.log(`[BACKUP] Connected, ID: ${subscriptionId}`);
89+
});
90+
91+
this.eventSubscription.on('data', async eventData => {
92+
try {
93+
this.lastEventReceived = Date.now();
94+
this.lastProcessedBlock = Number(eventData.blockNumber);
95+
console.log(`[BACKUP] Event received, block ${eventData.blockNumber}`);
96+
await this.processBackupEventData(eventData);
97+
} catch (error) {
98+
console.error('[BACKUP] Error processing backup event data:', error);
99+
}
100+
});
101+
102+
this.eventSubscription.on('error', async error => {
103+
console.error('[BACKUP] ❌ Subscription error:', error);
104+
await this.reconnect();
105+
});
106+
107+
this.eventSubscription.on('close', async () => {
108+
console.log('[BACKUP] Subscription closed, reconnecting...');
109+
await this.reconnect();
110+
});
111+
112+
// Clear any existing heartbeat
113+
if (this.heartbeatInterval) {
114+
clearInterval(this.heartbeatInterval);
115+
}
116+
117+
// Heartbeat: Check subscription health every 30 seconds
118+
this.heartbeatInterval = setInterval(async () => {
119+
const now = Date.now();
120+
const timeSinceLastEvent = now - this.lastEventReceived;
121+
const minutesSinceLastEvent = Math.floor(timeSinceLastEvent / 60000);
122+
123+
console.log('[BACKUP] ❤️ Heartbeat - Time since last event:', minutesSinceLastEvent, 'minutes');
124+
125+
// Check if subscription object still exists and has an ID
126+
if (!this.eventSubscription || !this.eventSubscription.id) {
127+
console.warn('[BACKUP] ⚠️ WARNING: Event subscription is dead!');
128+
console.log('[BACKUP] Subscription exists?', !!this.eventSubscription);
129+
if (this.eventSubscription) {
130+
console.log('[BACKUP] Subscription ID:', this.eventSubscription.id);
131+
}
132+
await this.reconnect();
133+
} else {
134+
console.log('[BACKUP] Subscription healthy, ID:', this.eventSubscription.id);
135+
console.log('[BACKUP] Last processed block:', this.lastProcessedBlock);
136+
137+
// Check for past events to see if we're missing any
138+
try {
139+
const currentBlock = Number(await this.web3.eth.getBlockNumber());
140+
console.log('[BACKUP] Current block:', currentBlock);
141+
142+
// Only check if we've processed at least one event
143+
if (this.lastProcessedBlock > 0) {
144+
const checkFromBlock = this.lastProcessedBlock + 1; // Start AFTER last processed
145+
146+
// Only check if there are new blocks since last processed
147+
if (checkFromBlock <= currentBlock) {
148+
const pastEvents = await this.instance.getPastEvents('EncryptedBackupData', {
149+
fromBlock: checkFromBlock,
150+
toBlock: 'latest'
151+
});
152+
153+
if (pastEvents.length > 0) {
154+
console.log(`[BACKUP] ⚠️ Found ${pastEvents.length} past events from block ${checkFromBlock} to ${currentBlock} that subscription didn't receive!`);
155+
pastEvents.forEach(evt => {
156+
console.log(`[BACKUP] - Event in block ${evt.blockNumber}, tx: ${evt.transactionHash}`);
157+
});
158+
console.log('[BACKUP] Subscription is broken. Forcing reconnect...');
159+
await this.reconnect();
160+
} else {
161+
console.log(`[BACKUP] No new events from block ${checkFromBlock} to ${currentBlock} - subscription OK`);
162+
}
163+
} else {
164+
console.log(`[BACKUP] No new blocks since last processed (${this.lastProcessedBlock})`);
165+
}
166+
} else {
167+
console.log('[BACKUP] No events processed yet, skipping past event check');
168+
}
169+
} catch (e) {
170+
console.log('[BACKUP] Error checking past events:', e.message);
171+
}
172+
}
173+
}, 30000); // Every 30 seconds
174+
175+
console.log('[BACKUP] Event handlers and health monitor attached, waiting for events...');
176+
} catch (error) {
177+
console.error('[BACKUP] ❌ Listener startup failed:', error);
178+
}
179+
}
180+
181+
async processBackupEventData(eventData) {
182+
const keyPairs = [
183+
{ secretKey: this.secretKey, publicKey: this.publicKey },
184+
{ secretKey: this.sharedSecretKey, publicKey: this.sharedPublicKey },
185+
];
186+
187+
for (const kp of keyPairs) {
188+
if (!kp.secretKey) continue;
189+
for (let i = 0; i < eventData.returnValues.encPreimages.length; i++) {
190+
let { cipherText, ephPublicKey, varName } =
191+
eventData.returnValues.encPreimages[i];
192+
const name = varName.split(' ')[0];
193+
const structProperties = varName.split('props:')[1]?.trim();
194+
varName = varName.split('props:')[0]?.trim();
195+
let isArray = false;
196+
let isStruct = false;
197+
if (varName.includes(' a')) {
198+
isArray = true;
199+
}
200+
if (varName.includes(' s')) {
201+
isStruct = true;
202+
}
203+
const plainText = decrypt(cipherText, kp.secretKey.hex(32), [
204+
decompressStarlightKey(generalise(ephPublicKey))[0].hex(32),
205+
decompressStarlightKey(generalise(ephPublicKey))[1].hex(32),
206+
]);
207+
let mappingKey = null;
208+
let stateVarId;
209+
let value;
210+
console.log(
211+
'Decrypted pre-image of commitment for variable name: ' + name + ': ',
212+
);
213+
const salt = generalise(plainText[0]);
214+
console.log(`\tSalt: ${salt.integer}`);
215+
let count;
216+
if (isArray) {
217+
console.log(`\tState variable StateVarId: ${plainText[2]}`);
218+
mappingKey = generalise(plainText[1]);
219+
console.log(`\tMapping Key: ${mappingKey.integer}`);
220+
const reGenStateVarId = generalise(
221+
utils.mimcHash(
222+
[
223+
generalise(plainText[2]).bigInt,
224+
generalise(plainText[1]).bigInt,
225+
],
226+
'ALT_BN_254',
227+
),
228+
);
229+
stateVarId = reGenStateVarId;
230+
console.log(`Regenerated StateVarId: ${reGenStateVarId.bigInt}`);
231+
count = 3;
232+
} else {
233+
stateVarId = generalise(plainText[1]);
234+
console.log(`\tStateVarId: ${plainText[1]}`);
235+
count = 2;
236+
}
237+
if (isStruct) {
238+
value = {};
239+
let count = isArray ? 3 : 2;
240+
for (const prop of structProperties.split(' ')) {
241+
value[prop] = plainText[count];
242+
count++;
243+
}
244+
console.log(`\tValue: ${value}`);
245+
} else {
246+
value = generalise(plainText[count]);
247+
console.log(`\tValue: ${value.integer}`);
248+
}
249+
let newCommitment;
250+
if (isStruct) {
251+
const hashInput = [BigInt(stateVarId.hex(32))];
252+
const start = isArray ? 3 : 2;
253+
for (let i = start; i < plainText.length; i++) {
254+
hashInput.push(BigInt(generalise(plainText[i]).hex(32)));
255+
}
256+
hashInput.push(BigInt(kp.publicKey.hex(32)));
257+
hashInput.push(BigInt(salt.hex(32)));
258+
newCommitment = generalise(poseidonHash(hashInput));
259+
} else {
260+
newCommitment = generalise(
261+
poseidonHash([
262+
BigInt(stateVarId.hex(32)),
263+
BigInt(value.hex(32)),
264+
BigInt(kp.publicKey.hex(32)),
265+
BigInt(salt.hex(32)),
266+
]),
267+
);
268+
}
269+
const index = await getLeafIndex(
270+
'CONTRACT_NAME',
271+
newCommitment.integer,
272+
undefined,
273+
1,
274+
);
275+
if (index === undefined) {
276+
console.log(index, 'index');
277+
console.warn(
278+
'Could not find leaf index for',
279+
newCommitment.integer,
280+
', Possibly this commitment has a different public key and so decryption failed.',
281+
);
282+
continue;
283+
}
284+
const nullifier = poseidonHash([
285+
BigInt(stateVarId.hex(32)),
286+
BigInt(kp.secretKey.hex(32)),
287+
BigInt(salt.hex(32)),
288+
]);
289+
let isNullified = false;
290+
// Check if nullifiers method exists on the contract
291+
if (this.instance.methods.nullifiers) {
292+
let nullification = await this.instance.methods
293+
.nullifiers(nullifier.integer)
294+
.call();
295+
if (nullification === 0n) {
296+
isNullified = false;
297+
} else if (nullification === BigInt(nullifier.integer)) {
298+
isNullified = true;
299+
} else {
300+
throw new Error("The nullifier value: " + nullifier.integer +
301+
' does not match the on-chain nullifier: ' +
302+
nullification,
303+
);
304+
}
305+
} else {
306+
console.log(
307+
'Contract does not have nullifiers method, assuming not nullified',
308+
);
309+
isNullified = false;
310+
}
311+
try {
312+
await storeCommitment({
313+
hash: newCommitment,
314+
name: name,
315+
mappingKey: mappingKey?.integer,
316+
preimage: {
317+
stateVarId: stateVarId,
318+
value: value,
319+
salt: salt,
320+
publicKey: kp.publicKey,
321+
},
322+
secretKey: kp.secretKey,
323+
isNullified: isNullified,
324+
});
325+
console.log("Added commitment", newCommitment.hex(32));
326+
} catch (e) {
327+
if (e.toString().includes("E11000 duplicate key")) {
328+
console.log(
329+
"encrypted-data-listener -",
330+
"This commitment already exists. Ignore it."
331+
);
332+
}
333+
}
334+
}
335+
}
336+
}
337+
338+
async reconnect() {
339+
console.log('[BACKUP] Reconnecting...');
340+
341+
// Clear heartbeat interval
342+
if (this.heartbeatInterval) {
343+
clearInterval(this.heartbeatInterval);
344+
this.heartbeatInterval = null;
345+
}
346+
347+
// Clean up old subscription if it exists
348+
if (this.eventSubscription) {
349+
try {
350+
console.log('[BACKUP] Unsubscribing from old subscription...');
351+
await this.eventSubscription.unsubscribe();
352+
} catch (e) {
353+
console.log('[BACKUP] Error unsubscribing (may already be dead):', e.message);
354+
}
355+
this.eventSubscription = null;
356+
}
357+
358+
// Reset last event timestamp
359+
this.lastEventReceived = Date.now();
360+
361+
try {
362+
await this.startBackupRecovery();
363+
console.log('[BACKUP] Reconnected successfully');
364+
} catch (error) {
365+
console.error('[BACKUP] ❌ Reconnection attempt failed:', error);
366+
setTimeout(() => this.reconnect(), 5000); // Retry after 5 seconds
367+
}
368+
}
369+
370+
}

0 commit comments

Comments
 (0)