Skip to content

Commit 8552488

Browse files
authored
[8.19] Fix systemd notify to use a shared arena (elastic#135235) (elastic#135281)
* Fix systemd notify to use a shared arena (elastic#135235) CloseableByteBuffer currently only uses confined Arena for buffer allocation meaning only the thread creating the Arena is allowed to access the native memory. When using elasticsearch with systemd, our notify-extend message is executed on a thread separate from where the native memory was allocated. This is causing a RuntimeException to be thrown. This changes CloseableByteBuffer to allow for a shared Arena as well for systemd. * fix backport
1 parent 9a7879a commit 8552488

File tree

11 files changed

+74
-27
lines changed

11 files changed

+74
-27
lines changed

docs/changelog/135235.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135235
2+
summary: Fix systemd notify to use a shared arena
3+
area: Infra/Node Lifecycle
4+
type: bug
5+
issues: []

libs/native/jna/src/main/java/org/elasticsearch/nativeaccess/jna/JnaJavaLibrary.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,14 @@
1313
import org.elasticsearch.nativeaccess.lib.JavaLibrary;
1414

1515
class JnaJavaLibrary implements JavaLibrary {
16+
17+
@Override
18+
public CloseableByteBuffer newConfinedBuffer(int len) {
19+
return new JnaCloseableByteBuffer(len);
20+
}
21+
1622
@Override
17-
public CloseableByteBuffer newBuffer(int len) {
23+
public CloseableByteBuffer newSharedBuffer(int len) {
1824
return new JnaCloseableByteBuffer(len);
1925
}
2026
}

libs/native/src/main/java/org/elasticsearch/nativeaccess/AbstractNativeAccess.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,15 @@ public Zstd getZstd() {
4646
}
4747

4848
@Override
49-
public CloseableByteBuffer newBuffer(int len) {
49+
public CloseableByteBuffer newSharedBuffer(int len) {
5050
assert len > 0;
51-
return javaLib.newBuffer(len);
51+
return javaLib.newSharedBuffer(len);
52+
}
53+
54+
@Override
55+
public CloseableByteBuffer newConfinedBuffer(int len) {
56+
assert len > 0;
57+
return javaLib.newConfinedBuffer(len);
5258
}
5359

5460
@Override

libs/native/src/main/java/org/elasticsearch/nativeaccess/LinuxNativeAccess.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ record Arch(
9898
this.systemd = null; // not running under systemd
9999
} else {
100100
logger.debug("Systemd socket path: {}", socketPath);
101-
var buffer = newBuffer(64);
101+
var buffer = newSharedBuffer(64);
102102
this.systemd = new Systemd(libraryProvider.getLibrary(PosixCLibrary.class), socketPath, buffer);
103103
}
104104
}

libs/native/src/main/java/org/elasticsearch/nativeaccess/NativeAccess.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,12 +88,20 @@ default WindowsFunctions getWindowsFunctions() {
8888
Optional<VectorSimilarityFunctions> getVectorSimilarityFunctions();
8989

9090
/**
91-
* Creates a new {@link CloseableByteBuffer}. The buffer must be used within the same thread
92-
* that it is created.
91+
* Creates a new {@link CloseableByteBuffer} using a shared arena. The buffer can be used
92+
* across multiple threads.
9393
* @param len the number of bytes the buffer should allocate
9494
* @return the buffer
9595
*/
96-
CloseableByteBuffer newBuffer(int len);
96+
CloseableByteBuffer newSharedBuffer(int len);
97+
98+
/**
99+
* Creates a new {@link CloseableByteBuffer} using a confined arena. The buffer must be
100+
* used within the same thread that it is created.
101+
* @param len the number of bytes the buffer should allocate
102+
* @return the buffer
103+
*/
104+
CloseableByteBuffer newConfinedBuffer(int len);
97105

98106
/**
99107
* Possible stats for execution filtering.

libs/native/src/main/java/org/elasticsearch/nativeaccess/NoopNativeAccess.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,13 @@ public Zstd getZstd() {
7878
}
7979

8080
@Override
81-
public CloseableByteBuffer newBuffer(int len) {
81+
public CloseableByteBuffer newSharedBuffer(int len) {
82+
logger.warn("cannot allocate buffer because native access is not available");
83+
return null;
84+
}
85+
86+
@Override
87+
public CloseableByteBuffer newConfinedBuffer(int len) {
8288
logger.warn("cannot allocate buffer because native access is not available");
8389
return null;
8490
}

libs/native/src/main/java/org/elasticsearch/nativeaccess/lib/JavaLibrary.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,5 +12,7 @@
1212
import org.elasticsearch.nativeaccess.CloseableByteBuffer;
1313

1414
public non-sealed interface JavaLibrary extends NativeLibrary {
15-
CloseableByteBuffer newBuffer(int len);
15+
CloseableByteBuffer newSharedBuffer(int len);
16+
17+
CloseableByteBuffer newConfinedBuffer(int len);
1618
}

libs/native/src/main21/java/org/elasticsearch/nativeaccess/jdk/JdkCloseableByteBuffer.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,16 @@ class JdkCloseableByteBuffer implements CloseableByteBuffer {
2020
final MemorySegment segment;
2121
private final ByteBuffer bufferView;
2222

23-
JdkCloseableByteBuffer(int len) {
24-
this.arena = Arena.ofConfined();
23+
static JdkCloseableByteBuffer ofShared(int len) {
24+
return new JdkCloseableByteBuffer(len, true);
25+
}
26+
27+
static JdkCloseableByteBuffer ofConfined(int len) {
28+
return new JdkCloseableByteBuffer(len, false);
29+
}
30+
31+
private JdkCloseableByteBuffer(int len, boolean shared) {
32+
this.arena = shared ? Arena.ofShared() : Arena.ofConfined();
2533
this.segment = arena.allocate(len);
2634
this.bufferView = segment.asByteBuffer();
2735
}

libs/native/src/main21/java/org/elasticsearch/nativeaccess/jdk/JdkJavaLibrary.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,14 @@
1313
import org.elasticsearch.nativeaccess.lib.JavaLibrary;
1414

1515
class JdkJavaLibrary implements JavaLibrary {
16+
17+
@Override
18+
public CloseableByteBuffer newSharedBuffer(int len) {
19+
return JdkCloseableByteBuffer.ofShared(len);
20+
}
21+
1622
@Override
17-
public CloseableByteBuffer newBuffer(int len) {
18-
return new JdkCloseableByteBuffer(len);
23+
public CloseableByteBuffer newConfinedBuffer(int len) {
24+
return JdkCloseableByteBuffer.ofConfined(len);
1925
}
2026
}

libs/native/src/test/java/org/elasticsearch/nativeaccess/ZstdTests.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public void testCompressBound() {
3838
}
3939

4040
public void testCompressValidation() {
41-
try (var src = nativeAccess.newBuffer(1000); var dst = nativeAccess.newBuffer(500)) {
41+
try (var src = nativeAccess.newConfinedBuffer(1000); var dst = nativeAccess.newConfinedBuffer(500)) {
4242
var srcBuf = src.buffer();
4343
var dstBuf = dst.buffer();
4444

@@ -58,9 +58,9 @@ public void testCompressValidation() {
5858

5959
public void testDecompressValidation() {
6060
try (
61-
var original = nativeAccess.newBuffer(1000);
62-
var compressed = nativeAccess.newBuffer(500);
63-
var restored = nativeAccess.newBuffer(500)
61+
var original = nativeAccess.newConfinedBuffer(1000);
62+
var compressed = nativeAccess.newConfinedBuffer(500);
63+
var restored = nativeAccess.newConfinedBuffer(500)
6464
) {
6565
var originalBuf = original.buffer();
6666
var compressedBuf = compressed.buffer();
@@ -105,9 +105,9 @@ public void testCycle() {
105105

106106
private void doTestRoundtrip(byte[] data) {
107107
try (
108-
var original = nativeAccess.newBuffer(data.length);
109-
var compressed = nativeAccess.newBuffer(zstd.compressBound(data.length));
110-
var restored = nativeAccess.newBuffer(data.length)
108+
var original = nativeAccess.newConfinedBuffer(data.length);
109+
var compressed = nativeAccess.newConfinedBuffer(zstd.compressBound(data.length));
110+
var restored = nativeAccess.newConfinedBuffer(data.length)
111111
) {
112112
original.buffer().put(0, data);
113113
int compressedLength = zstd.compress(compressed, original, randomIntBetween(-3, 9));
@@ -121,9 +121,9 @@ private void doTestRoundtrip(byte[] data) {
121121
final int compressedOffset = randomIntBetween(1, 1000);
122122
final int decompressedOffset = randomIntBetween(1, 1000);
123123
try (
124-
var original = nativeAccess.newBuffer(decompressedOffset + data.length);
125-
var compressed = nativeAccess.newBuffer(compressedOffset + zstd.compressBound(data.length));
126-
var restored = nativeAccess.newBuffer(decompressedOffset + data.length)
124+
var original = nativeAccess.newConfinedBuffer(decompressedOffset + data.length);
125+
var compressed = nativeAccess.newConfinedBuffer(compressedOffset + zstd.compressBound(data.length));
126+
var restored = nativeAccess.newConfinedBuffer(decompressedOffset + data.length)
127127
) {
128128
original.buffer().put(decompressedOffset, data);
129129
original.buffer().position(decompressedOffset);

0 commit comments

Comments
 (0)