Skip to content

Commit aeab16b

Browse files
committed
Try fix
Signed-off-by: Leonard Lyubich <[email protected]>
1 parent 297a138 commit aeab16b

File tree

4 files changed

+46
-12
lines changed

4 files changed

+46
-12
lines changed

pkg/services/object/get/exec.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ type execCtx struct {
3535

3636
ctx context.Context
3737

38-
prm RangePrm
38+
prm RangePrm
39+
prmRangeHash *RangeHashPrm
3940

4041
statusError
4142

@@ -81,6 +82,12 @@ func withPayloadRange(r *objectSDK.Range) execOption {
8182
}
8283
}
8384

85+
func withHash(p *RangeHashPrm) execOption {
86+
return func(ctx *execCtx) {
87+
ctx.prmRangeHash = p
88+
}
89+
}
90+
8491
func withLogger(l *zap.Logger) execOption {
8592
return func(ctx *execCtx) {
8693
ctx.log = l
@@ -411,8 +418,15 @@ func (exec execCtx) isForwardingEnabled() bool {
411418
return exec.prm.forwarder != nil
412419
}
413420

421+
// isRangeHashForwardingEnabled returns true if common execution
422+
// parameters has GETRANGEHASH request forwarding closure set.
423+
func (exec execCtx) isRangeHashForwardingEnabled() bool {
424+
return exec.prm.rangeForwarder != nil
425+
}
426+
414427
// disableForwarding removes request forwarding closure from common
415428
// parameters, so it won't be inherited in new execution contexts.
416429
func (exec *execCtx) disableForwarding() {
417430
exec.prm.SetRequestForwarder(nil)
431+
exec.prm.SetRangeHashRequestForwarder(nil)
418432
}

pkg/services/object/get/get.go

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,19 +69,29 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error {
6969
return s.copyLocalECPartRange(prm.objWriter, prm.addr.Container(), prm.addr.Object(), pi, prm.rng.GetOffset(), prm.rng.GetLength())
7070
}
7171

72-
return s.getRange(ctx, prm, nodeLists, repRules, ecRules)
72+
return s.getRange(ctx, prm, nodeLists, repRules, ecRules, nil)
7373
}
7474

75-
func (s *Service) getRange(ctx context.Context, prm RangePrm, nodeLists [][]netmapsdk.NodeInfo, repRules []uint, ecRules []iec.Rule) error {
75+
func (s *Service) getRange(ctx context.Context, prm RangePrm, nodeLists [][]netmapsdk.NodeInfo, repRules []uint, ecRules []iec.Rule,
76+
hashPrm *RangeHashPrm) error {
7677
if len(repRules) > 0 { // REP format does not require encoding
77-
err := s.get(ctx, prm.commonPrm, withPreSortedContainerNodes(nodeLists[:len(repRules)], repRules), withPayloadRange(prm.rng)).err
78+
err := s.get(ctx, prm.commonPrm, withPreSortedContainerNodes(nodeLists[:len(repRules)], repRules), withPayloadRange(prm.rng), withHash(hashPrm)).err
7879
if len(ecRules) == 0 || !errors.Is(err, apistatus.ErrObjectNotFound) {
7980
return err
8081
}
8182
}
8283

84+
ecNodeLists := nodeLists[len(repRules):]
85+
if hashPrm != nil && prm.rangeForwarder != nil && !localNodeInSets(s.neoFSNet, nodeLists) {
86+
hashes, err := s.proxyHashRequest(ctx, ecNodeLists, prm.rangeForwarder)
87+
if err == nil {
88+
hashPrm.forwardedRangeHashResponse = hashes
89+
}
90+
return err
91+
}
92+
8393
return s.copyECObjectRange(ctx, prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.common.SessionToken(),
84-
ecRules, nodeLists[len(repRules):], prm.rng.GetOffset(), prm.rng.GetLength())
94+
ecRules, ecNodeLists, prm.rng.GetOffset(), prm.rng.GetLength())
8595
}
8696

8797
func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHashRes, error) {
@@ -90,10 +100,6 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
90100
return nil, fmt.Errorf("get nodes for object: %w", err)
91101
}
92102

93-
if prm.rangeForwarder != nil && !localNodeInSets(s.neoFSNet, nodeLists) {
94-
return s.proxyHashRequest(ctx, nodeLists, prm.rangeForwarder)
95-
}
96-
97103
hashes := make([][]byte, 0, len(prm.rngs))
98104

99105
for _, rng := range prm.rngs {
@@ -113,10 +119,17 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
113119
hash: util.NewSaltingWriter(h, prm.salt),
114120
})
115121

116-
if err := s.getRange(ctx, rngPrm, nodeLists, repRules, ecRules); err != nil {
122+
if err := s.getRange(ctx, rngPrm, nodeLists, repRules, ecRules, &prm); err != nil {
117123
return nil, err
118124
}
119125

126+
if prm.forwardedRangeHashResponse != nil {
127+
// forwarder request case; no need to collect the other
128+
// parts, the whole response has already been received
129+
hashes = prm.forwardedRangeHashResponse
130+
break
131+
}
132+
120133
hashes = append(hashes, h.Sum(nil))
121134
}
122135

@@ -125,7 +138,7 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas
125138
}, nil
126139
}
127140

128-
func (s *Service) proxyHashRequest(ctx context.Context, sortedNodeLists [][]netmapsdk.NodeInfo, proxyFn RangeRequestForwarder) (*RangeHashRes, error) {
141+
func (s *Service) proxyHashRequest(ctx context.Context, sortedNodeLists [][]netmapsdk.NodeInfo, proxyFn RangeRequestForwarder) ([][]byte, error) {
129142
for i := range sortedNodeLists {
130143
for j := range sortedNodeLists[i] {
131144
conn, node, err := s.conns.(*clientCacheWrapper)._connect(sortedNodeLists[i][j])
@@ -138,7 +151,7 @@ func (s *Service) proxyHashRequest(ctx context.Context, sortedNodeLists [][]netm
138151

139152
hashes, err := proxyFn(ctx, node, conn)
140153
if err == nil {
141-
return &RangeHashRes{hashes: hashes}, nil
154+
return hashes, nil
142155
}
143156

144157
if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) ||

pkg/services/object/get/prm.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ type RangeHashPrm struct {
3333
rngs []object.Range
3434

3535
salt []byte
36+
37+
forwardedRangeHashResponse [][]byte
3638
}
3739

3840
type RequestForwarder func(context.Context, coreclient.NodeInfo, coreclient.MultiAddressClient) (*object.Object, error)

pkg/services/object/get/util.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,11 @@ func (c *clientWrapper) getObject(exec *execCtx, info coreclient.NodeInfo) (*obj
214214
return hdr, nil, nil
215215
}
216216

217+
if rngH := exec.prmRangeHash; rngH != nil && exec.isRangeHashForwardingEnabled() {
218+
exec.prmRangeHash.forwardedRangeHashResponse, err = exec.prm.rangeForwarder(exec.ctx, info, c.client)
219+
return nil, nil, err
220+
}
221+
217222
// we don't specify payload writer because we accumulate
218223
// the object locally (even huge).
219224
if rng := exec.ctxRange(); rng != nil {

0 commit comments

Comments
 (0)