Skip to content

feat(jsonrpc): optimize event log query #6370

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 8 commits into
base: release_v4.8.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -121,44 +125,76 @@ private BitSet subMatch(int[][] bitIndexes) throws ExecutionException, Interrupt
}

/**
* every section has a compound query of sectionBloomStore, works parallel
* "and" condition in second dimension of query, "or" condition in first dimension
* return a BitSet whose capacity is blockPerSection
* Match blocks using optimized bloom filter operations. This method reduces database queries
* and BitSet operations by handling duplicate bit indexes and skipping invalid groups.
*
* @param bitIndexes A 2D array where:
* - First dimension represents different topic/address (OR)
* - Second dimension contains bit indexes within each topic/address (AND)
* Example: [[1,2,3], [4,5,6]] means (1 AND 2 AND 3) OR (4 AND 5 AND 6)
* @param section The section number in the bloom filter store to query
* @return A BitSet representing the matching blocks in this section
* @throws ExecutionException If there's an error in concurrent execution
* @throws InterruptedException If the concurrent execution is interrupted
*/
private BitSet partialMatch(final int[][] bitIndexes, int section)
throws ExecutionException, InterruptedException {
List<List<Future<BitSet>>> bitSetList = new ArrayList<>();

// 1. Collect all unique bitIndexes
Set<Integer> uniqueBitIndexes = new HashSet<>();
for (int[] index : bitIndexes) {
List<Future<BitSet>> futureList = new ArrayList<>();
for (final int bitIndex : index) { //must be 3
Future<BitSet> bitSetFuture =
sectionExecutor.submit(() -> sectionBloomStore.get(section, bitIndex));
futureList.add(bitSetFuture);
for (int bitIndex : index) { //normally 3, but could be less due to hash collisions
uniqueBitIndexes.add(bitIndex);
}
}

// 2. Submit concurrent requests for all unique bitIndexes
Map<Integer, Future<BitSet>> bitIndexResults = new HashMap<>();
for (int bitIndex : uniqueBitIndexes) {
Future<BitSet> future
= sectionExecutor.submit(() -> sectionBloomStore.get(section, bitIndex));
bitIndexResults.put(bitIndex, future);
}

// 3. Wait for all results and cache them
Map<Integer, BitSet> resultCache = new HashMap<>();
for (Map.Entry<Integer, Future<BitSet>> entry : bitIndexResults.entrySet()) {
BitSet result = entry.getValue().get();
if (result != null) {
resultCache.put(entry.getKey(), result);
}
bitSetList.add(futureList);
}

BitSet bitSet = new BitSet(SectionBloomStore.BLOCK_PER_SECTION);

for (List<Future<BitSet>> futureList : bitSetList) {
// initial a BitSet with all 1
BitSet subBitSet = new BitSet(SectionBloomStore.BLOCK_PER_SECTION);
subBitSet.set(0, SectionBloomStore.BLOCK_PER_SECTION);
// 4. Process valid groups with reused BitSet objects
BitSet finalResult = new BitSet(SectionBloomStore.BLOCK_PER_SECTION);
BitSet tempBitSet = new BitSet(SectionBloomStore.BLOCK_PER_SECTION);

for (int[] index : bitIndexes) {

// init tempBitSet with all 1
tempBitSet.set(0, SectionBloomStore.BLOCK_PER_SECTION);

// and condition in second dimension
for (Future<BitSet> future : futureList) {
BitSet one = future.get();
if (one == null) { //match nothing
subBitSet.clear();
for (int bitIndex : index) {
BitSet cached = resultCache.get(bitIndex);
if (cached == null) { //match nothing
tempBitSet.clear();
break;
}
// "and" condition in second dimension
subBitSet.and(one);
tempBitSet.and(cached);
if (tempBitSet.isEmpty()) {
break;
}
}

// "or" condition in first dimension
bitSet.or(subBitSet);
if (!tempBitSet.isEmpty()) {
finalResult.or(tempBitSet);
}
}
return bitSet;

return finalResult;
}

/**
Expand Down
100 changes: 100 additions & 0 deletions framework/src/test/java/org/tron/core/jsonrpc/LogBlockQueryTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package org.tron.core.jsonrpc;

import java.lang.reflect.Method;
import java.util.BitSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.annotation.Resource;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.tron.common.BaseTest;
import org.tron.core.Constant;
import org.tron.core.config.args.Args;
import org.tron.core.services.jsonrpc.TronJsonRpc.FilterRequest;
import org.tron.core.services.jsonrpc.filters.LogBlockQuery;
import org.tron.core.services.jsonrpc.filters.LogFilterWrapper;
import org.tron.core.store.SectionBloomStore;

public class LogBlockQueryTest extends BaseTest {

@Resource
SectionBloomStore sectionBloomStore;
private ExecutorService sectionExecutor;
private Method partialMatchMethod;
private static final long CURRENT_MAX_BLOCK_NUM = 50000L;

static {
Args.setParam(new String[] {"--output-directory", dbPath()}, Constant.TEST_CONF);
}

@Before
public void setup() throws Exception {
sectionExecutor = Executors.newFixedThreadPool(5);

// Get private method through reflection
partialMatchMethod = LogBlockQuery.class.getDeclaredMethod("partialMatch",
int[][].class, int.class);
partialMatchMethod.setAccessible(true);

BitSet bitSet = new BitSet(SectionBloomStore.BLOCK_PER_SECTION);
bitSet.set(0, SectionBloomStore.BLOCK_PER_SECTION);
sectionBloomStore.put(0, 1, bitSet);
sectionBloomStore.put(0, 2, bitSet);
sectionBloomStore.put(0, 3, bitSet);
BitSet bitSet2 = new BitSet(SectionBloomStore.BLOCK_PER_SECTION);
bitSet2.set(0);
sectionBloomStore.put(1, 1, bitSet2);
sectionBloomStore.put(1, 2, bitSet2);
sectionBloomStore.put(1, 3, bitSet2);
}

@Test
public void testPartialMatch() throws Exception {
// Create a basic LogFilterWrapper
LogFilterWrapper logFilterWrapper = new LogFilterWrapper(
new FilterRequest("0x0", "0x1", null, null, null),
CURRENT_MAX_BLOCK_NUM, null, false);

LogBlockQuery logBlockQuery = new LogBlockQuery(logFilterWrapper, sectionBloomStore,
CURRENT_MAX_BLOCK_NUM, sectionExecutor);

int section = 0;

// Create a hit condition
int[][] bitIndexes = new int[][] {
{1, 2, 3}, // topic0
{4, 5, 6} // topic1
};

// topic0 hit section 0
BitSet result = (BitSet) partialMatchMethod.invoke(logBlockQuery, bitIndexes, section);
Assert.assertNotNull(result);
Assert.assertEquals(SectionBloomStore.BLOCK_PER_SECTION, result.cardinality());

// topic0 hit section 1
result = (BitSet) partialMatchMethod.invoke(logBlockQuery, bitIndexes, 1);
Assert.assertNotNull(result);
Assert.assertEquals(1, result.cardinality());

// not exist section 2
result = (BitSet) partialMatchMethod.invoke(logBlockQuery, bitIndexes, 2);
Assert.assertNotNull(result);
Assert.assertTrue(result.isEmpty());

//not hit
bitIndexes = new int[][] {
{1, 2, 4}, // topic0
{3, 5, 6} // topic1
};
result = (BitSet) partialMatchMethod.invoke(logBlockQuery, bitIndexes, section);
Assert.assertNotNull(result);
Assert.assertTrue(result.isEmpty());

// null condition
bitIndexes = new int[0][];
result = (BitSet) partialMatchMethod.invoke(logBlockQuery, bitIndexes, section);
Assert.assertNotNull(result);
Assert.assertTrue(result.isEmpty());
}
}