Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 14 additions & 12 deletions pkg/services/object/get/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (s *Service) getECObjectHeader(ctx context.Context, cnr cid.ID, id oid.ID,
var firstErr error
for i := range ecRules {
hdr, err := s.getECObjectHeaderByRule(ctx, *localNodeKey, cnr, id, sTok, sortedNodeLists[i])
if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) {
if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) {
return hdr, err
}

Expand Down Expand Up @@ -128,7 +128,7 @@ func (s *Service) getECObjectHeaderByRule(ctx context.Context, localNodeKey ecds

err = convertContextStatus(err)

if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) {
return object.Object{}, err
}

Expand Down Expand Up @@ -174,7 +174,7 @@ func (s *Service) copyECObject(ctx context.Context, cnr cid.ID, parent oid.ID, s
}
return nil
}
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) {
return err
}

Expand Down Expand Up @@ -224,7 +224,7 @@ nextPart:
continue nextPart
}

if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) {
return err
}

Expand Down Expand Up @@ -270,7 +270,7 @@ func (s *Service) restoreFromECPartsByRule(ctx context.Context, cnr cid.ID, pare
eg.Go(func() error {
parentHdr, partPayload, err := s.getECPart(gCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, partIdx)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, gCtx.Err()) {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, gCtx.Err()) {
return err
}
if failed := failCounter.Add(1); failed > uint32(rule.ParityPartNum) {
Expand Down Expand Up @@ -332,7 +332,7 @@ func (s *Service) restoreFromECPartsByRule(ctx context.Context, cnr cid.ID, pare
eg.Go(func() error {
_, part, err := s.getECPart(gCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, partIdx)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, gCtx.Err()) {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, gCtx.Err()) {
return err
}
if failed := failCounter.Add(1); failed+uint32(rem) > uint32(rule.ParityPartNum) {
Expand Down Expand Up @@ -408,7 +408,7 @@ func (s *Service) getECPartStream(ctx context.Context, cnr cid.ID, parent oid.ID
if err == nil {
return partHdr, rc, nil
}
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) {
return object.Object{}, nil, err
}
if errors.Is(err, ctx.Err()) {
Expand Down Expand Up @@ -598,7 +598,7 @@ func (s *Service) copyECObjectRange(ctx context.Context, dst ChunkWriter, cnr ci
for i := range ecRules {
written, err := s.copyECObjectRangeByRule(ctx, dst, *localNodeKey, cnr, parent, sTok, ecRules[i], i, sortedNodeLists[i], off, ln)
if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) ||
errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) {
errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) {
return err
}

Expand Down Expand Up @@ -815,7 +815,7 @@ func (s *Service) copyECObjectRangeByParts(ctx context.Context, dst ChunkWriter,
failedPartIdx, failedPartWritten, written, err := s.copyECPartsRanges(stageCtx, dst, localNodeKey, cnr, parent, sTok, rule, ruleIdx, sortedNodes,
fullPartLen, firstPartIdx, firstPartOff, lastPartIdx, lastPartTo, firstPartStream)
if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) ||
errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) {
errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) {
return written, err
}

Expand Down Expand Up @@ -959,7 +959,8 @@ func (s *Service) getRecoveryECPartRanges(ctx context.Context, localNodeKey ecds
eg.Go(func() error {
part, err := s.readFullECPartRange(egCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, partIdx, localNodeKey, fullPartLen)
if err != nil {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, egCtx.Err()) {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) ||
errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, egCtx.Err()) {
return err
}

Expand Down Expand Up @@ -1041,7 +1042,8 @@ func (s *Service) getECPartRangeStream(ctx context.Context, cnr cid.ID, parent o

rc, err = s.getECPartRangeFromNode(ctx, cnr, parent, off, ln, sTok, pi, localNodeKey, sortedNodes[i])
if err != nil {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, ctx.Err()) {
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) ||
errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) {
return nil, err
}
if !errors.Is(err, apistatus.ErrObjectNotFound) {
Expand Down Expand Up @@ -1078,7 +1080,7 @@ func (s *Service) getECPartRangeStream(ctx context.Context, cnr cid.ID, parent o
_, rc, err = s.getECPartFromNode(ctx, cnr, parent, sTok, pi, sortedNodes[i])
if err != nil {
err = convertContextStatus(err)
if errors.Is(err, ctx.Err()) {
if errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) {
return nil, err
}
if !errors.Is(err, apistatus.ErrObjectNotFound) {
Expand Down
56 changes: 53 additions & 3 deletions pkg/services/object/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import (
"errors"
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/network"
"github.com/nspcc-dev/neofs-node/pkg/services/object/internal"
"github.com/nspcc-dev/neofs-node/pkg/util"
apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/nspcc-dev/neofs-sdk-go/object"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -40,8 +43,45 @@ func (s *Service) Get(ctx context.Context, prm Prm) error {
}
}

ecNodeLists := nodeLists[len(repRules):]
if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) {
return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "GET", nil)
}

return s.copyECObject(ctx, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(),
ecRules, nodeLists[len(repRules):], prm.objWriter)
ecRules, ecNodeLists, prm.objWriter)
}

func (s *Service) proxyGetRequest(ctx context.Context, sortedNodeLists [][]netmap.NodeInfo, proxyFn RequestForwarder,
req string, headWriter internal.HeaderWriter) error {
for i := range sortedNodeLists {
for j := range sortedNodeLists[i] {
conn, node, err := s.conns.(*clientCacheWrapper)._connect(sortedNodeLists[i][j])
if err != nil {
// TODO: implement address list stringer for lazy encoding
s.log.Debug("get conn to remote node",
zap.String("addresses", network.StringifyGroup(node.AddressGroup())), zap.Error(err))
continue
}

hdr, err := proxyFn(ctx, node, conn)
if err == nil {
if headWriter != nil {
return headWriter.WriteHeader(hdr)
}
return nil
}

if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) ||
errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, ctx.Err()) {
return err
}

s.log.Info("request proxy failed", zap.String("request", req), zap.Error(err))
}
}

return apistatus.ErrObjectNotFound
}

// GetRange serves a request to get an object by address, and returns Streamer instance.
Expand Down Expand Up @@ -73,8 +113,13 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
}
}

ecNodeLists := nodeLists[len(repRules):]
if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) {
return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "RANGE", nil)
}

return s.copyECObjectRange(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(),
ecRules, nodeLists[len(repRules):], prm.rng.GetOffset(), prm.rng.GetLength())
ecRules, ecNodeLists, prm.rng.GetOffset(), prm.rng.GetLength())
}

func (s *Service) getRange(ctx context.Context, prm RangePrm, opts ...execOption) error {
Expand Down Expand Up @@ -141,8 +186,13 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error {
}
}

ecNodeLists := nodeLists[len(repRules):]
if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) {
return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "HEAD", prm.objWriter)
}

return s.copyECObjectHeader(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(),
ecRules, nodeLists[len(repRules):])
ecRules, ecNodeLists)
}

func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError {
Expand Down
24 changes: 21 additions & 3 deletions pkg/services/object/get/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"slices"

coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine"
Expand Down Expand Up @@ -405,11 +406,16 @@ func (c *clientCacheWrapper) InitGetObjectRangeStream(ctx context.Context, node
}

func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, error) {
conn, _, err := c._connect(node)
return conn, err
}

func (c *clientCacheWrapper) _connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, coreclient.NodeInfo, error) {
// TODO: code is copied from pkg/services/object/get/container.go:63. Worth sharing?
// TODO: we may waste resources doing this per request. Make once on network map change instead.
var ag network.AddressGroup
if err := ag.FromIterator(network.NodeEndpointsIterator(node)); err != nil {
return nil, fmt.Errorf("decode SN network addresses: %w", err)
return nil, coreclient.NodeInfo{}, fmt.Errorf("decode SN network addresses: %w", err)
}

var ni coreclient.NodeInfo
Expand All @@ -418,10 +424,10 @@ func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddr

conn, err := c.cache.Get(ni)
if err != nil {
return nil, fmt.Errorf("get conn: %w", err)
return nil, coreclient.NodeInfo{}, fmt.Errorf("get conn: %w", err)
}

return conn, nil
return conn, ni, nil
}

// TODO: share.
Expand All @@ -436,3 +442,15 @@ func convertContextStatus(err error) error {
return context.DeadlineExceeded
}
}

func localNodeInSets(n NeoFSNetwork, nodeSets [][]netmap.NodeInfo) bool {
return slices.ContainsFunc(nodeSets, func(nodeSet []netmap.NodeInfo) bool {
return localNodeInSet(n, nodeSet)
})
}

func localNodeInSet(n NeoFSNetwork, nodes []netmap.NodeInfo) bool {
return slices.ContainsFunc(nodes, func(node netmap.NodeInfo) bool {
return n.IsLocalNodePublicKey(node.PublicKey())
})
}
Loading