Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
2cf0698
fix: pass in raw bytes directly when testing addRow
gengdy1545 Oct 1, 2025
1a8f1a2
feat: add stream update test
gengdy1545 Oct 2, 2025
22c1f19
fix: values.length is already checked in addRow
gengdy1545 Oct 3, 2025
8a4fea1
feat: update retina service test
gengdy1545 Oct 3, 2025
115c44a
feat: update retina service test
gengdy1545 Oct 3, 2025
82b5153
feat: update retina service test
gengdy1545 Oct 8, 2025
03093e3
feat: update retina service test
gengdy1545 Oct 8, 2025
87d695d
feat: update retina pom
gengdy1545 Oct 8, 2025
c29d8b9
feat: update retina test
gengdy1545 Oct 8, 2025
09a68ad
feat: update retina pom
gengdy1545 Oct 9, 2025
f209303
feat: add retina trace test for index service
gengdy1545 Oct 11, 2025
d50c3e9
feat: update retina trace test for index service
gengdy1545 Oct 11, 2025
b7498d4
feat: update retina trace test for index service
gengdy1545 Oct 12, 2025
266b7ab
feat: reduce file flushing time to 30 seconds
gengdy1545 Oct 13, 2025
e8207ac
fix: add the index-memory dependency
gengdy1545 Oct 13, 2025
9f47f18
feat: update test for retina service
gengdy1545 Oct 13, 2025
eb56e53
feat: update test for retina service
gengdy1545 Oct 14, 2025
9805b9b
feat: avoid hard-coding the keyColumn for SinglePointIndex
gengdy1545 Oct 14, 2025
16b4107
fix: use sharding to enable parallel processing
gengdy1545 Oct 14, 2025
a5402f6
fix: keyColumn construction error
gengdy1545 Oct 14, 2025
19fbbaa
fix: keyColumnNames error
gengdy1545 Oct 15, 2025
44da852
fix: remove duplicate dependencies
gengdy1545 Oct 15, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions pixels-daemon/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@
</dependency>
<dependency>
<groupId>io.pixelsdb</groupId>
<artifactId>pixels-index-memory</artifactId>
<artifactId>pixels-index-main-sqlite</artifactId>
</dependency>
<dependency>
<groupId>io.pixelsdb</groupId>
<artifactId>pixels-index-main-sqlite</artifactId>
<artifactId>pixels-index-memory</artifactId>
</dependency>

<dependency>
Expand Down
22 changes: 22 additions & 0 deletions pixels-index/pixels-index-rocksdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,33 @@
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.pixelsdb</groupId>
<artifactId>pixels-index-memory</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-testing</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,329 @@
/*
* Copyright 2025 PixelsDB.
*
* This file is part of Pixels.
*
* Pixels is free software: you can redistribute it and/or modify
* it under the terms of the Affero GNU General Public License as
* published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Pixels is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Affero GNU General Public License for more details.
*
* You should have received a copy of the Affero GNU General Public
* License along with Pixels. If not, see
* <https://www.gnu.org/licenses/>.
*/
package io.pixelsdb.pixels.index.rocksdb;

import com.google.protobuf.ByteString;
import io.pixelsdb.pixels.common.exception.IndexException;
import io.pixelsdb.pixels.common.index.IndexService;
import io.pixelsdb.pixels.common.index.IndexServiceProvider;
import io.pixelsdb.pixels.index.IndexProto;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.BeforeAll;

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* Test using the actual index load and update during Retina runtime.
* Primarily uses the LocalIndexService's deletePrimaryIndexEntry and putPrimaryIndexEntry interfaces.
* PUT format: tableId, indexId, key, timestamp, rowId, fileId, rgId, rgRowOffset
* DEL format: tableId, indexId, key, timestamp
* *****************************************************************************
* Note
* 1. The key is an integer here.
* 2. Launching the metadataService separately in tests is cumbersome;
* it's best to start the pixels-related services externally.
* 3. Insert records used for tracing into the SINGLE_POINT_INDICES table
* in MySQL pixels_metadata.
* e.g.,
* ```sql
* INSERT INTO SINGLE_POINT_INDICES
* (SPI_ID, SPI_KEY_COLUMNS, SPI_PRIMARY, SPI_UNIQUE, SPI_INDEX_SCHEME, TBLS_TBL_ID, SCHEMA_VERSIONS_SV_ID)
* VALUES
* (403, '{"keyColumnIds":[403]}', 1, 1, 'rocksdb', 3, 3),
* (404, '{"keyColumnIds":[404]}', 1, 1, 'rocksdb', 3, 3);
* ```
* *****************************************************************************
*/
public class TestRetinaTrace
{
    private static final int THREAD_COUNT = 16;

    // Trace load path: only put operations
    private static final String loadPath = "/home/gengdy/data/index/index.load.trace";

    // Trace update path: both put and delete operations
    private static final String updatePath = "/home/gengdy/data/index/index.update.trace";

    private static final IndexService indexService = IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.local);

    /**
     * Loads the initial data into the index before any test runs.
     * Each line of the load trace is a tab-separated PUT record
     * (format: opType, tableId, indexId, key, timestamp, rowId, fileId, rgId, rgRowOffset);
     * a malformed line or a failed put aborts preparation with a RuntimeException.
     */
    @BeforeAll
    public static void prepare()
    {
        System.out.println("Preparing data from loadPath into index...");
        long count = 0;
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(loadPath)))
        {
            String line;
            while ((line = reader.readLine()) != null)
            {
                count++;
                String[] parts = line.split("\\t");
                PutOperation putOperation = new PutOperation(parts);
                IndexProto.PrimaryIndexEntry entry = (IndexProto.PrimaryIndexEntry) putOperation.toProto();
                indexService.putPrimaryIndexEntry(entry);
            }
        } catch (IOException e)
        {
            throw new RuntimeException("Failed to prepare data from loadPath", e);
        } catch (IndexException e)
        {
            throw new RuntimeException("Failed to put index entry during prepare", e);
        }
        // count is exactly the number of records inserted above; loading is
        // single-threaded, so do NOT multiply by THREAD_COUNT here (the old
        // report overstated the prepared volume by 16x).
        System.out.println("Finished preparing " + count + " records into index.");
    }

    /**
     * A single parsed trace record that knows which worker shard it belongs to
     * and how to convert itself into the protobuf message the index service consumes.
     */
    private interface TraceOperation
    {
        /** Worker shard index in [0, THREAD_COUNT); same key always maps to the same shard. */
        int getBucket();

        /** Builds the protobuf message for this operation (PrimaryIndexEntry or IndexKey). */
        Object toProto();
    }

    /**
     * A PUT record: tableId, indexId, key, timestamp, rowId, fileId, rgId, rgRowOffset
     * (parts[0] is the operation type tag and is ignored here).
     */
    private static class PutOperation implements TraceOperation
    {
        final long tableId, indexId, timestamp, rowId, fileId;
        final ByteString key;
        final int rgId, rgRowOffset;
        final int bucket;

        PutOperation(String[] parts)
        {
            if (parts.length != 9)
            {
                throw new RuntimeException("Invalid PUT operation: " + String.join("\t", parts));
            }
            this.tableId = Long.parseLong(parts[1]);
            this.indexId = Long.parseLong(parts[2]);
            int keyInt = Integer.parseInt(parts[3]);
            // The key is serialized as 4 big-endian bytes (ByteBuffer default order).
            this.key = ByteString.copyFrom(ByteBuffer.allocate(Integer.BYTES)
                    .putInt(keyInt).array());
            this.timestamp = Long.parseLong(parts[4]);
            this.rowId = Long.parseLong(parts[5]);
            this.fileId = Long.parseLong(parts[6]);
            this.rgId = Integer.parseInt(parts[7]);
            this.rgRowOffset = Integer.parseInt(parts[8]);
            // floorMod keeps the bucket non-negative even for negative keys;
            // keyInt % THREAD_COUNT would be negative and crash List.get() later.
            this.bucket = Math.floorMod(keyInt, THREAD_COUNT);
        }

        @Override
        public int getBucket()
        {
            return bucket;
        }

        @Override
        public Object toProto()
        {
            IndexProto.IndexKey indexKey = IndexProto.IndexKey.newBuilder()
                    .setTableId(tableId)
                    .setIndexId(indexId)
                    .setKey(key)
                    .setTimestamp(timestamp)
                    .build();

            IndexProto.RowLocation rowLocation = IndexProto.RowLocation.newBuilder()
                    .setFileId(fileId)
                    .setRgId(rgId)
                    .setRgRowOffset(rgRowOffset)
                    .build();

            return IndexProto.PrimaryIndexEntry.newBuilder()
                    .setIndexKey(indexKey)
                    .setRowId(rowId)
                    .setRowLocation(rowLocation)
                    .build();
        }
    }

    /**
     * A DEL record: tableId, indexId, key, timestamp
     * (parts[0] is the operation type tag and is ignored here).
     */
    private static class DeleteOperation implements TraceOperation
    {
        final long tableId, indexId, timestamp;
        final ByteString key;
        final int bucket;

        DeleteOperation(String[] parts)
        {
            if (parts.length != 5)
            {
                throw new RuntimeException("Invalid DEL operation: " + String.join("\t", parts));
            }
            this.tableId = Long.parseLong(parts[1]);
            this.indexId = Long.parseLong(parts[2]);
            int keyInt = Integer.parseInt(parts[3]);
            // Same 4-byte big-endian key encoding as PutOperation, so puts and
            // deletes of the same integer key target the same index entry.
            this.key = ByteString.copyFrom(ByteBuffer.allocate(Integer.BYTES)
                    .putInt(keyInt).array());
            this.timestamp = Long.parseLong(parts[4]);
            // floorMod: see PutOperation — avoids negative buckets for negative keys.
            this.bucket = Math.floorMod(keyInt, THREAD_COUNT);
        }

        @Override
        public int getBucket()
        {
            return bucket;
        }

        @Override
        public Object toProto()
        {
            return IndexProto.IndexKey.newBuilder()
                    .setTableId(tableId)
                    .setIndexId(indexId)
                    .setKey(key)
                    .setTimestamp(timestamp)
                    .build();
        }
    }

    /**
     * Replays one shard of pre-built protobuf operations against the index service.
     * Each worker owns a disjoint key range (sharded by key), so workers can run
     * in parallel without coordinating on individual keys.
     */
    private static class IndexWorker implements Runnable
    {
        private final List<Object> protoOperations;

        public IndexWorker(List<Object> protoOperations)
        {
            this.protoOperations = protoOperations;
        }

        @Override
        public void run()
        {
            try
            {
                for (Object proto : protoOperations)
                {
                    if (proto instanceof IndexProto.PrimaryIndexEntry)
                    {
                        indexService.putPrimaryIndexEntry((IndexProto.PrimaryIndexEntry) proto);
                    } else
                    {
                        indexService.deletePrimaryIndexEntry((IndexProto.IndexKey) proto);
                    }
                }
            } catch (IndexException e)
            {
                throw new RuntimeException("Index operation failed in worker thread", e);
            }
        }
    }

    /**
     * Replays the update trace (mixed PUT/DEL) against the index with THREAD_COUNT
     * worker threads and reports per-operation-type and total throughput.
     * Protobuf messages are pre-built outside the timed section so the measurement
     * covers only index-service calls.
     */
    @Test
    public void testIndex()
    {
        System.out.println("Loading baseTrace...");
        List<TraceOperation> operations = new ArrayList<>();
        long putCount = 0, delCount = 0;
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(updatePath)))
        {
            String line;
            while ((line = reader.readLine()) != null)
            {
                // Skip blank lines explicitly: split("\\t") on "" returns {""},
                // so the old parts.length < 1 guard never fired and a blank line
                // fell through to the "Unknown operation type" error.
                if (line.isEmpty())
                {
                    continue;
                }
                String[] parts = line.split("\\t");
                String opType = parts[0];
                if (opType.equals("P"))
                {
                    putCount++;
                    operations.add(new PutOperation(parts));
                } else if (opType.equals("D"))
                {
                    delCount++;
                    operations.add(new DeleteOperation(parts));
                } else
                {
                    throw new RuntimeException("Unknown operation type: " + opType);
                }
            }
        } catch (IOException e)
        {
            throw new RuntimeException("Failed to read update trace file", e);
        } catch (NumberFormatException e)
        {
            throw new RuntimeException("Malformed number in update trace", e);
        }
        System.out.println("Loaded " + operations.size() + " operations from update trace.");

        System.out.println("Generating workloads for " + THREAD_COUNT + " threads...");
        List<List<Object>> threadProtoOperations = IntStream.range(0, THREAD_COUNT)
                .mapToObj(i -> new ArrayList<Object>())
                .collect(Collectors.toList());

        // Pre-build all protobuf objects so serialization cost is excluded from the timing.
        for (TraceOperation op : operations)
        {
            threadProtoOperations.get(op.getBucket()).add(op.toProto());
        }
        System.out.println("Finished pre-building protobuf objects.");

        System.out.println("Starting index performance test with " + THREAD_COUNT + " threads...");
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        List<Future<?>> futures = new ArrayList<>();
        long startTime = System.currentTimeMillis();
        for (List<Object> threadProtoOps : threadProtoOperations)
        {
            futures.add(executor.submit(new IndexWorker(threadProtoOps)));
        }
        for (Future<?> f : futures)
        {
            try
            {
                f.get();
            } catch (Exception e)
            {
                throw new RuntimeException("Thread execution failed", e);
            }
        }
        long endTime = System.currentTimeMillis();
        // Release the pool's threads; the old code leaked the executor.
        executor.shutdown();

        // currentTimeMillis deltas are milliseconds, not nanoseconds — name accordingly.
        long totalDurationMillis = endTime - startTime;
        double totalDurationSeconds = totalDurationMillis / 1000.0;
        long totalOps = putCount + delCount;

        double putThroughput = (totalDurationSeconds > 0) ? (putCount / totalDurationSeconds) : 0;
        double deleteThroughput = (totalDurationSeconds > 0) ? (delCount / totalDurationSeconds) : 0;
        double totalThroughput = (totalDurationSeconds > 0) ? (totalOps / totalDurationSeconds) : 0;

        System.out.println("\n--- Index Performance Test Results ---");
        System.out.printf("Thread Count: %d, Mode: Single Entry\n", THREAD_COUNT);
        System.out.printf("Total test time: %.3f seconds\n", totalDurationSeconds);
        System.out.println("------------------------------------");
        System.out.printf("Total PUT operations: %,d\n", putCount);
        System.out.printf("Total DELETE operations: %,d\n", delCount);
        System.out.printf("Total operations: %,d\n", totalOps);
        System.out.println("------------------------------------");
        System.out.printf("PUT throughput: %,.2f ops/sec\n", putThroughput);
        System.out.printf("DELETE throughput: %,.2f ops/sec\n", deleteThroughput);
        System.out.printf("Total throughput: %,.2f ops/sec\n", totalThroughput);
        System.out.println("------------------------------------\n");
    }
}
5 changes: 5 additions & 0 deletions pixels-retina/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
<artifactId>pixels-index-rockset</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.pixelsdb</groupId>
<artifactId>pixels-index-memory</artifactId>
<optional>true</optional>
</dependency>

<dependency>
<groupId>net.java.dev.jna</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@
import io.pixelsdb.pixels.common.metadata.MetadataService;
import io.pixelsdb.pixels.common.metadata.domain.File;
import io.pixelsdb.pixels.common.metadata.domain.Path;
import io.pixelsdb.pixels.common.physical.PhysicalReader;
import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil;
import io.pixelsdb.pixels.common.physical.Storage;
import io.pixelsdb.pixels.common.utils.DateUtil;
import io.pixelsdb.pixels.core.PixelsWriter;
Expand Down
Loading