Skip to content
This repository was archived by the owner on Apr 22, 2025. It is now read-only.

Commit 8f36a57

Browse files
FABN-1607: Start event replay from previous block (#299) (#303)
Ensure that a block is received quickly to avoid the start of the event service blocking waiting for events, which in turn would block the call to Network.addBlockListener() Signed-off-by: Mark S. Lewis <[email protected]>
1 parent 5ffdbb3 commit 8f36a57

File tree

5 files changed

+76
-20
lines changed

5 files changed

+76
-20
lines changed

fabric-network/src/impl/event/blockeventsource.ts

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,25 @@ import { newFilteredBlockEvent } from './filteredblockeventfactory';
1414
import { newFullBlockEvent } from './fullblockeventfactory';
1515
import { OrderedBlockQueue } from './orderedblockqueue';
1616
import { newPrivateBlockEvent } from './privateblockeventfactory';
17+
import { notNullish } from '../gatewayutils';
1718
import Long = require('long');
1819

1920
const logger = Logger.getLogger('BlockEventSource');
2021

2122
const defaultBlockType: EventType = 'filtered';
2223

24+
function newBlockQueue(options: ListenerOptions): OrderedBlockQueue {
25+
const startBlock = asLong(options.startBlock);
26+
return new OrderedBlockQueue(startBlock);
27+
}
28+
29+
function asLong(value?: string | number | Long): Long | undefined {
30+
if (notNullish(value)) {
31+
return Long.fromValue(value);
32+
}
33+
return undefined;
34+
}
35+
2336
export class BlockEventSource {
2437
private readonly eventServiceManager: EventServiceManager;
2538
private eventService?: EventService;
@@ -32,7 +45,7 @@ export class BlockEventSource {
3245

3346
constructor(eventServiceManager: EventServiceManager, options: ListenerOptions = {}) {
3447
this.eventServiceManager = eventServiceManager;
35-
this.blockQueue = this.newBlockQueue(options);
48+
this.blockQueue = newBlockQueue(options);
3649
this.asyncNotifier = new AsyncNotifier(
3750
this.blockQueue.getNextBlock.bind(this.blockQueue),
3851
this.notifyListeners.bind(this)
@@ -56,11 +69,6 @@ export class BlockEventSource {
5669
this.started = false;
5770
}
5871

59-
private newBlockQueue(options: ListenerOptions): OrderedBlockQueue {
60-
const startBlock = options.startBlock ? Long.fromValue(options.startBlock) : undefined;
61-
return new OrderedBlockQueue(startBlock);
62-
}
63-
6472
private async start() {
6573
if (this.started) {
6674
return;
@@ -96,9 +104,17 @@ export class BlockEventSource {
96104
}
97105

98106
private async startEventService() {
107+
let startBlock = this.getNextBlockNumber();
108+
if (startBlock) {
109+
startBlock = startBlock.subtract(Long.ONE);
110+
if (startBlock.isNegative()) {
111+
startBlock = Long.ZERO;
112+
}
113+
}
114+
99115
const options: StartRequestOptions = {
100116
blockType: this.blockType,
101-
startBlock: this.getNextBlockNumber()
117+
startBlock
102118
};
103119
await this.eventServiceManager.startEventService(this.eventService!, options);
104120
}

fabric-network/src/impl/gatewayutils.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,3 +64,12 @@ export function cachedResult<T>(f: () => T): () => T {
6464
return value;
6565
};
6666
}
67+
68+
/**
69+
* Typesafe check that a value is not nullish.
70+
* @private
71+
* @param value Any value, including null and undefined.
72+
*/
73+
export function notNullish<T>(value?: T): value is T {
74+
return value !== null && value !== undefined;
75+
}

fabric-network/src/network.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import { checkpointBlockListener } from './impl/event/listeners';
1616
import { addListener, ListenerSession, removeListener } from './impl/event/listenersession';
1717
import { SharedBlockListenerSession } from './impl/event/sharedblocklistenersession';
1818
import { QueryHandler } from './impl/query/queryhandler';
19+
import { notNullish } from './impl/gatewayutils';
1920
import * as Logger from './logger';
2021

2122
const logger = Logger.getLogger('Network');
@@ -355,7 +356,7 @@ export class NetworkImpl implements Network {
355356
listener = checkpointBlockListener(listener, options.checkpointer);
356357
}
357358

358-
if (options.startBlock) {
359+
if (notNullish(options.startBlock)) {
359360
return this.newIsolatedBlockListenerSession(listener, options);
360361
} else {
361362
return this.newSharedBlockListenerSession(listener, options.type);

fabric-network/test/impl/event/blocklistener.spec.ts

Lines changed: 41 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -263,10 +263,10 @@ describe('block listener', () => {
263263

264264
await startListener.completePromise;
265265
sinon.assert.calledWith(stub, eventService);
266-
sinon.assert.neverCalledWith(stub, sinon.match.any, sinon.match.has('startBlock', sinon.match.number));
266+
sinon.assert.neverCalledWith(stub, sinon.match.any, sinon.match.has('startBlock', sinon.match.defined));
267267
});
268268

269-
it('errors trigger reconnect of event service with next block as start block if events received', async () => {
269+
it('errors trigger reconnect of event service with last received block as start block if events received', async () => {
270270
await network.addBlockListener(listener, listenerOptions);
271271
const startListener = testUtils.newAsyncListener<void>();
272272
const stub = sinon.stub(eventServiceManager, 'startEventService').callsFake(() => startListener());
@@ -275,7 +275,28 @@ describe('block listener', () => {
275275
eventService.sendError(new Error('DISCONNECT'));
276276

277277
await startListener.completePromise;
278-
sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2)));
278+
sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.ONE));
279+
});
280+
281+
it('listener does not receive old blocks on reconnect', async () => {
282+
listener = testUtils.newAsyncListener<BlockEvent>(2);
283+
const event1 = newFilteredBlockEventInfo(1);
284+
const event2 = newFilteredBlockEventInfo(2);
285+
const startListener = testUtils.newAsyncListener<void>(2);
286+
const stub = sinon.stub(eventServiceManager, 'startEventService').callsFake(() => startListener());
287+
288+
await network.addBlockListener(listener, listenerOptions);
289+
eventService.sendEvent(event1);
290+
eventService.sendError(new Error('DISCONNECT'));
291+
292+
await startListener.completePromise;
293+
294+
eventService.sendEvent(event1);
295+
eventService.sendEvent(event2);
296+
297+
const actual = await listener.completePromise;
298+
const blockNumbers = actual.map((e) => e.blockNumber);
299+
expect(blockNumbers).to.deep.equal([event1.blockNumber, event2.blockNumber]);
279300
});
280301

281302
it('listener changing event data does not affect other listeners', async () => {
@@ -325,13 +346,22 @@ describe('block listener', () => {
325346
});
326347

327348
describe('replay', () => {
328-
it('replay listener sends start block to event service', async () => {
349+
it('replay listener sends (startBlock - 1) to event service', async () => {
329350
const stub = sinon.stub(eventServiceManager, 'startEventService');
330351

331-
listenerOptions.startBlock = 2;
352+
listenerOptions.startBlock = 1;
353+
await network.addBlockListener(listener, listenerOptions);
354+
355+
sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.ZERO));
356+
});
357+
358+
it('replay listener does not send start block less than zero to event service', async () => {
359+
const stub = sinon.stub(eventServiceManager, 'startEventService');
360+
361+
listenerOptions.startBlock = 0;
332362
await network.addBlockListener(listener, listenerOptions);
333363

334-
sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2)));
364+
sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.ZERO));
335365
});
336366

337367
it('replay listener does not receive events earlier than start block', async () => {
@@ -449,27 +479,27 @@ describe('block listener', () => {
449479
expect(actual.blockNumber).to.equal(event.blockNumber);
450480
});
451481

452-
it('checkpoint listener sends block number to event service', async () => {
482+
it('checkpoint listener sends (block number - 1) to event service', async () => {
453483
const stub = sinon.stub(eventServiceManager, 'startEventService');
454484
const checkpointer = new StubCheckpointer();
455485
await checkpointer.setBlockNumber(Long.fromNumber(2));
456486

457487
listenerOptions.checkpointer = checkpointer;
458488
await network.addBlockListener(listener, listenerOptions);
459489

460-
sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2)));
490+
sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.ONE));
461491
});
462492

463493
it('checkpoint block number takes precedence over startBlock option', async () => {
464494
const stub = sinon.stub(eventServiceManager, 'startEventService');
465495
const checkpointer = new StubCheckpointer();
466-
await checkpointer.setBlockNumber(Long.fromNumber(2));
496+
await checkpointer.setBlockNumber(Long.ONE);
467497

468498
listenerOptions.checkpointer = checkpointer;
469-
listenerOptions.startBlock = 1;
499+
listenerOptions.startBlock = 10;
470500
await network.addBlockListener(listener, listenerOptions);
471501

472-
sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.fromNumber(2)));
502+
sinon.assert.calledWith(stub, eventService, sinon.match.has('startBlock', Long.ZERO));
473503
});
474504

475505
it('checkpoint listener receives events from checkpoint block number', async () => {

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
"unitTest:all": "export HFC_LOGGING='{\"debug\":\"test/temp/debug.log\"}' && nyc run-s unitTest:common unitTest:ca-client unitTest:network",
3030
"unitTest:common": "mocha --reporter list 'fabric-common/test/**/*.js'",
3131
"unitTest:ca-client": "mocha --reporter list 'fabric-ca-client/test/**/*.js'",
32-
"unitTest:network": "npm run compile && mocha --require ts-node/register --reporter list 'fabric-network/test/**/*.{js,ts}'",
32+
"unitTest:network": "run-s compile \"unitTest -- 'fabric-network/test/**/*.{js,ts}'\"",
3333
"unitTest": "mocha --require ts-node/register --reporter list",
3434
"dockerReady": "npm run dockerClean && (cd test/fixtures/docker-compose && docker-compose -f docker-compose-tls-level-db.yaml -p node up -d && sleep 15 && docker ps -a)",
3535
"tapeIntegration": "./scripts/npm_scripts/runTape.sh",

0 commit comments

Comments
 (0)