From 1ab84190c5d4fda2d8d7728920477a0a09245e82 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Mon, 27 Oct 2025 18:27:03 +0300 Subject: [PATCH 1/2] sn/object: Abort GET/HEAD/RANGE in EC containers on access error There is no point in continuing the operation with such an error. Returning it will make it easier to find the problem. Refs #3422, #3423, #3614. Signed-off-by: Leonard Lyubich --- pkg/services/object/get/ec.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/pkg/services/object/get/ec.go b/pkg/services/object/get/ec.go index 65d6298c71..b46c953fc6 100644 --- a/pkg/services/object/get/ec.go +++ b/pkg/services/object/get/ec.go @@ -75,7 +75,7 @@ func (s *Service) getECObjectHeader(ctx context.Context, cnr cid.ID, id oid.ID, var firstErr error for i := range ecRules { hdr, err := s.getECObjectHeaderByRule(ctx, *localNodeKey, cnr, id, sTok, sortedNodeLists[i]) - if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) { + if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return hdr, err } @@ -128,7 +128,7 @@ func (s *Service) getECObjectHeaderByRule(ctx context.Context, localNodeKey ecds err = convertContextStatus(err) - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return object.Object{}, err } @@ -174,7 +174,7 @@ func (s *Service) copyECObject(ctx context.Context, cnr cid.ID, parent oid.ID, s } return nil } - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return err } @@ -224,7 +224,7 @@ nextPart: continue nextPart } - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return err } @@ -270,7 +270,7 @@ func (s *Service) restoreFromECPartsByRule(ctx context.Context, cnr cid.ID, pare eg.Go(func() error { parentHdr, partPayload, err := s.getECPart(gCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, partIdx) if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, gCtx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, gCtx.Err()) { return err } if failed := failCounter.Add(1); failed > uint32(rule.ParityPartNum) { @@ -332,7 +332,7 @@ func (s *Service) restoreFromECPartsByRule(ctx context.Context, cnr cid.ID, pare eg.Go(func() error { _, part, err := s.getECPart(gCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, partIdx) if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, gCtx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, gCtx.Err()) { return err } if failed := failCounter.Add(1); failed+uint32(rem) > uint32(rule.ParityPartNum) { @@ -408,7 +408,7 @@ func (s *Service) getECPartStream(ctx context.Context, cnr cid.ID, parent oid.ID if err == nil { return partHdr, rc, nil } - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) { return object.Object{}, nil, err } if errors.Is(err, ctx.Err()) { @@ -598,7 +598,7 @@ func (s *Service) copyECObjectRange(ctx context.Context, dst ChunkWriter, cnr ci for i := range ecRules { written, err := s.copyECObjectRangeByRule(ctx, dst, *localNodeKey, cnr, parent, sTok, ecRules[i], i, sortedNodeLists[i], off, ln) if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || - errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) { + errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) { return err } @@ -815,7 +815,7 @@ func (s *Service) copyECObjectRangeByParts(ctx context.Context, dst ChunkWriter, failedPartIdx, failedPartWritten, written, err := s.copyECPartsRanges(stageCtx, dst, localNodeKey, cnr, parent, sTok, rule, ruleIdx, sortedNodes, fullPartLen, firstPartIdx, firstPartOff, lastPartIdx, lastPartTo, firstPartStream) if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || - errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) { + errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) { return written, err } @@ -959,7 +959,8 @@ func (s *Service) getRecoveryECPartRanges(ctx context.Context, localNodeKey ecds eg.Go(func() error { part, err := s.readFullECPartRange(egCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, partIdx, localNodeKey, fullPartLen) if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, egCtx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || + errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, egCtx.Err()) { return err } @@ -1041,7 +1042,8 @@ func (s *Service) getECPartRangeStream(ctx context.Context, cnr cid.ID, parent o rc, err = s.getECPartRangeFromNode(ctx, cnr, parent, off, ln, sTok, pi, localNodeKey, sortedNodes[i]) if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || + errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return nil, err } if !errors.Is(err, apistatus.ErrObjectNotFound) { @@ -1078,7 +1080,7 @@ func (s *Service) getECPartRangeStream(ctx context.Context, cnr cid.ID, parent o _, rc, err = s.getECPartFromNode(ctx, cnr, parent, sTok, pi, sortedNodes[i]) if err != nil { err = convertContextStatus(err) - if errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return nil, err } if !errors.Is(err, apistatus.ErrObjectNotFound) { From 4eee4e4bdbf55fed65d284740a1f07ac879f7ab3 Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Thu, 30 Oct 2025 13:58:28 +0300 Subject: [PATCH 2/2] sn/object: Proxy EC GET/HEAD/RANGE requests when needed When processing EC request, new requests to remote nodes are always created. The server may not have read permissions for the container data, so these calls will be forbidden. To avoid this, original request should be forwarded to the container nodes, as is done for REP rules. Refs #3422, #3423, #3614. Signed-off-by: Leonard Lyubich --- pkg/services/object/get/get.go | 56 +++++++++++++++++++++++++++++++-- pkg/services/object/get/util.go | 24 ++++++++++++-- 2 files changed, 74 insertions(+), 6 deletions(-) diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 0bf40b7a51..bc430670b4 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -5,8 +5,11 @@ import ( "errors" "fmt" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" "github.com/nspcc-dev/neofs-node/pkg/util" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" "go.uber.org/zap" ) @@ -40,8 +43,45 @@ func (s *Service) Get(ctx context.Context, prm Prm) error { } } + ecNodeLists := nodeLists[len(repRules):] + if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) { + return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "GET", nil) + } + return s.copyECObject(ctx, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(), - ecRules, nodeLists[len(repRules):], prm.objWriter) + ecRules, ecNodeLists, prm.objWriter) +} + +func (s *Service) proxyGetRequest(ctx context.Context, sortedNodeLists [][]netmap.NodeInfo, proxyFn RequestForwarder, + req string, headWriter internal.HeaderWriter) error { + for i := range sortedNodeLists { + for j := range sortedNodeLists[i] { + conn, node, err := s.conns.(*clientCacheWrapper)._connect(sortedNodeLists[i][j]) + if err != nil { + // TODO: implement address list stringer for lazy encoding + s.log.Debug("get conn to remote node", + zap.String("addresses", network.StringifyGroup(node.AddressGroup())), zap.Error(err)) + continue + } + + hdr, err := proxyFn(ctx, node, conn) + if err == nil { + if headWriter != nil { + return headWriter.WriteHeader(hdr) + } + return nil + } + + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || + errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, ctx.Err()) { + return err + } + + s.log.Info("request proxy failed", zap.String("request", req), zap.Error(err)) + } + } + + return apistatus.ErrObjectNotFound } // GetRange serves a request to get an object by address, and returns Streamer instance. @@ -73,8 +113,13 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error { } } + ecNodeLists := nodeLists[len(repRules):] + if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) { + return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "RANGE", nil) + } + return s.copyECObjectRange(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(), - ecRules, nodeLists[len(repRules):], prm.rng.GetOffset(), prm.rng.GetLength()) + ecRules, ecNodeLists, prm.rng.GetOffset(), prm.rng.GetLength()) } func (s *Service) getRange(ctx context.Context, prm RangePrm, opts ...execOption) error { @@ -141,8 +186,13 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error { } } + ecNodeLists := nodeLists[len(repRules):] + if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) { + return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "HEAD", prm.objWriter) + } + return s.copyECObjectHeader(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(), - ecRules, nodeLists[len(repRules):]) + ecRules, ecNodeLists) } func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 78ce61e54c..f4f560acd7 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "slices" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" @@ -405,11 +406,16 @@ func (c *clientCacheWrapper) InitGetObjectRangeStream(ctx context.Context, node } func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, error) { + conn, _, err := c._connect(node) + return conn, err +} + +func (c *clientCacheWrapper) _connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, coreclient.NodeInfo, error) { // TODO: code is copied from pkg/services/object/get/container.go:63. Worth sharing? // TODO: we may waste resources doing this per request. Make once on network map change instead. var ag network.AddressGroup if err := ag.FromIterator(network.NodeEndpointsIterator(node)); err != nil { - return nil, fmt.Errorf("decode SN network addresses: %w", err) + return nil, coreclient.NodeInfo{}, fmt.Errorf("decode SN network addresses: %w", err) } var ni coreclient.NodeInfo @@ -418,10 +424,10 @@ func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddr conn, err := c.cache.Get(ni) if err != nil { - return nil, fmt.Errorf("get conn: %w", err) + return nil, coreclient.NodeInfo{}, fmt.Errorf("get conn: %w", err) } - return conn, nil + return conn, ni, nil } // TODO: share. @@ -436,3 +442,15 @@ func convertContextStatus(err error) error { return context.DeadlineExceeded } } + +func localNodeInSets(n NeoFSNetwork, nodeSets [][]netmap.NodeInfo) bool { + return slices.ContainsFunc(nodeSets, func(nodeSet []netmap.NodeInfo) bool { + return localNodeInSet(n, nodeSet) + }) +} + +func localNodeInSet(n NeoFSNetwork, nodes []netmap.NodeInfo) bool { + return slices.ContainsFunc(nodes, func(node netmap.NodeInfo) bool { + return n.IsLocalNodePublicKey(node.PublicKey()) + }) +}