Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions fix_remaining_conflicts.ps1
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# 修复剩余的Git合并冲突
$filePath = "hbase-common\src\main\java\org\apache\hadoop\hbase\CellComparatorImpl.java"
$content = Get-Content $filePath -Raw

# 修复所有剩余的冲突模式
$content = $content -replace '<<<<<<< HEAD\s*\n\s*private int compareBBKV\(final ByteBufferKeyValue left, final ByteBufferKeyValue right\) \{\s*\n=======\s*\n\s*private static int compareBBKV\(final ByteBufferKeyValue left, final ByteBufferKeyValue right\) \{\s*\n>>>>>>> rvv-optimization', 'private static int compareBBKV(final ByteBufferKeyValue left, final ByteBufferKeyValue right) {'

$content = $content -replace '<<<<<<< HEAD\s*\n\s*private int compareKVVsBBKV\(final KeyValue left, final ByteBufferKeyValue right\) \{\s*\n=======\s*\n\s*private static int compareKVVsBBKV\(final KeyValue left, final ByteBufferKeyValue right\) \{\s*\n>>>>>>> rvv-optimization', 'private static int compareKVVsBBKV(final KeyValue left, final ByteBufferKeyValue right) {'

# 修复方法调用冲突
$content = $content -replace '<<<<<<< HEAD\s*\n\s*diff = ByteBufferUtils\.compareTo\(left\.getRowByteBuffer\(\), left\.getRowPosition\(\), leftRowLength,\s*\n\s*right\.getRowByteBuffer\(\), right\.getRowPosition\(\), rightRowLength\);\s*\n=======\s*\n\s*diff = ByteBufferUtils\.compareToRvv\(left\.getRowByteBuffer\(\), left\.getRowPosition\(\), leftRowLength,\s*\n\s*right\.getRowByteBuffer\(\), right\.getRowPosition\(\), rightRowLength\);\s*\n>>>>>>> rvv-optimization', 'diff = ByteBufferUtils.compareToRvv(left.getRowByteBuffer(), left.getRowPosition(), leftRowLength, right.getRowByteBuffer(), right.getRowPosition(), rightRowLength);'

# 修复其他常见的冲突模式
$content = $content -replace '<<<<<<< HEAD\s*\n\s*.*?\s*\n=======\s*\n\s*(.*?)\s*\n>>>>>>> rvv-optimization', '$1'

# 保存文件
Set-Content $filePath $content -Encoding UTF8

Write-Host "冲突修复完成!"

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import org.apache.yetus.audience.InterfaceStability;

/**
* Compare two HBase cells inner store, skip compare family for better performance. Important!!! we
* should not make fake cell with fake family which length greater than zero inner store, otherwise
* this optimization cannot be used.
* Compare two HBase cells inner store, skip compare family for better performance. Important!!!
* We should not make fake cell with fake family which length greater than zero inner store,
* otherwise this optimization cannot be used.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
Expand All @@ -33,32 +33,9 @@ public class InnerStoreCellComparator extends CellComparatorImpl {
private static final long serialVersionUID = 8186411895799094989L;

public static final InnerStoreCellComparator INNER_STORE_COMPARATOR =
new InnerStoreCellComparator();
new InnerStoreCellComparator();

@Override
protected int compareFamilies(Cell left, int leftFamilyLength, Cell right,
int rightFamilyLength) {
return leftFamilyLength - rightFamilyLength;
}

@Override
protected int compareFamilies(KeyValue left, int leftFamilyPosition, int leftFamilyLength,
KeyValue right, int rightFamilyPosition, int rightFamilyLength) {
return leftFamilyLength - rightFamilyLength;
}

@Override
protected int compareFamilies(ByteBufferKeyValue left, int leftFamilyPosition,
int leftFamilyLength, ByteBufferKeyValue right, int rightFamilyPosition,
int rightFamilyLength) {
return leftFamilyLength - rightFamilyLength;
}

@Override
protected int compareFamilies(KeyValue left, int leftFamilyPosition, int leftFamilyLength,
ByteBufferKeyValue right, int rightFamilyPosition, int rightFamilyLength) {
return leftFamilyLength - rightFamilyLength;
}
// 不再重写 compareFamilies,由父类 CellComparatorImpl 处理 RVV 优化逻辑

/**
* Utility method that makes a guess at comparator to use based off passed tableName. Use in
Expand All @@ -76,7 +53,7 @@ public static CellComparator getInnerStoreCellComparator(TableName tableName) {
*/
public static CellComparator getInnerStoreCellComparator(byte[] tableName) {
return Bytes.equals(tableName, TableName.META_TABLE_NAME.toBytes())
? MetaCellComparator.META_COMPARATOR
: InnerStoreCellComparator.INNER_STORE_COMPARATOR;
? MetaCellComparator.META_COMPARATOR
: InnerStoreCellComparator.INNER_STORE_COMPARATOR;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.apache.hadoop.hbase.util;

import java.nio.ByteBuffer;
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Private
public class RVVByteBufferUtils {

static {
try {
System.loadLibrary("scan_rvv_jni");
} catch (Throwable t) {
// ignore; use availability checks
}
}

public static boolean available() {
return ScanRVV.available();
}

public static native int compareToRvv(ByteBuffer a, int aOffset, int aLen,
ByteBuffer b, int bOffset, int bLen);

public static native int commonPrefixRvv(byte[] left, int leftOffset,
byte[] right, int rightOffset,
int maxLen);

public static native int findCommonPrefixRvv(ByteBuffer a, int aOffset, int aLen,
ByteBuffer b, int bOffset, int bLen);

public static native int findCommonPrefixRvv(ByteBuffer a, int aOffset, int aLen,
byte[] b, int bOffset, int bLen);

public static byte[] readBytesRvv(ByteBuffer buf) {
int len = buf.remaining();
byte[] dst = new byte[len];
if (!available()) {
if (buf.hasArray()) {
System.arraycopy(buf.array(), buf.arrayOffset() + buf.position(), dst, 0, len);
} else {
int pos = buf.position();
for (int i = 0; i < len; i++) {
dst[i] = buf.get(pos + i);
}
}
return dst;
}
if (buf.hasArray()) {
System.arraycopy(buf.array(), buf.arrayOffset() + buf.position(), dst, 0, len);
} else {
ScanRVV.rvvMemcpy(dst, 0, buf, buf.position(), len);
}
return dst;
}
}
130 changes: 130 additions & 0 deletions hbase-common/src/main/java/org/apache/hadoop/hbase/util/ScanRVV.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package org.apache.hadoop.hbase.util;

import java.nio.ByteBuffer;

import org.apache.yetus.audience.InterfaceAudience;

/**
* RISC-V RVV 向量化扫描优化工具类
*/
@InterfaceAudience.Private
public class ScanRVV {

private static boolean rvvEnabled = false;

static {
try {
System.loadLibrary("scan_rvv_jni");
rvvEnabled = true;
} catch (UnsatisfiedLinkError e) {
rvvEnabled = false;
} catch (Throwable t) {
rvvEnabled = false;
}
}

public static native boolean isEnabled();

public static void setEnabled(boolean enabled) {
rvvEnabled = enabled;
}

public static boolean available() {
if (!rvvEnabled) {
return false;
}
try {
return isEnabled();
} catch (Throwable t) {
return false;
}
}

public static native int compareCells(byte[] aKey, int aLen, byte[] bKey, int bLen);

public static native int compareKeyForNextRow(byte[] indexedKey, int idxLen, byte[] curKey, int curLen);

public static native int compareKeyForNextColumn(byte[] indexedKey, int idxLen, byte[] curKey, int curLen);

public static native int memcmp(byte[] a, int offsetA, int lengthA,
byte[] b, int offsetB, int lengthB);

public static native boolean prefixMatch(byte[] a, int offsetA,
byte[] b, int offsetB,
int prefixLen);

public static int memcmp(byte[] a, byte[] b, int length) {
if (a == null || b == null) {
throw new IllegalArgumentException("Input arrays cannot be null");
}
if (length < 0) {
throw new IllegalArgumentException("Length cannot be negative");
}
return memcmp(a, 0, length, b, 0, length);
}

public static boolean prefixMatch(byte[] a, byte[] b, int prefixLen) {
if (a == null || b == null) {
return false;
}
if (prefixLen <= 0 || prefixLen > a.length || prefixLen > b.length) {
return false;
}
return prefixMatch(a, 0, b, 0, prefixLen);
}

public static native int rvvCommonPrefix(byte[] a, int offsetA, int lengthA,
byte[] b, int offsetB, int lengthB);

public static native void rvvMemcpy(byte[] dst, int dstOffset,
byte[] src, int srcOffset,
int length);

public static void rvvMemcpy(byte[] dst, int dstOffset,
ByteBuffer src, int srcOffset,
int length) {
if (dst == null || src == null) {
throw new IllegalArgumentException("Input parameters cannot be null");
}
if (length < 0) {
throw new IllegalArgumentException("Length cannot be negative");
}

if (src.hasArray()) {
System.arraycopy(src.array(), src.arrayOffset() + srcOffset, dst, dstOffset, length);
} else {
byte[] tmp = new byte[length];
src.position(srcOffset);
src.get(tmp);
rvvMemcpy(dst, dstOffset, tmp, 0, length);
}
}

public static byte[] copyToArray(byte[] src, int offset, int length) {
if (src == null) {
throw new IllegalArgumentException("Source array cannot be null");
}
if (offset < 0 || length < 0 || offset + length > src.length) {
throw new IllegalArgumentException("Invalid offset or length");
}

byte[] dst = new byte[length];
ScanRVV.rvvMemcpy(dst, 0, src, offset, length);
return dst;
}

public static byte[] copyToArray(ByteBuffer src) {
if (src == null) {
throw new IllegalArgumentException("Source ByteBuffer cannot be null");
}

int len = src.remaining();
byte[] dst = new byte[len];
if (src.hasArray()) {
System.arraycopy(src.array(), src.arrayOffset() + src.position(), dst, 0, len);
} else {
rvvMemcpy(dst, 0, src, src.position(), len);
}
return dst;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,11 +39,13 @@ public class Lz4Compressor implements CanReinit, Compressor {
protected boolean finish, finished;
protected long bytesRead, bytesWritten;

private final boolean useNative = Lz4Native.available();

Lz4Compressor(int bufferSize) {
compressor = LZ4Factory.fastestInstance().fastCompressor();
this.bufferSize = bufferSize;
this.inBuf = ByteBuffer.allocate(bufferSize);
this.outBuf = ByteBuffer.allocate(bufferSize);
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf.position(bufferSize);
}

Expand All @@ -61,37 +63,58 @@ public int compress(byte[] b, int off, int len) throws IOException {
if (inBuf.position() > 0) {
inBuf.flip();
int uncompressed = inBuf.remaining();
int needed = maxCompressedLength(uncompressed);
// Can we decompress directly into the provided array?
ByteBuffer writeBuffer;
boolean direct = false;
if (len <= needed) {
writeBuffer = ByteBuffer.wrap(b, off, len);
direct = true;
} else {
// If we don't have enough capacity in our currently allocated output buffer,
// allocate a new one which does.

if (useNative && inBuf.isDirect() && outBuf.isDirect()) {
// 如果你现在用的是 heap ByteBuffer,改成 direct 分配(见 reset/reinit 部分)
int needed = Lz4Native.maxCompressedLength(uncompressed);
if (outBuf.capacity() < needed) {
needed = CompressionUtil.roundInt2(needed);
outBuf = ByteBuffer.allocate(needed);
needed = org.apache.hadoop.hbase.io.compress.CompressionUtil.roundInt2(needed);
outBuf = ByteBuffer.allocateDirect(needed);
} else {
outBuf.clear();
}
writeBuffer = outBuf;
}
final int oldPos = writeBuffer.position();
compressor.compress(inBuf, writeBuffer);
final int written = writeBuffer.position() - oldPos;
bytesWritten += written;
inBuf.clear();
finished = true;
if (!direct) {
outBuf.flip();
int written = Lz4Native.compressDirect(inBuf, inBuf.position(), uncompressed,
outBuf, outBuf.position(), outBuf.remaining());
if (written < 0)
throw new IOException("LZ4 native compress failed: " + written);
bytesWritten += written;
inBuf.clear();
finished = true;
outBuf.limit(outBuf.position() + written);
int n = Math.min(written, len);
outBuf.get(b, off, n);
return n;
} else {
return written;
// 走原 lz4-java 路径
int needed = maxCompressedLength(uncompressed);
ByteBuffer writeBuffer;
boolean direct = false;
if (len <= needed) {
writeBuffer = ByteBuffer.wrap(b, off, len);
direct = true;
} else {
if (outBuf.capacity() < needed) {
needed = org.apache.hadoop.hbase.io.compress.CompressionUtil.roundInt2(needed);
outBuf = ByteBuffer.allocate(needed);
} else {
outBuf.clear();
}
writeBuffer = outBuf;
}
final int oldPos = writeBuffer.position();
compressor.compress(inBuf, writeBuffer);
final int written = writeBuffer.position() - oldPos;
bytesWritten += written;
inBuf.clear();
finished = true;
if (!direct) {
outBuf.flip();
int n = Math.min(written, len);
outBuf.get(b, off, n);
return n;
} else {
return written;
}
}
} else {
finished = true;
Expand Down Expand Up @@ -136,8 +159,8 @@ public void reinit(Configuration conf) {
int newBufferSize = Lz4Codec.getBufferSize(conf);
if (bufferSize != newBufferSize) {
bufferSize = newBufferSize;
this.inBuf = ByteBuffer.allocate(bufferSize);
this.outBuf = ByteBuffer.allocate(bufferSize);
this.inBuf = ByteBuffer.allocateDirect(bufferSize);
this.outBuf = ByteBuffer.allocateDirect(bufferSize);
}
}
reset();
Expand All @@ -162,7 +185,8 @@ public void setDictionary(byte[] b, int off, int len) {
@Override
public void setInput(byte[] b, int off, int len) {
if (inBuf.remaining() < len) {
// Get a new buffer that can accomodate the accumulated input plus the additional
// Get a new buffer that can accomodate the accumulated input plus the
// additional
// input that would cause a buffer overflow without reallocation.
// This condition should be fortunately rare, because it is expensive.
int needed = CompressionUtil.roundInt2(inBuf.capacity() + len);
Expand Down
Loading