diff --git a/pkg/services/object/get/ec.go b/pkg/services/object/get/ec.go index 65d6298c71..b46c953fc6 100644 --- a/pkg/services/object/get/ec.go +++ b/pkg/services/object/get/ec.go @@ -75,7 +75,7 @@ func (s *Service) getECObjectHeader(ctx context.Context, cnr cid.ID, id oid.ID, var firstErr error for i := range ecRules { hdr, err := s.getECObjectHeaderByRule(ctx, *localNodeKey, cnr, id, sTok, sortedNodeLists[i]) - if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) { + if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return hdr, err } @@ -128,7 +128,7 @@ func (s *Service) getECObjectHeaderByRule(ctx context.Context, localNodeKey ecds err = convertContextStatus(err) - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return object.Object{}, err } @@ -174,7 +174,7 @@ func (s *Service) copyECObject(ctx context.Context, cnr cid.ID, parent oid.ID, s } return nil } - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return err } @@ -224,7 +224,7 @@ nextPart: continue nextPart } - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return err } @@ -270,7 +270,7 @@ func (s *Service) restoreFromECPartsByRule(ctx context.Context, cnr cid.ID, pare eg.Go(func() error { parentHdr, partPayload, err := s.getECPart(gCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, partIdx) if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, gCtx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, gCtx.Err()) { return err } if failed := failCounter.Add(1); failed > uint32(rule.ParityPartNum) { @@ -332,7 +332,7 @@ func (s *Service) restoreFromECPartsByRule(ctx context.Context, cnr cid.ID, pare eg.Go(func() error { _, part, err := s.getECPart(gCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, partIdx) if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, gCtx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, gCtx.Err()) { return err } if failed := failCounter.Add(1); failed+uint32(rem) > uint32(rule.ParityPartNum) { @@ -408,7 +408,7 @@ func (s *Service) getECPartStream(ctx context.Context, cnr cid.ID, parent oid.ID if err == nil { return partHdr, rc, nil } - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) { return object.Object{}, nil, err } if errors.Is(err, ctx.Err()) { @@ -598,7 +598,7 @@ func (s *Service) copyECObjectRange(ctx context.Context, dst ChunkWriter, cnr ci for i := range ecRules { written, err := s.copyECObjectRangeByRule(ctx, dst, *localNodeKey, cnr, parent, sTok, ecRules[i], i, sortedNodeLists[i], off, ln) if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || - errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) { + errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) { return err } @@ -815,7 +815,7 @@ func (s *Service) copyECObjectRangeByParts(ctx context.Context, dst ChunkWriter, failedPartIdx, failedPartWritten, written, err := s.copyECPartsRanges(stageCtx, dst, localNodeKey, cnr, parent, sTok, rule, ruleIdx, sortedNodes, fullPartLen, firstPartIdx, firstPartOff, lastPartIdx, lastPartTo, firstPartStream) if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || - errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) { + errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) { return written, err } @@ -959,7 +959,8 @@ func (s *Service) getRecoveryECPartRanges(ctx context.Context, localNodeKey ecds eg.Go(func() error { part, err := s.readFullECPartRange(egCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, partIdx, localNodeKey, fullPartLen) if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, egCtx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || + errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, egCtx.Err()) { return err } @@ -1041,7 +1042,8 @@ func (s *Service) getECPartRangeStream(ctx context.Context, cnr cid.ID, parent o rc, err = s.getECPartRangeFromNode(ctx, cnr, parent, off, ln, sTok, pi, localNodeKey, sortedNodes[i]) if err != nil { - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) || + errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return nil, err } if !errors.Is(err, apistatus.ErrObjectNotFound) { @@ -1078,7 +1080,7 @@ func (s *Service) getECPartRangeStream(ctx context.Context, cnr cid.ID, parent o _, rc, err = s.getECPartFromNode(ctx, cnr, parent, sTok, pi, sortedNodes[i]) if err != nil { err = convertContextStatus(err) - if errors.Is(err, ctx.Err()) { + if errors.Is(err, apistatus.ErrObjectAccessDenied) || errors.Is(err, ctx.Err()) { return nil, err } if !errors.Is(err, apistatus.ErrObjectNotFound) { diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 0bf40b7a51..bc430670b4 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -5,8 +5,11 @@ import ( "errors" "fmt" + "github.com/nspcc-dev/neofs-node/pkg/network" + "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" "github.com/nspcc-dev/neofs-node/pkg/util" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" "go.uber.org/zap" ) @@ -40,8 +43,45 @@ func (s *Service) Get(ctx context.Context, prm Prm) error { } } + ecNodeLists := nodeLists[len(repRules):] + if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) { + return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "GET", nil) + } + return s.copyECObject(ctx, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(), - ecRules, nodeLists[len(repRules):], prm.objWriter) + ecRules, ecNodeLists, prm.objWriter) +} + +func (s *Service) proxyGetRequest(ctx context.Context, sortedNodeLists [][]netmap.NodeInfo, proxyFn RequestForwarder, + req string, headWriter internal.HeaderWriter) error { + for i := range sortedNodeLists { + for j := range sortedNodeLists[i] { + conn, node, err := s.conns.(*clientCacheWrapper)._connect(sortedNodeLists[i][j]) + if err != nil { + // TODO: implement address list stringer for lazy encoding + s.log.Debug("get conn to remote node", + zap.String("addresses", network.StringifyGroup(node.AddressGroup())), zap.Error(err)) + continue + } + + hdr, err := proxyFn(ctx, node, conn) + if err == nil { + if headWriter != nil { + return headWriter.WriteHeader(hdr) + } + return nil + } + + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || + errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, ctx.Err()) { + return err + } + + s.log.Info("request proxy failed", zap.String("request", req), zap.Error(err)) + } + } + + return apistatus.ErrObjectNotFound } // GetRange serves a request to get an object by address, and returns Streamer instance. @@ -73,8 +113,13 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error { } } + ecNodeLists := nodeLists[len(repRules):] + if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) { + return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "RANGE", nil) + } + return s.copyECObjectRange(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(), - ecRules, nodeLists[len(repRules):], prm.rng.GetOffset(), prm.rng.GetLength()) + ecRules, ecNodeLists, prm.rng.GetOffset(), prm.rng.GetLength()) } func (s *Service) getRange(ctx context.Context, prm RangePrm, opts ...execOption) error { @@ -141,8 +186,13 @@ func (s *Service) Head(ctx context.Context, prm HeadPrm) error { } } + ecNodeLists := nodeLists[len(repRules):] + if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) { + return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "HEAD", prm.objWriter) + } + return s.copyECObjectHeader(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(), - ecRules, nodeLists[len(repRules):]) + ecRules, ecNodeLists) } func (s *Service) get(ctx context.Context, prm commonPrm, opts ...execOption) statusError { diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 78ce61e54c..f4f560acd7 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "slices" coreclient "github.com/nspcc-dev/neofs-node/pkg/core/client" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" @@ -405,11 +406,16 @@ func (c *clientCacheWrapper) InitGetObjectRangeStream(ctx context.Context, node } func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, error) { + conn, _, err := c._connect(node) + return conn, err +} + +func (c *clientCacheWrapper) _connect(node netmap.NodeInfo) (coreclient.MultiAddressClient, coreclient.NodeInfo, error) { // TODO: code is copied from pkg/services/object/get/container.go:63. Worth sharing? // TODO: we may waste resources doing this per request. Make once on network map change instead. var ag network.AddressGroup if err := ag.FromIterator(network.NodeEndpointsIterator(node)); err != nil { - return nil, fmt.Errorf("decode SN network addresses: %w", err) + return nil, coreclient.NodeInfo{}, fmt.Errorf("decode SN network addresses: %w", err) } var ni coreclient.NodeInfo @@ -418,10 +424,10 @@ func (c *clientCacheWrapper) connect(node netmap.NodeInfo) (coreclient.MultiAddr conn, err := c.cache.Get(ni) if err != nil { - return nil, fmt.Errorf("get conn: %w", err) + return nil, coreclient.NodeInfo{}, fmt.Errorf("get conn: %w", err) } - return conn, nil + return conn, ni, nil } // TODO: share. @@ -436,3 +442,15 @@ func convertContextStatus(err error) error { return context.DeadlineExceeded } } + +func localNodeInSets(n NeoFSNetwork, nodeSets [][]netmap.NodeInfo) bool { + return slices.ContainsFunc(nodeSets, func(nodeSet []netmap.NodeInfo) bool { + return localNodeInSet(n, nodeSet) + }) +} + +func localNodeInSet(n NeoFSNetwork, nodes []netmap.NodeInfo) bool { + return slices.ContainsFunc(nodes, func(node netmap.NodeInfo) bool { + return n.IsLocalNodePublicKey(node.PublicKey()) + }) +}