Skip to content

Commit c77a2b5

Browse files
committed
Fix native Aerospike op detection
1 parent 55f7f08 commit c77a2b5

5 files changed

Lines changed: 139 additions & 33 deletions

File tree

stores/utxo/aerospike/aerospike_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,6 @@ import (
7272
"github.com/bsv-blockchain/teranode/ulogger"
7373
"github.com/bsv-blockchain/teranode/util/test"
7474
"github.com/bsv-blockchain/teranode/util/uaerospike"
75-
aeroTest "github.com/bsv-blockchain/testcontainers-aerospike-go"
7675
"github.com/stretchr/testify/assert"
7776
"github.com/stretchr/testify/require"
7877
)
@@ -115,7 +114,7 @@ func TestUnmined(t *testing.T) {
115114

116115
tSettings := test.CreateBaseTestSettings(t)
117116

118-
container, err := aeroTest.RunContainer(ctx)
117+
container, err := runAerospikeTestContainer(ctx)
119118
require.NoError(t, err)
120119

121120
t.Cleanup(func() {
@@ -137,6 +136,9 @@ func TestUnmined(t *testing.T) {
137136

138137
store, err := New(ctx, logger, tSettings, aeroURL)
139138
require.NoError(t, err)
139+
if os.Getenv("AEROSPIKE_EXPECT_NATIVE_OPS") == "true" {
140+
require.True(t, store.useNativeTeranodeOps)
141+
}
140142

141143
t.Run("check_empty_store", func(t *testing.T) {
142144
exists, err := store.indexExists("unminedSinceIndex")
@@ -339,7 +341,7 @@ func TestLargeTxStoresExternally(t *testing.T) {
339341

340342
tSettings := test.CreateBaseTestSettings(t)
341343

342-
container, err := aeroTest.RunContainer(ctx)
344+
container, err := runAerospikeTestContainer(ctx)
343345
require.NoError(t, err)
344346

345347
t.Cleanup(func() {
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package aerospike
2+
3+
import (
4+
"context"
5+
"os"
6+
7+
aeroTest "github.com/bsv-blockchain/testcontainers-aerospike-go"
8+
"github.com/testcontainers/testcontainers-go"
9+
)
10+
11+
func runAerospikeTestContainer(ctx context.Context, opts ...testcontainers.ContainerCustomizer) (*aeroTest.Container, error) {
12+
image := os.Getenv("AEROSPIKE_CONTAINER_IMAGE")
13+
if image != "" {
14+
opts = append(opts, aeroTest.WithImage(image))
15+
16+
platform := os.Getenv("AEROSPIKE_CONTAINER_PLATFORM")
17+
if platform == "" {
18+
platform = "linux/amd64"
19+
}
20+
opts = append(opts, testcontainers.WithImagePlatform(platform))
21+
}
22+
23+
return aeroTest.RunContainer(ctx, opts...)
24+
}

stores/utxo/aerospike/index_test.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/bsv-blockchain/teranode/stores/utxo/fields"
1111
"github.com/bsv-blockchain/teranode/ulogger"
1212
"github.com/bsv-blockchain/teranode/util/uaerospike"
13-
aeroTest "github.com/bsv-blockchain/testcontainers-aerospike-go"
1413
"github.com/stretchr/testify/assert"
1514
"github.com/stretchr/testify/require"
1615
)
@@ -19,7 +18,7 @@ func TestCreateIndexIfNotExists(t *testing.T) {
1918
logger := ulogger.NewVerboseTestLogger(t)
2019
ctx := context.Background()
2120

22-
container, err := aeroTest.RunContainer(ctx)
21+
container, err := runAerospikeTestContainer(ctx)
2322
require.NoError(t, err)
2423

2524
t.Cleanup(func() {
@@ -75,7 +74,7 @@ func TestWaitForIndexReady(t *testing.T) {
7574
logger := ulogger.NewVerboseTestLogger(t)
7675
ctx := context.Background()
7776

78-
container, err := aeroTest.RunContainer(ctx)
77+
container, err := runAerospikeTestContainer(ctx)
7978
require.NoError(t, err)
8079

8180
t.Cleanup(func() {
@@ -132,7 +131,7 @@ func TestIndexWaiterAdapter(t *testing.T) {
132131
logger := ulogger.NewVerboseTestLogger(t)
133132
ctx := context.Background()
134133

135-
container, err := aeroTest.RunContainer(ctx)
134+
container, err := runAerospikeTestContainer(ctx)
136135
require.NoError(t, err)
137136

138137
t.Cleanup(func() {

stores/utxo/aerospike/native_op.go

Lines changed: 66 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
// invoked through one of two paths:
55
//
66
// 1. UDF path (legacy): aerospike.NewBatchUDF(..., "<fn>", args...)
7-
// 2. Native path: aerospike.NewBatchWrite(..., TeranodeModifyOp("result", payload))
7+
// 2. Native path: aerospike.NewBatchWrite(..., TeranodeModifyOp("SUCCESS", payload))
88
//
99
// The native path bypasses the UDF executor and runs under the same
1010
// lock as native ops like LIST_APPEND. It requires both:
@@ -30,6 +30,7 @@ import (
3030
aerospike "github.com/bsv-blockchain/aerospike-client-go/v8"
3131
"github.com/bsv-blockchain/aerospike-client-go/v8/types"
3232
"github.com/bsv-blockchain/teranode/errors"
33+
"github.com/bsv-blockchain/teranode/stores/utxo/fields"
3334
"github.com/vmihailenco/msgpack/v5"
3435
)
3536

@@ -52,11 +53,10 @@ const (
5253
subOpAddDeletedChildren uint8 = 13
5354
)
5455

55-
// nativeOpResultBin is the bin name used by the native-op result
56-
// message. The dispatcher writes the result map into this bin in the
57-
// response; the request bin name is echoed back unchanged so callers
58-
// can read result by name.
59-
const nativeOpResultBin = "result"
56+
// nativeOpResultBin is the bin name used by the native-op result message.
57+
// Keep this aligned with LuaSuccess so existing batch response parsing works
58+
// for both UDF and native paths.
59+
const nativeOpResultBin = string(LuaSuccess)
6060

6161
// encodeNativeOpPayload serializes a sub-op invocation onto the wire
6262
// as MessagePack `[sub_op_id, [args...]]` matching the dispatcher's
@@ -173,11 +173,10 @@ func (s *Store) executeTeranodeUDF(
173173

174174
// detectNativeTeranodeOpSupport probes the cluster to confirm it
175175
// understands AS_MSG_OP_TERANODE_MODIFY (wire op 200). Returns true
176-
// only when the probe got a response shape consistent with our
177-
// dispatcher (PARAMETER_ERROR for malformed payload, or OK for a
178-
// valid one). Anything else — connection error, timeout, an
179-
// unexpected result code — biases toward false-negative so the
180-
// fallback never runs against a server that doesn't support the op.
176+
// only when the server accepts a valid native sub-op, returns a parseable
177+
// response, and applies the expected record mutation. Anything else biases
178+
// toward false-negative so the fallback never runs against a server that
179+
// doesn't support the op.
181180
//
182181
// Called once during store construction; the result is cached in
183182
// s.useNativeTeranodeOps.
@@ -187,41 +186,82 @@ func (s *Store) detectNativeTeranodeOpSupport(ctx context.Context) bool {
187186
}
188187

189188
// Probe key — chosen to never collide with real txid keys (always
190-
// 32 bytes / chainhash). The bin we read back from the response
191-
// also doesn't pollute the namespace because we never commit; the
192-
// dispatcher fails fast on the malformed payload.
189+
// 32 bytes / chainhash). Create a short-lived record and run setLocked:
190+
// a patched server mutates the record and returns a structured response,
191+
// while an unpatched server rejects the custom opcode with PARAMETER_ERROR.
193192
probeKey, err := aerospike.NewKey(s.namespace, s.setName, "_teranode-native-op-probe")
194193
if err != nil {
195194
s.logger.Warnf("[teranode-native-op] probe key creation failed: %v; falling back to UDF path", err)
196195
return false
197196
}
198197

199-
// Intentionally malformed payload: msgpack `nil` (0xc0). The
200-
// dispatcher's first step is as_unpack_list_header_element_count;
201-
// nil isn't a list header, so we expect AS_ERR_PARAMETER (=4).
202-
probeOp := aerospike.TeranodeModifyOp(nativeOpResultBin, []byte{0xc0})
198+
payload, encErr := encodeNativeOpPayload(subOpSetLocked, []any{true})
199+
if encErr != nil {
200+
s.logger.Warnf("[teranode-native-op] probe payload encode failed: %v; falling back to UDF path", encErr)
201+
return false
202+
}
203+
204+
probeOp := aerospike.TeranodeModifyOp(nativeOpResultBin, payload)
203205

204206
policy := aerospike.NewWritePolicy(0, 0)
205207
policy.TotalTimeout = 2 * time.Second
208+
policy.Expiration = 60
206209

207210
probeCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
208211
defer cancel()
209212
_ = probeCtx // reserved for future cancellation plumbing into the client
210213

211-
_, opErr := s.client.Operate(policy, probeKey, probeOp)
214+
if putErr := s.client.PutBins(policy, probeKey, aerospike.NewBin("_nativeProbe", true)); putErr != nil {
215+
s.logger.Warnf("[teranode-native-op] probe record setup failed: %v; falling back to UDF path", putErr)
216+
return false
217+
}
218+
defer func() {
219+
_, _ = s.client.Delete(policy, probeKey)
220+
}()
221+
222+
rec, opErr := s.client.Operate(policy, probeKey, probeOp)
212223
if opErr == nil {
213-
// Unexpected — we sent garbage; getting OK means the dispatcher
214-
// is even more permissive than we thought. Treat as supported.
224+
if rec == nil || rec.Bins == nil || rec.Bins[nativeOpResultBin] == nil {
225+
s.logger.Warnf("[teranode-native-op] probe returned no %q bin; falling back to UDF path", nativeOpResultBin)
226+
return false
227+
}
228+
229+
res, parseErr := s.ParseLuaMapResponse(rec.Bins[nativeOpResultBin])
230+
if parseErr != nil {
231+
s.logger.Warnf("[teranode-native-op] probe returned unparsable response (%v); falling back to UDF path", parseErr)
232+
return false
233+
}
234+
235+
if res.Status != LuaStatusOK {
236+
s.logger.Warnf("[teranode-native-op] probe returned non-OK response (%+v); falling back to UDF path", res)
237+
return false
238+
}
239+
240+
readPolicy := aerospike.NewPolicy()
241+
readPolicy.TotalTimeout = 2 * time.Second
242+
243+
probeRecord, readErr := s.client.Get(readPolicy, probeKey, fields.Locked.String())
244+
if readErr != nil {
245+
s.logger.Warnf("[teranode-native-op] probe verification failed: %v; falling back to UDF path", readErr)
246+
return false
247+
}
248+
if probeRecord == nil || probeRecord.Bins == nil {
249+
s.logger.Warnf("[teranode-native-op] probe verification returned no record; falling back to UDF path")
250+
return false
251+
}
252+
if locked, ok := probeRecord.Bins[fields.Locked.String()].(bool); !ok || !locked {
253+
s.logger.Warnf("[teranode-native-op] probe did not set %q=true; falling back to UDF path", fields.Locked.String())
254+
return false
255+
}
256+
215257
return true
216258
}
217259

218-
// Aerospike Error wraps a result code; PARAMETER_ERROR (=4) is
219-
// what our dispatcher returns for the malformed payload.
220260
var aerr aerospike.Error
221261
if errors.As(opErr, &aerr) {
222-
switch aerr.Matches(types.PARAMETER_ERROR) {
223-
case true:
224-
return true
262+
if aerr.Matches(types.PARAMETER_ERROR) {
263+
s.logger.Infof("[teranode-native-op] server rejected native-op probe; falling back to UDF path")
264+
return false
225265
}
226266
s.logger.Warnf("[teranode-native-op] probe got unexpected error code (%v); "+
227267
"falling back to UDF path", aerr)
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package aerospike
2+
3+
import (
4+
"testing"
5+
6+
"github.com/vmihailenco/msgpack/v5"
7+
)
8+
9+
func TestNativeOpResultBinMatchesLuaSuccess(t *testing.T) {
10+
if nativeOpResultBin != LuaSuccess.String() {
11+
t.Fatalf("native result bin %q must match Lua success bin %q",
12+
nativeOpResultBin, LuaSuccess.String())
13+
}
14+
}
15+
16+
func TestEncodeNativeOpPayloadUsesSubOpAndArgsArray(t *testing.T) {
17+
payload, err := encodeNativeOpPayload(subOpSetLocked, []any{true})
18+
if err != nil {
19+
t.Fatalf("encode native op payload: %v", err)
20+
}
21+
22+
var decoded []any
23+
if err := msgpack.Unmarshal(payload, &decoded); err != nil {
24+
t.Fatalf("decode native op payload: %v", err)
25+
}
26+
27+
if len(decoded) != 2 {
28+
t.Fatalf("payload length = %d, want 2", len(decoded))
29+
}
30+
if decoded[0] != subOpSetLocked {
31+
t.Fatalf("sub op = %v (%T), want %d", decoded[0], decoded[0], subOpSetLocked)
32+
}
33+
34+
args, ok := decoded[1].([]any)
35+
if !ok {
36+
t.Fatalf("args = %T, want []any", decoded[1])
37+
}
38+
if len(args) != 1 || args[0] != true {
39+
t.Fatalf("args = %#v, want []any{true}", args)
40+
}
41+
}

0 commit comments

Comments
 (0)