Skip to content

Commit 0d27034

Browse files
committed
sn/object: Initial support for erasure coding policies by HASH server
Closes #3656. Refs #3423, #3654, #526. Signed-off-by: Leonard Lyubich <[email protected]>
1 parent c5d8230 commit 0d27034

File tree

4 files changed

+67
-33
lines changed

4 files changed

+67
-33
lines changed

pkg/services/object/get/exec.go

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ type execCtx struct {
3535

3636
ctx context.Context
3737

38-
prm RangePrm
39-
prmRangeHash *RangeHashPrm
38+
prm RangePrm
4039

4140
statusError
4241

@@ -82,12 +81,6 @@ func withPayloadRange(r *objectSDK.Range) execOption {
8281
}
8382
}
8483

85-
func withHash(p *RangeHashPrm) execOption {
86-
return func(ctx *execCtx) {
87-
ctx.prmRangeHash = p
88-
}
89-
}
90-
9184
func withLogger(l *zap.Logger) execOption {
9285
return func(ctx *execCtx) {
9386
ctx.log = l
@@ -418,15 +411,8 @@ func (exec execCtx) isForwardingEnabled() bool {
418411
return exec.prm.forwarder != nil
419412
}
420413

421-
// isRangeHashForwardingEnabled returns true if common execution
422-
// parameters has GETRANGEHASH request forwarding closure set.
423-
func (exec execCtx) isRangeHashForwardingEnabled() bool {
424-
return exec.prm.rangeForwarder != nil
425-
}
426-
427414
// disableForwarding removes request forwarding closure from common
428415
// parameters, so it won't be inherited in new execution contexts.
429416
func (exec *execCtx) disableForwarding() {
430417
exec.prm.SetRequestForwarder(nil)
431-
exec.prm.SetRangeHashRequestForwarder(nil)
432418
}

pkg/services/object/get/get.go

Lines changed: 45 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"errors"
66
"fmt"
77

8+
iec "github.com/nspcc-dev/neofs-node/internal/ec"
9+
"github.com/nspcc-dev/neofs-node/pkg/network"
810
"github.com/nspcc-dev/neofs-node/pkg/util"
911
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
12+
netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap"
1013
"github.com/nspcc-dev/neofs-sdk-go/object"
1114
"go.uber.org/zap"
1215
)
@@ -66,6 +69,10 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
6669
return s.copyLocalECPartRange(prm.objWriter, prm.addr.Container(), prm.addr.Object(), pi, prm.rng.GetOffset(), prm.rng.GetLength())
6770
}
6871

72+
return s.getRange(ctx, prm, nodeLists, repRules, ecRules)
73+
}
74+
75+
func (s *Service) getRange(ctx context.Context, prm RangePrm, nodeLists [][]netmapsdk.NodeInfo, repRules []uint, ecRules []iec.Rule) error {
6976
if len(repRules) > 0 { // REP format does not require encoding
7077
err := s.get(ctx, prm.commonPrm, withPreSortedContainerNodes(nodeLists[:len(repRules)], repRules), withPayloadRange(prm.rng)).err
7178
if len(ecRules) == 0 || !errors.Is(err, apistatus.ErrObjectNotFound) {
@@ -78,6 +85,15 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
7885
}
7986

8087
func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) {
88+
nodeLists, repRules, ecRules, err := s.neoFSNet.GetNodesForObject(prm.addr)
89+
if err != nil {
90+
return nil, fmt.Errorf("get nodes for object: %w", err)
91+
}
92+
93+
if prm.common.SessionToken() == nil && prm.rangeForwarder != nil && !localNodeInSets(s.neoFSNet, nodeLists) {
94+
return s.proxyHashRequest(ctx, nodeLists, prm.rangeForwarder)
95+
}
96+
8197
hashes := make([][]byte, 0, len(prm.rngs))
8298

8399
for _, rng := range prm.rngs {
@@ -97,17 +113,10 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
97113
hash: util.NewSaltingWriter(h, prm.salt),
98114
})
99115

100-
if err := s.get(ctx, rngPrm.commonPrm, withHash(&prm), withPayloadRange(rngPrm.rng)).err; err != nil {
116+
if err := s.getRange(ctx, rngPrm, nodeLists, repRules, ecRules); err != nil {
101117
return nil, err
102118
}
103119

104-
if prm.forwardedRangeHashResponse != nil {
105-
// forwarder request case; no need to collect the other
106-
// parts, the whole response has already been received
107-
hashes = prm.forwardedRangeHashResponse
108-
break
109-
}
110-
111120
hashes = append(hashes, h.Sum(nil))
112121
}
113122

@@ -116,6 +125,34 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
116125
}, nil
117126
}
118127

128+
func (s *Service) proxyHashRequest(ctx context.Context, sortedNodeLists [][]netmapsdk.NodeInfo, proxyFn RangeRequestForwarder) (*RangeHashRes, error) {
129+
for i := range sortedNodeLists {
130+
for j := range sortedNodeLists[i] {
131+
conn, node, err := s.conns.(*clientCacheWrapper)._connect(sortedNodeLists[i][j])
132+
if err != nil {
133+
// TODO: implement address list stringer for lazy encoding
134+
s.log.Debug("get conn to remote node",
135+
zap.String("addresses", network.StringifyGroup(node.AddressGroup())), zap.Error(err))
136+
continue
137+
}
138+
139+
hashes, err := proxyFn(ctx, node, conn)
140+
if err == nil {
141+
return &RangeHashRes{hashes: hashes}, nil
142+
}
143+
144+
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) ||
145+
errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, ctx.Err()) {
146+
return nil, err
147+
}
148+
149+
s.log.Debug("HASH proxy failed", zap.Error(err))
150+
}
151+
}
152+
153+
return nil, apistatus.ErrObjectNotFound
154+
}
155+
119156
// Head reads object header from container.
120157
//
121158
// Returns ErrNotFound if the header was not received for the call.

pkg/services/object/get/prm.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,6 @@ type RangeHashPrm struct {
3333
rngs []object.Range
3434

3535
salt []byte
36-
37-
forwardedRangeHashResponse [][]byte
3836
}
3937

4038
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)

pkg/services/object/get/util.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"io"
10+
"slices"
1011

1112
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
1213
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
@@ -213,11 +214,6 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
213214
return hdr, nil, nil
214215
}
215216

216-
if rngH := exec.prmRangeHash; rngH != nil && exec.isRangeHashForwardingEnabled() {
217-
exec.prmRangeHash.forwardedRangeHashResponse, err = exec.prm.rangeForwarder(exec.ctx, info, c.client)
218-
return nil, nil, err
219-
}
220-
221217
// we don't specify payload writer because we accumulate
222218
// the object locally (even huge).
223219
if rng := exec.ctxRange(); rng != nil {
@@ -405,11 +401,16 @@ func (c *clientCacheWrapper) InitGetObjectRangeStream(ctx context.Context, node
405401
}
406402

407403
func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, error) {
404+
conn, _, err := c._connect(node)
405+
return conn, err
406+
}
407+
408+
func (c *clientCacheWrapper) _connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, coreclient.NodeInfo, error) {
408409
// TODO: code is copied from pkg/services/object/get/container.go:63. Worth sharing?
409410
// TODO: we may waste resources doing this per request. Make once on network map change instead.
410411
var ag network.AddressGroup
411412
if err := ag.FromIterator(network.NodeEndpointsIterator(node)); err != nil {
412-
return nil, fmt.Errorf("decode SN network addresses: %w", err)
413+
return nil, coreclient.NodeInfo{}, fmt.Errorf("decode SN network addresses: %w", err)
413414
}
414415

415416
var ni coreclient.NodeInfo
@@ -418,10 +419,10 @@ func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddr
418419

419420
conn, err := c.cache.Get(ni)
420421
if err != nil {
421-
return nil, fmt.Errorf("get conn: %w", err)
422+
return nil, coreclient.NodeInfo{}, fmt.Errorf("get conn: %w", err)
422423
}
423424

424-
return conn, nil
425+
return conn, ni, nil
425426
}
426427

427428
// TODO: share.
@@ -436,3 +437,15 @@ func convertContextStatus(err error) error {
436437
return context.DeadlineExceeded
437438
}
438439
}
440+
441+
func localNodeInSets(n NeoFSNetwork, nodeSets [][]netmap.NodeInfo) bool {
442+
return slices.ContainsFunc(nodeSets, func(nodeSet []netmap.NodeInfo) bool {
443+
return localNodeInSet(n, nodeSet)
444+
})
445+
}
446+
447+
func localNodeInSet(n NeoFSNetwork, nodes []netmap.NodeInfo) bool {
448+
return slices.ContainsFunc(nodes, func(node netmap.NodeInfo) bool {
449+
return n.IsLocalNodePublicKey(node.PublicKey())
450+
})
451+
}

0 commit comments

Comments
 (0)