Skip to content

Commit 4eee4e4

Browse files
committed
sn/object: Proxy EC GET/HEAD/RANGE requests when needed
When processing EC request, new requests to remote nodes are always created. The server may not have read permissions for the container data, so these calls will be forbidden. To avoid this, original request should be forwarded to the container nodes, as is done for REP rules. Refs #3422, #3423, #3614. Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 1ab8419 commit 4eee4e4

File tree

2 files changed

+74
-6
lines changed

2 files changed

+74
-6
lines changed

pkg/services/object/get/get.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"errors"
66
"fmt"
77

8+
"github.com/nspcc-dev/neofs-node/pkg/network"
9+
"github.com/nspcc-dev/neofs-node/pkg/services/object/internal"
810
"github.com/nspcc-dev/neofs-node/pkg/util"
911
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
12+
"github.com/nspcc-dev/neofs-sdk-go/netmap"
1013
"github.com/nspcc-dev/neofs-sdk-go/object"
1114
"go.uber.org/zap"
1215
)
@@ -40,8 +43,45 @@ func (s *Service) Get(ctx context.Context, prm Prm) error {
4043
}
4144
}
4245

46+
ecNodeLists := nodeLists[len(repRules):]
47+
if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) {
48+
return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "GET", nil)
49+
}
50+
4351
return s.copyECObject(ctx, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(),
44-
ecRules, nodeLists[len(repRules):], prm.objWriter)
52+
ecRules, ecNodeLists, prm.objWriter)
53+
}
54+
55+
func (s *Service) proxyGetRequest(ctx context.Context, sortedNodeLists [][]netmap.NodeInfo, proxyFn RequestForwarder,
56+
req string, headWriter internal.HeaderWriter) error {
57+
for i := range sortedNodeLists {
58+
for j := range sortedNodeLists[i] {
59+
conn, node, err := s.conns.(*clientCacheWrapper)._connect(sortedNodeLists[i][j])
60+
if err != nil {
61+
// TODO: implement address list stringer for lazy encoding
62+
s.log.Debug("get conn to remote node",
63+
zap.String("addresses", network.StringifyGroup(node.AddressGroup())), zap.Error(err))
64+
continue
65+
}
66+
67+
hdr, err := proxyFn(ctx, node, conn)
68+
if err == nil {
69+
if headWriter != nil {
70+
return headWriter.WriteHeader(hdr)
71+
}
72+
return nil
73+
}
74+
75+
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) ||
76+
errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, ctx.Err()) {
77+
return err
78+
}
79+
80+
s.log.Info("request proxy failed", zap.String("request", req), zap.Error(err))
81+
}
82+
}
83+
84+
return apistatus.ErrObjectNotFound
4585
}
4686

4787
// GetRange serves a request to get an object by address, and returns Streamer instance.
@@ -73,8 +113,13 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
73113
}
74114
}
75115

116+
ecNodeLists := nodeLists[len(repRules):]
117+
if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) {
118+
return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "RANGE", nil)
119+
}
120+
76121
return s.copyECObjectRange(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(),
77-
ecRules, nodeLists[len(repRules):], prm.rng.GetOffset(), prm.rng.GetLength())
122+
ecRules, ecNodeLists, prm.rng.GetOffset(), prm.rng.GetLength())
78123
}
79124

80125
func (s *Service) getRange(ctx context.Context, prm RangePrm, opts ...execOption) error {
@@ -141,8 +186,13 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
141186
}
142187
}
143188

189+
ecNodeLists := nodeLists[len(repRules):]
190+
if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) {
191+
return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "HEAD", prm.objWriter)
192+
}
193+
144194
return s.copyECObjectHeader(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(),
145-
ecRules, nodeLists[len(repRules):])
195+
ecRules, ecNodeLists)
146196
}
147197

148198
func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {

pkg/services/object/get/util.go

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"errors"
88
"fmt"
99
"io"
10+
"slices"
1011

1112
coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
1213
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
@@ -405,11 +406,16 @@ func (c *clientCacheWrapper) InitGetObjectRangeStream(ctx context.Context, node
405406
}
406407

407408
func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, error) {
409+
conn, _, err := c._connect(node)
410+
return conn, err
411+
}
412+
413+
func (c *clientCacheWrapper) _connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, coreclient.NodeInfo, error) {
408414
// TODO: code is copied from pkg/services/object/get/container.go:63. Worth sharing?
409415
// TODO: we may waste resources doing this per request. Make once on network map change instead.
410416
var ag network.AddressGroup
411417
if err := ag.FromIterator(network.NodeEndpointsIterator(node)); err != nil {
412-
return nil, fmt.Errorf("decode SN network addresses: %w", err)
418+
return nil, coreclient.NodeInfo{}, fmt.Errorf("decode SN network addresses: %w", err)
413419
}
414420

415421
var ni coreclient.NodeInfo
@@ -418,10 +424,10 @@ func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddr
418424

419425
conn, err := c.cache.Get(ni)
420426
if err != nil {
421-
return nil, fmt.Errorf("get conn: %w", err)
427+
return nil, coreclient.NodeInfo{}, fmt.Errorf("get conn: %w", err)
422428
}
423429

424-
return conn, nil
430+
return conn, ni, nil
425431
}
426432

427433
// TODO: share.
@@ -436,3 +442,15 @@ func convertContextStatus(err error) error {
436442
return context.DeadlineExceeded
437443
}
438444
}
445+
446+
func localNodeInSets(n NeoFSNetwork, nodeSets [][]netmap.NodeInfo) bool {
447+
return slices.ContainsFunc(nodeSets, func(nodeSet []netmap.NodeInfo) bool {
448+
return localNodeInSet(n, nodeSet)
449+
})
450+
}
451+
452+
func localNodeInSet(n NeoFSNetwork, nodes []netmap.NodeInfo) bool {
453+
return slices.ContainsFunc(nodes, func(node netmap.NodeInfo) bool {
454+
return n.IsLocalNodePublicKey(node.PublicKey())
455+
})
456+
}

0 commit comments

Comments
 (0)