diff --git a/pixels-common/src/test/java/io/pixelsdb/pixels/common/retina/TestRetinaService.java b/pixels-common/src/test/java/io/pixelsdb/pixels/common/retina/TestRetinaService.java index 37f5ac347..61d768a77 100644 --- a/pixels-common/src/test/java/io/pixelsdb/pixels/common/retina/TestRetinaService.java +++ b/pixels-common/src/test/java/io/pixelsdb/pixels/common/retina/TestRetinaService.java @@ -19,38 +19,67 @@ */ package io.pixelsdb.pixels.common.retina; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.protobuf.ByteString; import io.pixelsdb.pixels.common.exception.MetadataException; import io.pixelsdb.pixels.common.metadata.MetadataService; -import io.pixelsdb.pixels.common.metadata.domain.Layout; -import io.pixelsdb.pixels.common.metadata.domain.Table; -import io.pixelsdb.pixels.common.metadata.domain.SinglePointIndex; +import io.pixelsdb.pixels.common.metadata.domain.*; import io.pixelsdb.pixels.daemon.MetadataProto; import io.pixelsdb.pixels.index.IndexProto; import io.pixelsdb.pixels.retina.RetinaProto; import io.pixelsdb.pixels.sink.SinkProto; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.RateLimiter; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Assertions; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.LongStream; public class TestRetinaService { - private static String schemaName; - private static String tableName; + private static final int NUM_THREADS = 1; // Number of concurrent threads + private static final double RPC_PER_SECOND = 1000.0; // Desired RPCs per second + private static final int ROWS_PER_RPC = 200; // Number of rows per RPC + private static final double UPDATE_RATIO = 1; // Ratio of updates to total operations + private static final int TEST_DURATION_SECONDS = 180; // Total test duration in seconds + private static final int INITIAL_KEYS = 20000; // Initial keys to pre-populate for updates + private static final AtomicLong primaryKeyCounter = new AtomicLong(0); // Counter for generating unique primary keys + private static final List> threadLocalKeyDeques = new ArrayList<>(NUM_THREADS); + private static final ThreadLocal threadLocalStreamHandler = + ThreadLocal.withInitial(() -> RetinaService.Instance().startUpdateStream()); + + private static String schemaName = "tpch"; + private static String tableName = "nation"; + private static String[] keyColumnNames = {"n_nationkey"}; + private static List colNames = new ArrayList<>(); private static SinglePointIndex index; @BeforeAll - public static void setUp() throws MetadataException + public static void setUp() throws MetadataException, InterruptedException, JsonProcessingException { - schemaName = "tpch"; - tableName = "nation"; MetadataService metadataService = MetadataService.Instance(); - - String keyColumn = "{\"keyColumnIds\":[25]}"; Table table = metadataService.getTable(schemaName, tableName); + List columns = metadataService.getColumns(schemaName, tableName, false); + KeyColumns keyColumns = new KeyColumns(); + for (Column column : columns) + { + colNames.add(column.getName()); + for (String keyColumn : keyColumnNames) + { + if (column.getName().equals(keyColumn)) + { + keyColumns.addKeyColumnIds((int) column.getId()); + } + } + } + String keyColumn = new ObjectMapper().writeValueAsString(keyColumns); Layout layout = metadataService.getLatestLayout(schemaName, tableName); MetadataProto.SinglePointIndex.Builder singlePointIndexBuilder = MetadataProto.SinglePointIndex.newBuilder() .setId(0L) @@ -65,77 +94,234 @@ public static void setUp() throws MetadataException boolean result = metadataService.createSinglePointIndex(singlePointIndex); Assertions.assertTrue(result); index = metadataService.getPrimaryIndex(table.getId()); + + System.out.println("Pre-populating data for UPDATE operations..."); + List initialKeys = LongStream.range(0, INITIAL_KEYS) + .map(i -> primaryKeyCounter.getAndIncrement()) + .boxed() + .collect(Collectors.toList()); + + final CountDownLatch setupLatch = new CountDownLatch(1); + final List allIndexKeys = Collections.synchronizedList(new ArrayList<>()); + updateRecords(initialKeys, null, initialIndexKeys -> { + allIndexKeys.addAll(initialIndexKeys); + setupLatch.countDown(); + }); + setupLatch.await(); + + int keysPerThread = INITIAL_KEYS / NUM_THREADS; + for (int i = 0; i < NUM_THREADS; i++) + { + int startIndex = i * keysPerThread; + int endIndex = (i == NUM_THREADS - 1) ? INITIAL_KEYS : startIndex + keysPerThread; + ConcurrentLinkedDeque threadDeque = new ConcurrentLinkedDeque<>(allIndexKeys.subList(startIndex, endIndex)); + threadLocalKeyDeques.add(threadDeque); + } + System.out.println("Pre-population complete."); } - @Test - public void testStreamUpdateRecord() + /** + * Construct insertion data + * @param i For example, when i = 0, insert: 0 | name_0 | 0 | comment_0 + * @return IndexKey of the inserted record + */ + public static IndexProto.IndexKey constructInsertData(long i, RetinaProto.InsertData.Builder insertDataBuilder) { - String[] colNames = {"key", "name", "region", "comment"}; - try (RetinaService.StreamHandler streamHandler = RetinaService.Instance().startUpdateStream()) + byte[][] cols = new byte[4][]; + cols[0] = ByteBuffer.allocate(8).putLong(i).array(); + cols[1] = ("name_" + i).getBytes(); + cols[2] = ByteBuffer.allocate(8).putLong(i).array(); + cols[3] = ("comment_" + i).getBytes(); + + SinkProto.RowValue.Builder valueBuilder = SinkProto.RowValue.newBuilder(); + for (int j = 0; j < 4; ++j) + { + SinkProto.ColumnValue.Builder columnValueBuilder = SinkProto.ColumnValue.newBuilder() + .setValue(ByteString.copyFrom(cols[j])); + valueBuilder.addValues(columnValueBuilder.build()); + } + Map valueMap = new HashMap<>(); + for (int j = 0; j < colNames.size(); j++) { - for (int i = 0; i < 10; ++i) + valueMap.put(colNames.get(j), valueBuilder.getValues(j)); + } + + int len = keyColumnNames.length; + List keyColumnValues = new ArrayList<>(len); + int keySize = 0; + for (String keyColumnName : keyColumnNames) + { + ByteString value = valueMap.get(keyColumnName).getValue(); + keyColumnValues.add(value); + keySize += value.size(); + } + keySize += Long.BYTES + (len + 1) * 2 + Long.BYTES; + ByteBuffer byteBuffer = ByteBuffer.allocate(keySize); + byteBuffer.putLong(index.getTableId()).putChar(':'); + for (ByteString value : keyColumnValues) + { + byteBuffer.put(value.toByteArray()); + byteBuffer.putChar(':'); + } + byteBuffer.putLong(0); // timestamp + byteBuffer.flip(); + IndexProto.IndexKey indexKey = IndexProto.IndexKey.newBuilder() + .setTimestamp(0) + .setKey(ByteString.copyFrom(byteBuffer)) + .setIndexId(index.getId()) + .setTableId(index.getTableId()) + .build(); + + insertDataBuilder.addIndexKeys(indexKey) + .addColValues(ByteString.copyFrom(cols[0])) + .addColValues(ByteString.copyFrom(cols[1])) + .addColValues(ByteString.copyFrom(cols[2])) + .addColValues(ByteString.copyFrom(cols[3])); + + return indexKey; + } + + /** + * Update records + * @param insertKeys : parameter for constructing the insert data + * @param indexKeys : parameter for constructing the delete data + */ + public static void updateRecords(List insertKeys, List indexKeys, + Consumer> onCompleteCallback) + { + List result = new ArrayList<>(); + + List tableUpdateData = new ArrayList<>(); + RetinaProto.TableUpdateData.Builder tableUpdateDataBuilder = RetinaProto.TableUpdateData.newBuilder() + .setTableName(tableName).setPrimaryIndexId(index.getId()).setTimestamp(0L); + + if (insertKeys != null) + { + for (Long insertKey : insertKeys) { - List tableUpdateData = new ArrayList<>(); - RetinaProto.TableUpdateData.Builder tableUpdateDataBuilder = RetinaProto.TableUpdateData.newBuilder() - .setTableName(tableName).setPrimaryIndexId(index.getId()); - byte[][] cols = new byte[4][]; - cols[0] = Integer.toString(i).getBytes(); - cols[1] = ("name_" + i).getBytes(); - cols[2] = Integer.toString(i).getBytes(); - cols[3] = ("comment_" + i).getBytes(); - - SinkProto.RowValue.Builder valueBuilder = SinkProto.RowValue.newBuilder(); - for (int j = 0; j < 4; ++j) - { - SinkProto.ColumnValue.Builder columnValueBuilder = SinkProto.ColumnValue.newBuilder() - .setValue(ByteString.copyFrom(cols[j])); - valueBuilder.addValues(columnValueBuilder.build()); - } - Map valueMap = new HashMap<>(); - for (int j = 0; j < colNames.length; j++) { - valueMap.put(colNames[i], valueBuilder.getValues(j)); - } + RetinaProto.InsertData.Builder insertDataBuilder = RetinaProto.InsertData.newBuilder(); + IndexProto.IndexKey indexKey = constructInsertData(insertKey, insertDataBuilder); + result.add(indexKey); + tableUpdateDataBuilder.addInsertData(insertDataBuilder.build()); + } + } + + if (indexKeys != null) + { + for (IndexProto.IndexKey indexKey : indexKeys) + { + RetinaProto.DeleteData.Builder deleteDataBuilder = RetinaProto.DeleteData.newBuilder(); + deleteDataBuilder.addIndexKeys(indexKey); + tableUpdateDataBuilder.addDeleteData(deleteDataBuilder.build()); + } + } + + tableUpdateData.add(tableUpdateDataBuilder.build()); + + RetinaService.StreamHandler streamHandler = threadLocalStreamHandler.get(); + CompletableFuture future = streamHandler.updateRecord(schemaName, tableUpdateData); + + future.whenComplete(((response, throwable) -> + { + if (throwable == null) + { + onCompleteCallback.accept(result); + } else + { + System.err.println("Update failed: " + throwable); + } + })); + } - List keyColumnNames = new LinkedList<>(); - keyColumnNames.add("key"); // 'key' is the primary key's name - int len = keyColumnNames.size(); - List keyColumnValues = new ArrayList<>(len); - int keySize = 0; - for (String keyColumnName : keyColumnNames) + @Test + public void configurableLoadTest() throws InterruptedException + { + System.out.println("======================================================"); + System.out.printf("Starting Load Test with configuration:\n"); + System.out.printf(" - Threads: %d\n", NUM_THREADS); + System.out.printf(" - Target RPCs/sec: %.2f\n", RPC_PER_SECOND); + System.out.printf(" - Rows per RPC: %d\n", ROWS_PER_RPC); + System.out.printf(" - Update Ratio: %.2f\n", UPDATE_RATIO); + System.out.printf(" - Duration: %d seconds\n", TEST_DURATION_SECONDS); + System.out.println("======================================================"); + + ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS); + final RateLimiter rateLimiter = RateLimiter.create(RPC_PER_SECOND); + final AtomicLong totalOperations = new AtomicLong(0); + final AtomicLong insertCount = new AtomicLong(0); + final AtomicLong deleteCount = new AtomicLong(0); + final CountDownLatch latch = new CountDownLatch(NUM_THREADS); + final long testEndTime = System.currentTimeMillis() + TEST_DURATION_SECONDS * 1000L; + + long startTime = System.currentTimeMillis(); + + for (int i = 0; i < NUM_THREADS; i++) + { + final ConcurrentLinkedDeque myKeys = threadLocalKeyDeques.get(i); + + executor.submit(() -> { + try { - ByteString value = valueMap.get(keyColumnName).getValue(); - keyColumnValues.add(value); - keySize += value.size(); - } - keySize += Long.BYTES + (len + 1) * 2 + Long.BYTES; - ByteBuffer byteBuffer = ByteBuffer.allocate(keySize); - byteBuffer.putLong(index.getTableId()).putChar(':'); - for (ByteString value : keyColumnValues) + while (System.currentTimeMillis() < testEndTime) + { + rateLimiter.acquire(); // Wait for token + + List keysToInsert = new ArrayList<>(); + List keysToDelete = new ArrayList<>(); + + for (int j = 0; j < ROWS_PER_RPC; j++) + { + // Decide whether to perform update or insert based on ratio + if (ThreadLocalRandom.current().nextDouble() <= UPDATE_RATIO && !myKeys.isEmpty()) + { + // Perform update: delete an old key and insert a new key + IndexProto.IndexKey keyToDelete = myKeys.poll(); + if (keyToDelete != null) + { + keysToDelete.add(keyToDelete); + keysToInsert.add(primaryKeyCounter.getAndIncrement()); + } else + { + // If no key is available, perform pure insert + keysToInsert.add(primaryKeyCounter.getAndIncrement()); + } + } else + { + // Perform pure insert + keysToInsert.add(primaryKeyCounter.getAndIncrement()); + } + } + + if (!keysToInsert.isEmpty() || !keysToDelete.isEmpty()) + { + updateRecords(keysToInsert, keysToDelete, newKeys -> { + myKeys.addAll(newKeys); // Add new keys back to the deque + totalOperations.incrementAndGet(); + insertCount.addAndGet(keysToInsert.size()); + deleteCount.addAndGet(keysToDelete.size()); + }); + } + } + } finally { - byteBuffer.put(value.toByteArray()); - byteBuffer.putChar(':'); + latch.countDown(); } - byteBuffer.putLong(0); // timestamp - byteBuffer.flip(); - IndexProto.IndexKey indexKey = IndexProto.IndexKey.newBuilder() - .setTimestamp(0) - .setKey(ByteString.copyFrom(byteBuffer)) - .setIndexId(index.getId()) - .setTableId(index.getTableId()) - .build(); - - RetinaProto.InsertData.Builder insertDataBuilder = RetinaProto.InsertData.newBuilder() - .addColValues(ByteString.copyFrom(cols[0])) - .addColValues(ByteString.copyFrom(cols[1])) - .addColValues(ByteString.copyFrom(cols[2])) - .addColValues(ByteString.copyFrom(cols[3])) - .addIndexKeys(indexKey); - - tableUpdateDataBuilder.addInsertData(insertDataBuilder.build()); - tableUpdateDataBuilder.setTimestamp(0L); - tableUpdateData.add(tableUpdateDataBuilder.build()); - streamHandler.updateRecord(schemaName, tableUpdateData); - } + }); } + + latch.await(); // Wait for all threads to finish + long endTime = System.currentTimeMillis(); + long durationMillis = endTime - startTime; + executor.shutdown(); + + System.out.println("======================================================"); + System.out.println("Test Finished."); + System.out.printf(" - Total execution time: %.3f seconds\n", durationMillis / 1000.0); + System.out.printf(" - Total RPC calls: %d\n", totalOperations.get()); + System.out.printf(" - Actual TPS (RPCs/sec): %.2f\n", totalOperations.get() / (durationMillis / 1000.0)); + System.out.println("------------------------------------------------------"); + System.out.printf(" - Total rows inserted: %d\n", insertCount.get()); + System.out.printf(" - Total rows deleted: %d\n", deleteCount.get()); + System.out.println("======================================================"); } } diff --git a/pixels-daemon/pom.xml b/pixels-daemon/pom.xml index daa90ec32..5edeedb17 100644 --- a/pixels-daemon/pom.xml +++ b/pixels-daemon/pom.xml @@ -61,11 +61,11 @@ io.pixelsdb - pixels-index-memory + pixels-index-main-sqlite io.pixelsdb - pixels-index-main-sqlite + pixels-index-memory diff --git a/pixels-index/pixels-index-rocksdb/pom.xml b/pixels-index/pixels-index-rocksdb/pom.xml index ad9192bbf..b03f5d656 100644 --- a/pixels-index/pixels-index-rocksdb/pom.xml +++ b/pixels-index/pixels-index-rocksdb/pom.xml @@ -39,11 +39,33 @@ test + + io.pixelsdb + pixels-index-memory + test + + io.grpc grpc-testing test + + + io.grpc + grpc-netty-shaded + test + + + io.grpc + grpc-protobuf + test + + + io.grpc + grpc-stub + test + diff --git a/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRetinaTrace.java b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRetinaTrace.java new file mode 100644 index 000000000..54f61e076 --- /dev/null +++ b/pixels-index/pixels-index-rocksdb/src/test/java/io/pixelsdb/pixels/index/rocksdb/TestRetinaTrace.java @@ -0,0 +1,329 @@ +/* + * Copyright 2025 PixelsDB. + * + * This file is part of Pixels. + * + * Pixels is free software: you can redistribute it and/or modify + * it under the terms of the Affero GNU General Public License as + * published by the Free Software Foundation, either version 3 of + * the License, or (at your option) any later version. + * + * Pixels is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Affero GNU General Public License for more details. + * + * You should have received a copy of the Affero GNU General Public + * License along with Pixels. If not, see + * . + */ +package io.pixelsdb.pixels.index.rocksdb; + +import com.google.protobuf.ByteString; +import io.pixelsdb.pixels.common.exception.IndexException; +import io.pixelsdb.pixels.common.index.IndexService; +import io.pixelsdb.pixels.common.index.IndexServiceProvider; +import io.pixelsdb.pixels.index.IndexProto; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.BeforeAll; + +import java.io.BufferedReader; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +/** + * Test using the actual index load and update during Retina runtime. + * Primarily uses the LocalIndexService's deletePrimaryIndexEntry and putPrimaryIndexEntry interfaces. + * PUT format: tableId, indexId, key, timestamp, rowId, fileId, rgId, rgRowOffset + * DEL format: tableId, indexId, key, timestamp + * ***************************************************************************** + * Note + * 1. The key is an integer here. + * 2. Launching the metadataService separately in tests is cumbersome; + * it's best to start the pixels-related services externally. + * 3. Insert records used for tracing into the SINGLE_POINT_INDICES table + * in MySQL pixels_metadata. + * e.g., + * ```sql + * INSERT INTO SINGLE_POINT_INDICES + * (SPI_ID, SPI_KEY_COLUMNS, SPI_PRIMARY, SPI_UNIQUE, SPI_INDEX_SCHEME, TBLS_TBL_ID, SCHEMA_VERSIONS_SV_ID) + * VALUES + * (403, '{"keyColumnIds":[403]}', 1, 1, 'rocksdb', 3, 3), + * (404, '{"keyColumnIds":[404]}', 1, 1, 'rocksdb', 3, 3); + * ``` + * ***************************************************************************** + */ +public class TestRetinaTrace +{ + private static final int THREAD_COUNT = 16; + + // Trace load path: only put operations + private static final String loadPath = "/home/gengdy/data/index/index.load.trace"; + + // Trace update path: both put and delete operations + private static final String updatePath = "/home/gengdy/data/index/index.update.trace"; + + private static final IndexService indexService = IndexServiceProvider.getService(IndexServiceProvider.ServiceMode.local); + + /** + * Load the initial data into the index + */ + @BeforeAll + public static void prepare() + { + System.out.println("Preparing data from loadPath into index..."); + long count = 0; + try (BufferedReader reader = Files.newBufferedReader(Paths.get(loadPath))) + { + String line; + while ((line = reader.readLine()) != null) + { + count++; + String[] parts = line.split("\\t"); + PutOperation putOperation = new PutOperation(parts); + IndexProto.PrimaryIndexEntry entry = (IndexProto.PrimaryIndexEntry) putOperation.toProto(); + indexService.putPrimaryIndexEntry(entry); + } + } catch (IOException e) + { + throw new RuntimeException("Failed to prepare data from loadPath", e); + } catch (IndexException e) + { + throw new RuntimeException("Failed to put index entry during prepare", e); + } + System.out.println("Finished preparing " + count * THREAD_COUNT + " records into index."); + } + + private interface TraceOperation + { + int getBucket(); + Object toProto(); + } + + private static class PutOperation implements TraceOperation + { + final long tableId, indexId, timestamp, rowId, fileId; + final ByteString key; + final int rgId, rgRowOffset; + final int bucket; + + PutOperation(String[] parts) + { + if (parts.length != 9) + { + throw new RuntimeException("Invalid PUT operation: " + String.join("\t", parts)); + } + this.tableId = Long.parseLong(parts[1]); + this.indexId = Long.parseLong(parts[2]); + int keyInt = Integer.parseInt(parts[3]); + this.key = ByteString.copyFrom(ByteBuffer.allocate(Integer.BYTES) + .putInt(keyInt).array()); + this.timestamp = Long.parseLong(parts[4]); + this.rowId = Long.parseLong(parts[5]); + this.fileId = Long.parseLong(parts[6]); + this.rgId = Integer.parseInt(parts[7]); + this.rgRowOffset = Integer.parseInt(parts[8]); + this.bucket = keyInt % THREAD_COUNT; + } + + @Override + public int getBucket() + { + return bucket; + } + + @Override + public Object toProto() + { + IndexProto.IndexKey indexKey = IndexProto.IndexKey.newBuilder() + .setTableId(tableId) + .setIndexId(indexId) + .setKey(key) + .setTimestamp(timestamp) + .build(); + + IndexProto.RowLocation rowLocation = IndexProto.RowLocation.newBuilder() + .setFileId(fileId) + .setRgId(rgId) + .setRgRowOffset(rgRowOffset) + .build(); + + return IndexProto.PrimaryIndexEntry.newBuilder() + .setIndexKey(indexKey) + .setRowId(rowId) + .setRowLocation(rowLocation) + .build(); + } + } + + private static class DeleteOperation implements TraceOperation + { + final long tableId, indexId, timestamp; + final ByteString key; + final int bucket; + + DeleteOperation(String[] parts) + { + if (parts.length != 5) + { + throw new RuntimeException("Invalid DEL operation: " + String.join("\t", parts)); + } + this.tableId = Long.parseLong(parts[1]); + this.indexId = Long.parseLong(parts[2]); + int keyInt = Integer.parseInt(parts[3]); + this.key = ByteString.copyFrom(ByteBuffer.allocate(Integer.BYTES) + .putInt(keyInt).array()); + this.timestamp = Long.parseLong(parts[4]); + this.bucket = keyInt % THREAD_COUNT; + } + + @Override + public int getBucket() + { + return bucket; + } + + @Override + public Object toProto() + { + return IndexProto.IndexKey.newBuilder() + .setTableId(tableId) + .setIndexId(indexId) + .setKey(key) + .setTimestamp(timestamp) + .build(); + } + } + + private static class IndexWorker implements Runnable + { + private final List protoOperations; + + public IndexWorker(List protoOperations) + { + this.protoOperations = protoOperations; + } + + @Override + public void run() + { + try + { + for (Object proto : protoOperations) + { + if (proto instanceof IndexProto.PrimaryIndexEntry) + { + indexService.putPrimaryIndexEntry((IndexProto.PrimaryIndexEntry) proto); + } else + { + indexService.deletePrimaryIndexEntry((IndexProto.IndexKey) proto); + } + } + } catch (IndexException e) + { + throw new RuntimeException("Index operation failed in worker thread", e); + } + } + } + + @Test + public void testIndex() + { + System.out.println("Loading baseTrace..."); + List operations = new ArrayList<>(); + long putCount = 0, delCount = 0; + try (BufferedReader reader = Files.newBufferedReader(Paths.get(updatePath))) + { + String line; + while ((line = reader.readLine()) != null) + { + String[] parts = line.split("\\t"); + if (parts.length < 1) + { + continue; + } + String opType = parts[0]; + if (opType.equals("P")) + { + putCount++; + operations.add(new PutOperation(parts)); + } else if (opType.equals("D")) + { + delCount++; + operations.add(new DeleteOperation(parts)); + } else + { + throw new RuntimeException("Unknown operation type: " + opType); + } + } + } catch (IOException e) + { + throw new RuntimeException("Failed to read update trace file", e); + } catch (NumberFormatException e) + { + throw new RuntimeException("Malformed number in update trace", e); + } + System.out.println("Loaded " + operations.size() + " operations from update trace."); + + System.out.println("Generating workloads for " + THREAD_COUNT + " threads..."); + List> threadProtoOperations = IntStream.range(0, THREAD_COUNT) + .mapToObj(i -> new ArrayList()) + .collect(Collectors.toList()); + + for (TraceOperation op : operations) + { + threadProtoOperations.get(op.getBucket()).add(op.toProto()); + } + System.out.println("Finished pre-building protobuf objects."); + + System.out.println("Starting index performance test with " + THREAD_COUNT + " threads..."); + ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT); + List> futures = new ArrayList<>(); + long startTime = System.currentTimeMillis(); + for (List threadProtoOps : threadProtoOperations) + { + futures.add(executor.submit(new IndexWorker(threadProtoOps))); + } + for (Future f : futures) + { + try + { + f.get(); + } catch (Exception e) + { + throw new RuntimeException("Thread execution failed", e); + } + } + long endTime = System.currentTimeMillis(); + + long totalDurationNanos = endTime - startTime; + double totalDurationSeconds = totalDurationNanos / 1000.0; + long totalOps = putCount + delCount; + + double putThroughput = (totalDurationSeconds > 0) ? (putCount / totalDurationSeconds) : 0; + double deleteThroughput = (totalDurationSeconds > 0) ? (delCount / totalDurationSeconds) : 0; + double totalThroughput = (totalDurationSeconds > 0) ? (totalOps / totalDurationSeconds) : 0; + + System.out.println("\n--- Index Performance Test Results ---"); + System.out.printf("Thread Count: %d, Mode: Single Entry\n", THREAD_COUNT); + System.out.printf("Total test time: %.3f seconds\n", totalDurationSeconds); + System.out.println("------------------------------------"); + System.out.printf("Total PUT operations: %,d\n", putCount); + System.out.printf("Total DELETE operations: %,d\n", delCount); + System.out.printf("Total operations: %,d\n", totalOps); + System.out.println("------------------------------------"); + System.out.printf("PUT throughput: %,.2f ops/sec\n", putThroughput); + System.out.printf("DELETE throughput: %,.2f ops/sec\n", deleteThroughput); + System.out.printf("Total throughput: %,.2f ops/sec\n", totalThroughput); + System.out.println("------------------------------------\n"); + } +} diff --git a/pixels-retina/pom.xml b/pixels-retina/pom.xml index 26a98d45c..ff8937b79 100644 --- a/pixels-retina/pom.xml +++ b/pixels-retina/pom.xml @@ -51,6 +51,11 @@ pixels-index-rockset true + + io.pixelsdb + pixels-index-memory + true + net.java.dev.jna diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/FileWriterManager.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/FileWriterManager.java index 2c3fb23b8..f82f49976 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/FileWriterManager.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/FileWriterManager.java @@ -24,8 +24,6 @@ import io.pixelsdb.pixels.common.metadata.MetadataService; import io.pixelsdb.pixels.common.metadata.domain.File; import io.pixelsdb.pixels.common.metadata.domain.Path; -import io.pixelsdb.pixels.common.physical.PhysicalReader; -import io.pixelsdb.pixels.common.physical.PhysicalReaderUtil; import io.pixelsdb.pixels.common.physical.Storage; import io.pixelsdb.pixels.common.utils.DateUtil; import io.pixelsdb.pixels.core.PixelsWriter; diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/MemTable.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/MemTable.java index f0bd2d4ba..47ff74c19 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/MemTable.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/MemTable.java @@ -19,8 +19,6 @@ */ package io.pixelsdb.pixels.retina; -import static com.google.common.base.Preconditions.checkArgument; - import io.pixelsdb.pixels.common.exception.RetinaException; import io.pixelsdb.pixels.core.TypeDescription; import io.pixelsdb.pixels.core.vector.VectorizedRowBatch; @@ -58,25 +56,18 @@ public MemTable(long id, TypeDescription schema, int size, int mode, * @return rowOffset * @throws RetinaException */ - public int add(byte[][] values, long timestamp) throws RetinaException + public synchronized int add(byte[][] values, long timestamp) throws RetinaException { - int columnCount = schema.getChildren().size(); - checkArgument(values.length == columnCount, - "Column values count does not match schema column count"); - - synchronized (this) + if (isFull()) { - if (isFull()) - { - return -1; - } - for (int i = 0; i < values.length; ++i) - { - this.rowBatch.cols[i].add(values[i]); - } - this.rowBatch.cols[columnCount].add(timestamp); - return this.rowBatch.size++; + return -1; + } + for (int i = 0; i < values.length; ++i) + { + this.rowBatch.cols[i].add(values[i]); } + this.rowBatch.cols[schema.getChildren().size()].add(timestamp); + return this.rowBatch.size++; } public long getId() diff --git a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriterBuffer.java b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriterBuffer.java index 6dde1d162..69f1d8636 100644 --- a/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriterBuffer.java +++ b/pixels-retina/src/main/java/io/pixelsdb/pixels/retina/PixelsWriterBuffer.java @@ -176,8 +176,7 @@ public PixelsWriterBuffer(long tableId, TypeDescription schema, Path targetOrder */ public long addRow(byte[][] values, long timestamp, IndexProto.RowLocation.Builder builder) throws RetinaException { - int columnCount = this.schema.getChildren().size(); - checkArgument(values.length == columnCount, + checkArgument(values.length == this.schema.getChildren().size(), "Column values count does not match schema column count"); MemTable currentMemTable = null; @@ -197,7 +196,7 @@ public long addRow(byte[][] values, long timestamp, IndexProto.RowLocation.Build switchMemTable(); } } - int rgRowOffset = (int) (currentMemTable.getStartIndex() + rowOffset); + int rgRowOffset = currentMemTable.getStartIndex() + rowOffset; if(rgRowOffset < 0) { throw new RetinaException("Expect rgRowOffset >= 0, get " + rgRowOffset); @@ -373,7 +372,7 @@ private void startFlushMinioToDiskScheduler() { throw new RuntimeException(e); } - }, 0, 5, TimeUnit.SECONDS); + }, 0, 30, TimeUnit.SECONDS); } /** diff --git a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriterBuffer.java b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriterBuffer.java index e06e3e206..bb4c095ed 100644 --- a/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriterBuffer.java +++ b/pixels-retina/src/test/java/io/pixelsdb/pixels/retina/TestPixelsWriterBuffer.java @@ -25,13 +25,14 @@ import org.junit.Before; import org.junit.Test; +import java.lang.management.ManagementFactory; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; - public class TestPixelsWriterBuffer { private List columnNames = new ArrayList<>(); @@ -40,41 +41,48 @@ public class TestPixelsWriterBuffer Path targetOrderDirPath; Path targetCompactDirPath; PixelsWriterBuffer buffer; - @Before public void setup() { targetOrderDirPath = new Path(); - targetOrderDirPath.setUri("file:///home/gengdy/data/test/v-0-ordered"); - targetOrderDirPath.setId(21); + targetOrderDirPath.setUri("file:///home/gengdy/data/tpch/1g/customer/v-0-ordered"); + targetOrderDirPath.setId(1); // path id get from mysql `PATHS` table targetCompactDirPath = new Path(); - targetCompactDirPath.setUri("file:///home/gengdy/data/test/v-0-compact"); - targetCompactDirPath.setId(22); + targetCompactDirPath.setUri("file:///home/gengdy/data/tpch/1g/customer/v-0-compact"); + targetCompactDirPath.setId(2); // get from mysql `PATHS` table try { columnNames.add("id"); columnNames.add("name"); - columnTypes.add("int"); columnTypes.add("int"); schema = TypeDescription.createSchemaFromStrings(columnNames, columnTypes); - buffer = new PixelsWriterBuffer(0L, schema, targetOrderDirPath, targetCompactDirPath); + buffer = new PixelsWriterBuffer(0L, schema, targetOrderDirPath, targetCompactDirPath); // table id get from mysql `TBLS` table } catch (Exception e) { System.out.println("setup error: " + e); } } - @Test public void testConcurrentWriteOperations() { - int numThreads = 1; - int numRowsPerThread = 10241; + +// // print pid if you want to attach a profiler like async-profiler or YourKit +// try +// { +// System.out.println(Long.parseLong(ManagementFactory.getRuntimeMXBean().getName().split("@")[0])); +// Thread.sleep(10000); +// } catch (InterruptedException e) +// { +// throw new RuntimeException(e); +// } + + int numThreads = 1000; + int numRowsPerThread = 100000; CountDownLatch startLatch = new CountDownLatch(1); CountDownLatch completionLatch = new CountDownLatch(numThreads); AtomicBoolean hasError = new AtomicBoolean(false); - ExecutorService executor = Executors.newFixedThreadPool(numThreads); for (int t = 0; t < numThreads; ++t) { @@ -87,8 +95,8 @@ public void testConcurrentWriteOperations() for (int i = 0; i < numRowsPerThread; ++i) { IndexProto.RowLocation.Builder builder = IndexProto.RowLocation.newBuilder(); - values[0] = String.valueOf(threadId * numRowsPerThread + i).getBytes(); - values[1] = String.valueOf(threadId * numRowsPerThread + i + 1).getBytes(); + values[0] = ByteBuffer.allocate(4).putInt(threadId * numRowsPerThread + i).array(); + values[1] = ByteBuffer.allocate(4).putInt(threadId * numRowsPerThread + i + 1).array(); buffer.addRow(values, threadId, builder); } } catch (Exception e) @@ -101,16 +109,15 @@ public void testConcurrentWriteOperations() } }); } - startLatch.countDown(); try { completionLatch.await(); - Thread.sleep(10000); - buffer.close(); - } catch (Exception e) - { - System.out.println("error: " + e); + Thread.sleep(10000); // wait for async flush to complete + buffer.close(); + } catch (Exception e) + { + System.out.println("error: " + e); + } } } -} diff --git a/pom.xml b/pom.xml index 4fa87c118..c02eaa1b3 100644 --- a/pom.xml +++ b/pom.xml @@ -289,12 +289,12 @@ io.pixelsdb - pixels-index-memory + pixels-index-main-sqlite ${project.version} io.pixelsdb - pixels-index-main-sqlite + pixels-index-memory ${project.version}