Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 30 additions & 9 deletions pkg/services/object/get/ec.go
Original file line number Diff line number Diff line change
Expand Up @@ -595,8 +595,15 @@ func (s *Service) copyECObjectRange(ctx context.Context, dst ChunkWriter, cnr ci
// https://github.com/nspcc-dev/neofs-node/issues/3563
// TODO: limit per-rule context? https://github.com/nspcc-dev/neofs-node/issues/3560
var firstErr error
var fullPldLen uint64
var written uint64
for i := range ecRules {
written, err := s.copyECObjectRangeByRule(ctx, dst, *localNodeKey, cnr, parent, sTok, ecRules[i], i, sortedNodeLists[i], off, ln)
if fullPldLen == 0 {
written, fullPldLen, err = s.copyECObjectRangeByRule(ctx, dst, *localNodeKey, cnr, parent, sTok, ecRules[i], i, sortedNodeLists[i], off, ln)
} else {
written, err = s.copyECObjectRangeByParts(ctx, dst, *localNodeKey, cnr, parent, sTok, ecRules[i], i, sortedNodeLists[i],
fullPldLen, off, ln, nil)
}
if err == nil || errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectOutOfRange) ||
errors.Is(err, errStreamFailure) || errors.Is(err, ctx.Err()) {
return err
Expand All @@ -616,7 +623,16 @@ func (s *Service) copyECObjectRange(ctx context.Context, dst ChunkWriter, cnr ci
zap.Int("ruleIdx", i), zap.Int("rulesLeft", len(ecRules)-i-1), zap.Uint64("off", off),
zap.Uint64("len", ln), zap.Uint64("written", written), zap.Error(err))

if fullPldLen == 0 { // not yet reached
continue
}

off += written

if ln == 0 {
ln = fullPldLen
}
ln -= written
}

return fmt.Errorf("%w: failed processing of all %d EC rules, first error: %w", apistatus.ErrObjectNotFound, len(ecRules), firstErr)
Expand Down Expand Up @@ -705,6 +721,7 @@ nextPart:
zap.Uint64("partOff", partOff), zap.Uint64("partLen", partLen), zap.Uint64("written", written), zap.Error(err))

partOff += written
partLen -= written
}

if fromRule >= len(rules) {
Expand All @@ -718,14 +735,14 @@ nextPart:

func (s *Service) copyECObjectRangeByRule(ctx context.Context, dst ChunkWriter, localNodeKey ecdsa.PrivateKey, cnr cid.ID,
parent oid.ID, sTok *session.Object, rule iec.Rule, ruleIdx int, sortedNodes []netmap.NodeInfo,
off, ln uint64) (uint64, error) {
off, ln uint64) (uint64, uint64, error) {
deadline, deadlineSet := ctx.Deadline()
var stageTimeout time.Duration

// Resolve full parent and all parts' len. Since this op should go fast, limit it to 10% of remaining context.
if deadlineSet {
if stageTimeout = time.Until(deadline) / 10; stageTimeout <= 0 {
return 0, context.DeadlineExceeded
return 0, 0, context.DeadlineExceeded
}
} else {
stageTimeout = 10 * time.Second
Expand All @@ -735,7 +752,7 @@ func (s *Service) copyECObjectRangeByRule(ctx context.Context, dst ChunkWriter,

partHdr, firstPartStream, err := s.getECPartStream(stageCtx, cnr, parent, sTok, rule, ruleIdx, sortedNodes, 0)
if err != nil {
return 0, fmt.Errorf("resolve parent payload length: %w", err)
return 0, 0, fmt.Errorf("resolve parent payload length: %w", err)
}
if firstPartStream != nil {
defer firstPartStream.Close()
Expand All @@ -749,19 +766,23 @@ func (s *Service) copyECObjectRangeByRule(ctx context.Context, dst ChunkWriter,
} else {
err = convertContextStatus(err)
}
return 0, fmt.Errorf("read size-split linker payload stream: %w", err)
return 0, 0, fmt.Errorf("read size-split linker payload stream: %w", err)
}
partHdr.SetPayload(buf)
return 0, sizeSplitinkerError(partHdr)
return 0, 0, sizeSplitinkerError(partHdr)
}

parentHdr := partHdr.Parent()
if parentHdr == nil {
return 0, errors.New("missing parent header in object for part") // TODO: copied, share?
return 0, 0, errors.New("missing parent header in object for part") // TODO: copied, share?
}

return s.copyECObjectRangeByParts(ctx, dst, localNodeKey, cnr, parent, sTok, rule, ruleIdx, sortedNodes,
parentHdr.PayloadSize(), off, ln, firstPartStream)
pldLen := parentHdr.PayloadSize()

written, err := s.copyECObjectRangeByParts(ctx, dst, localNodeKey, cnr, parent, sTok, rule, ruleIdx, sortedNodes,
pldLen, off, ln, firstPartStream)

return written, pldLen, err
}

func (s *Service) copyECObjectRangeByParts(ctx context.Context, dst ChunkWriter, localNodeKey ecdsa.PrivateKey, cnr cid.ID,
Expand Down
Loading