From 93b920c8b9acb1347e83d7184dd4cedfbe8bfc9f Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Wed, 8 Oct 2025 14:05:10 +0300 Subject: [PATCH] sn/policer: Recreate unavailable EC parts in container when possible Extend `Policer` with workers checking availability of EC parts. Free worker takes the job each time some EC part is processed by regular Policer routine. If no more than N parts are unavailable (where N is a number of parity parts in the container policy), they are recreated using available ones. The part has consistent ID even after restoration which is good for deduplication. Closes #3554. Signed-off-by: Leonard Lyubich --- cmd/neofs-node/object.go | 2 +- internal/ec/ec.go | 19 + pkg/local_object_storage/engine/ec.go | 72 +++ pkg/local_object_storage/engine/ec_test.go | 394 +++++++++++++++- pkg/local_object_storage/engine/engine.go | 2 + .../engine/engine_test.go | 63 ++- pkg/local_object_storage/engine/metrics.go | 1 + pkg/local_object_storage/shard/ec.go | 29 ++ pkg/local_object_storage/shard/ec_test.go | 184 ++++++++ .../shard/shard_internal_test.go | 23 + pkg/metrics/engine.go | 14 + pkg/services/object/get/ec.go | 14 + pkg/services/object/get/get.go | 15 + pkg/services/object/get/service.go | 2 + pkg/services/object/get/service_test.go | 4 + pkg/services/object/head/remote.go | 87 ++++ pkg/services/policer/check.go | 4 +- pkg/services/policer/ec.go | 420 +++++++++++++++++- pkg/services/policer/policer.go | 39 +- pkg/services/policer/policer_test.go | 28 +- pkg/services/replicator/process.go | 13 + 21 files changed, 1410 insertions(+), 19 deletions(-) diff --git a/cmd/neofs-node/object.go b/cmd/neofs-node/object.go index 4e2620b03d..8bd69deb3c 100644 --- a/cmd/neofs-node/object.go +++ b/cmd/neofs-node/object.go @@ -199,7 +199,7 @@ func initObjectService(c *cfg) { ), ) - c.policer = policer.New( + c.policer = policer.New(neofsecdsa.Signer(c.key.PrivateKey), policer.WithLogger(c.log), policer.WithLocalStorage(ls), policer.WithRemoteHeader( diff --git a/internal/ec/ec.go b/internal/ec/ec.go index 4eee2189a0..f1ed0c2a0b 100644 --- a/internal/ec/ec.go +++ b/internal/ec/ec.go @@ -112,6 +112,25 @@ func DecodeRange(rule Rule, fromIdx, toIdx int, parts [][]byte) error { return nil } +// DecodeIndexes decodes specified EC parts obtained by applying specified rule. +func DecodeIndexes(rule Rule, parts [][]byte, idxs []int) error { + rs, err := newCoderForRule(rule) + if err != nil { + return err + } + + required := make([]bool, rule.DataPartNum+rule.ParityPartNum) + for i := range idxs { + required[idxs[i]] = true + } + + if err := rs.ReconstructSome(parts, required); err != nil { + return fmt.Errorf("restore Reed-Solomon: %w", err) + } + + return nil +} + func newCoderForRule(rule Rule) (reedsolomon.Encoder, error) { enc, err := reedsolomon.New(int(rule.DataPartNum), int(rule.ParityPartNum)) if err != nil { // should never happen with correct rule diff --git a/pkg/local_object_storage/engine/ec.go b/pkg/local_object_storage/engine/ec.go index 76d3af654e..5ef24f6797 100644 --- a/pkg/local_object_storage/engine/ec.go +++ b/pkg/local_object_storage/engine/ec.go @@ -203,3 +203,75 @@ loop: return 0, nil, apistatus.ErrObjectNotFound } + +// HeadECPart is similar to [StorageEngine.GetECPart] but returns only the header. 
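+//
+// Shards are tried in the order used for the parent address. If a shard resolves
+// the part's object ID but fails to return its header, the engine falls back to a
+// direct header read by that ID on the remaining shards, bypassing the metabase.
+// Expired parts are reported as [apistatus.ErrObjectNotFound] (like Get), removed
+// ones as [apistatus.ErrObjectAlreadyRemoved].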
+func (e *StorageEngine) HeadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, error) {
+	if e.metrics != nil {
+		defer elapsed(e.metrics.AddHeadECPartDuration)()
+	}
+
+	e.blockMtx.RLock()
+	defer e.blockMtx.RUnlock()
+	if e.blockErr != nil {
+		return object.Object{}, e.blockErr
+	}
+
+	// TODO: sync placement with PUT. They should sort shards equally, but now PUT sorts by part ID.
+	// https://github.com/nspcc-dev/neofs-node/issues/3537
+	s := e.sortShardsFn(e, oid.NewAddress(cnr, parent))
+
+	var partID oid.ID
+loop:
+	for i := range s {
+		hdr, err := s[i].shardIface.HeadECPart(cnr, parent, pi)
+		switch {
+		case err == nil:
+			return hdr, nil
+		case errors.Is(err, apistatus.ErrObjectAlreadyRemoved):
+			return object.Object{}, err
+		case errors.Is(err, meta.ErrObjectIsExpired):
+			return object.Object{}, apistatus.ErrObjectNotFound // like Get
+		case errors.As(err, (*ierrors.ObjectID)(&partID)):
+			if partID.IsZero() {
+				panic("zero object ID returned as error")
+			}
+
+			e.log.Info("EC part's object ID resolved in shard but reading failed, continue by ID",
+				zap.Stringer("container", cnr), zap.Stringer("parent", parent),
+				zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
+				zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
+			// TODO: need report error? Same for other places. https://github.com/nspcc-dev/neofs-node/issues/3538
+
+			s = s[i+1:]
+			break loop
+		case errors.Is(err, apistatus.ErrObjectNotFound):
+		default:
+			e.log.Info("failed to get EC part header from shard, ignore error",
+				zap.Stringer("container", cnr), zap.Stringer("parent", parent),
+				zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index),
+				zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err))
+		}
+	}
+
+	if partID.IsZero() {
+		return object.Object{}, apistatus.ErrObjectNotFound
+	}
+
+	for i := range s {
+		// get an object bypassing the metabase. We can miss deletion or expiration mark. Get behaves like this, so here too.
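+		// Note: s was re-sliced above to the shards following the one that resolved partID,
+		// so this fallback pass only visits shards that have not been queried yet.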
+ hdr, err := s[i].shardIface.Head(oid.NewAddress(cnr, partID), true) + switch { + case err == nil: + return *hdr, nil + case errors.Is(err, apistatus.ErrObjectNotFound): + default: + e.log.Info("failed to get EC part header from shard bypassing metabase, ignore error", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), + zap.Int("ecRule", pi.RuleIndex), zap.Int("partIdx", pi.Index), + zap.Stringer("partID", partID), + zap.Stringer("shardID", s[i].shardIface.ID()), zap.Error(err)) + } + } + + return object.Object{}, apistatus.ErrObjectNotFound +} diff --git a/pkg/local_object_storage/engine/ec_test.go b/pkg/local_object_storage/engine/ec_test.go index a6e49388f3..0a1e4db117 100644 --- a/pkg/local_object_storage/engine/ec_test.go +++ b/pkg/local_object_storage/engine/ec_test.go @@ -66,7 +66,7 @@ func TestStorageEngine_GetECPart(t *testing.T) { const sleepTime = 50 * time.Millisecond // pretty big for test, pretty fast IRL var m testMetrics - shardOK.getECPartSleep = sleepTime + shardOK.eCPartSleep = sleepTime s := newEngineWithFixedShardOrder([]shardInterface{shardOK, unimplementedShard{}}) // to ensure 2nd shard is not accessed s.metrics = &m @@ -468,7 +468,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { const sleepTime = 50 * time.Millisecond // pretty big for test, pretty fast IRL var m testMetrics - shardOK.getECPartSleep = sleepTime + shardOK.eCPartSleep = sleepTime s := newEngineWithFixedShardOrder([]shardInterface{shardOK, unimplementedShard{}}) // to ensure 2nd shard is not accessed s.metrics = &m @@ -852,6 +852,396 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { lb.AssertEmpty() } +func TestStorageEngine_HeadECPart(t *testing.T) { + cnr := cidtest.ID() + parentID := oidtest.ID() + pi := iec.PartInfo{ + RuleIndex: 123, + Index: 456, + } + + t.Run("blocked", func(t *testing.T) { + s := newEngineWithFixedShardOrder([]shardInterface{unimplementedShard{}}) // to ensure shards are not accessed + + e := errors.New("any error") + require.NoError(t, s.BlockExecution(e)) + + _, err := s.HeadECPart(cnr, parentID, pi) + require.Equal(t, e, err) + }) + + var parentObj object.Object + parentObj.SetContainerID(cnr) + parentObj.SetID(parentID) + + partObj, err := iec.FormObjectForECPart(neofscryptotest.Signer(), parentObj, testutil.RandByteSlice(32), pi) + require.NoError(t, err) + + partHdr := *partObj.CutPayload() + + partID := partObj.GetID() + partAddr := oid.NewAddress(cnr, partID) + + shardOK := &mockShard{ + headECPart: map[headECPartKey]headECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {hdr: partHdr}, + }, + } + + t.Run("metric", func(t *testing.T) { + const sleepTime = 50 * time.Millisecond // pretty big for test, pretty fast IRL + var m testMetrics + + shardOK.eCPartSleep = sleepTime + + s := newEngineWithFixedShardOrder([]shardInterface{shardOK, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.metrics = &m + + _, _ = s.HeadECPart(cnr, parentID, pi) + require.GreaterOrEqual(t, time.Duration(m.headECPart.Load()), sleepTime) + }) + + t.Run("zero OID error", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{&mockShard{ + headECPart: map[headECPartKey]headECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: fmt.Errorf("some error: %w", ierrors.ObjectID{})}, + }, + }, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.log = l + + require.PanicsWithValue(t, "zero object ID returned as error", func() { + _, _ = s.HeadECPart(cnr, 
parentID, pi) + }) + + lb.AssertEmpty() + }) + + shardAlreadyRemoved := &mockShard{ + headECPart: map[headECPartKey]headECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: apistatus.ErrObjectAlreadyRemoved}, + }, + } + shardExpired := &mockShard{ + headECPart: map[headECPartKey]headECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: meta.ErrObjectIsExpired}, + }, + } + shard500 := &mockShard{ + i: 0, + headECPart: map[headECPartKey]headECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: errors.New("some shard error")}, + }, + } + shard404 := &mockShard{ + headECPart: map[headECPartKey]headECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: apistatus.ErrObjectNotFound}, + }, + head: map[headKey]headValue{ + {addr: partAddr, raw: true}: {err: apistatus.ErrObjectNotFound}, + }, + } + + checkOK := func(t *testing.T, s *StorageEngine) { + hdr, err := s.HeadECPart(cnr, parentID, pi) + require.NoError(t, err) + require.Equal(t, partHdr, hdr) + } + checkErrorIs := func(t *testing.T, s *StorageEngine, e error) { + _, err := s.HeadECPart(cnr, parentID, pi) + require.ErrorIs(t, err, e) + } + + t.Run("404,OK", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, shardOK}) + s.log = l + + checkOK(t, s) + + lb.AssertEmpty() + }) + + t.Run("404,already removed", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, shardAlreadyRemoved}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectAlreadyRemoved) + + lb.AssertEmpty() + }) + + t.Run("404,expired", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, shardExpired}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertEmpty() + }) + + t.Run("404,404", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, shard404}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertEmpty() + }) + + t.Run("internal,OK", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard500, shardOK}) + s.log = l + + checkOK(t, s) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "failed to get EC part header from shard, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("0")), + "error": "some shard error", + }, + }) + }) + + t.Run("internal,already removed", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard500, shardAlreadyRemoved}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectAlreadyRemoved) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "failed to get EC part header from shard, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("0")), + "error": "some shard error", + }, + }) + }) + + t.Run("internal,expired", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := 
newEngineWithFixedShardOrder([]shardInterface{shard500, shardExpired}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "failed to get EC part header from shard, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("0")), + "error": "some shard error", + }, + }) + }) + + t.Run("internal,404", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard500, shard404}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "failed to get EC part header from shard, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("0")), + "error": "some shard error", + }, + }) + }) + + t.Run("404,OID,OK", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, &mockShard{ + i: 1, + headECPart: map[headECPartKey]headECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: fmt.Errorf("some error: %w", ierrors.ObjectID(partID))}, + }, + }, &mockShard{ + head: map[headKey]headValue{ + {addr: partAddr, raw: true}: {hdr: partHdr}, + }, + }}) + s.log = l + + checkOK(t, s) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "EC part's object ID resolved in shard but reading failed, continue by ID", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("1")), + "error": "some error: " + partID.String(), + }, + }) + }) + + t.Run("404,OID,404", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, &mockShard{ + i: 1, + headECPart: map[headECPartKey]headECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: fmt.Errorf("some error: %w", ierrors.ObjectID(partID))}, + }, + }, shard404}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertSingle(testutil.LogEntry{ + Level: zap.InfoLevel, + Message: "EC part's object ID resolved in shard but reading failed, continue by ID", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("1")), + "error": "some error: " + partID.String(), + }, + }) + }) + + t.Run("404,OID,internal", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shard404, &mockShard{ + i: 1, + headECPart: map[headECPartKey]headECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: fmt.Errorf("some error: %w", ierrors.ObjectID(partID))}, + }, + }, &mockShard{ + i: 2, + head: map[headKey]headValue{ + {addr: partAddr, raw: true}: {err: errors.New("some shard error")}, + }, + }}) + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertEqual([]testutil.LogEntry{{ + Level: zap.InfoLevel, + Message: "EC part's object ID resolved in shard but reading failed, continue by ID", + Fields: map[string]any{ 
+ "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "shardID": base58.Encode([]byte("1")), + "error": "some error: " + partID.String(), + }, + }, { + Level: zap.InfoLevel, + Message: "failed to get EC part header from shard bypassing metabase, ignore error", + Fields: map[string]any{ + "container": cnr.String(), + "parent": parentID.String(), + "ecRule": json.Number("123"), + "partIdx": json.Number("456"), + "partID": partID.String(), + "shardID": base58.Encode([]byte("2")), + "error": "some shard error", + }, + }}) + }) + + t.Run("already removed", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shardAlreadyRemoved, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectAlreadyRemoved) + + lb.AssertEmpty() + }) + + t.Run("expired", func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shardExpired, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.log = l + + checkErrorIs(t, s, apistatus.ErrObjectNotFound) + + lb.AssertEmpty() + }) + + for _, tc := range []struct { + typ object.Type + associate func(*object.Object, oid.ID) + }{ + {typ: object.TypeTombstone, associate: (*object.Object).AssociateDeleted}, + {typ: object.TypeLock, associate: (*object.Object).AssociateLocked}, + } { + t.Run(tc.typ.String(), func(t *testing.T) { + const shardNum = 5 + s := testNewEngineWithShardNum(t, shardNum) + + sysObj := *generateObjectWithCID(cnr) + tc.associate(&sysObj, oidtest.ID()) + addAttribute(&sysObj, "__NEOFS__EXPIRATION_EPOCH", strconv.Itoa(123)) + + require.NoError(t, s.Put(&sysObj, nil)) + + hdr, err := s.HeadECPart(cnr, sysObj.GetID(), pi) + require.NoError(t, err) + require.Equal(t, sysObj, hdr) + }) + } + + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + s := newEngineWithFixedShardOrder([]shardInterface{shardOK, unimplementedShard{}}) // to ensure 2nd shard is not accessed + s.log = l + + checkOK(t, s) + + lb.AssertEmpty() +} + func assertGetECPartOK(t testing.TB, exp, hdr object.Object, rdr io.ReadCloser) { b, err := io.ReadAll(rdr) require.NoError(t, err) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index 9608ec2d85..c9db679e6b 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -49,6 +49,8 @@ type shardInterface interface { GetRangeStream(cnr cid.ID, id oid.ID, off, ln int64) (uint64, io.ReadCloser, error) GetECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, io.ReadCloser, error) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln int64) (uint64, io.ReadCloser, error) + Head(oid.Address, bool) (*object.Object, error) + HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error) } type shardWrapper struct { diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 227a14e28e..4a22fe5d25 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -237,6 +237,14 @@ func (unimplementedShard) GetECPartRange(cid.ID, oid.ID, iec.PartInfo, int64, in panic("unimplemented") } +func (unimplementedShard) Head(oid.Address, bool) (*object.Object, error) { + panic("unimplemented") +} + +func (unimplementedShard) HeadECPart(cid.ID, oid.ID, 
iec.PartInfo) (object.Object, error) { + panic("unimplemented") +} + type getECPartKey struct { cnr cid.ID parent oid.ID @@ -283,13 +291,29 @@ type getStreamValue struct { err error } +type headKey struct { + addr oid.Address + raw bool +} + +type headValue struct { + hdr object.Object + err error +} + +type headECPartKey = getECPartKey + +type headECPartValue = headValue + type mockShard struct { i int - getECPartSleep time.Duration + eCPartSleep time.Duration getECPart map[getECPartKey]getECPartValue getRangeStream map[getRangeStreamKey]getRangeStreamValue getECPartRange map[getECPartRangeKey]getECPartRangeValue getStream map[getStreamKey]getStreamValue + head map[headKey]headValue + headECPart map[headECPartKey]headECPartValue } func (x *mockShard) ID() *shard.ID { @@ -335,7 +359,7 @@ func (x *mockShard) GetRangeStream(cnr cid.ID, id oid.ID, off, ln int64) (uint64 } func (x *mockShard) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, io.ReadCloser, error) { - time.Sleep(x.getECPartSleep) + time.Sleep(x.eCPartSleep) val, ok := x.getECPart[getECPartKey{ cnr: cnr, parent: parent, @@ -348,7 +372,7 @@ func (x *mockShard) GetECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (objec } func (x *mockShard) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln int64) (uint64, io.ReadCloser, error) { - time.Sleep(x.getECPartSleep) + time.Sleep(x.eCPartSleep) val, ok := x.getECPartRange[getECPartRangeKey{ cnr: cnr, parent: parent, @@ -375,6 +399,30 @@ func (x *mockShard) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, o return val.obj.PayloadSize(), io.NopCloser(bytes.NewReader(pld[off:][:ln])), nil } +func (x *mockShard) Head(addr oid.Address, raw bool) (*object.Object, error) { + val, ok := x.head[headKey{ + addr: addr, + raw: raw, + }] + if !ok { + return nil, errors.New("[test] unexpected object requested") + } + return &val.hdr, val.err +} + +func (x *mockShard) HeadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, error) { + time.Sleep(x.eCPartSleep) + val, ok := x.headECPart[headECPartKey{ + cnr: cnr, + parent: parent, + pi: pi, + }] + if !ok { + return object.Object{}, errors.New("[test] unexpected object requested") + } + return val.hdr, val.err +} + type unimplementedMetrics struct{} func (x unimplementedMetrics) AddListContainersDuration(time.Duration) { @@ -433,6 +481,10 @@ func (x unimplementedMetrics) AddGetECPartRangeDuration(d time.Duration) { panic("unimplemented") } +func (x unimplementedMetrics) AddHeadECPartDuration(time.Duration) { + panic("unimplemented") +} + func (x unimplementedMetrics) SetObjectCounter(string, string, uint64) { panic("unimplemented") } @@ -457,6 +509,7 @@ type testMetrics struct { unimplementedMetrics getECPart atomic.Int64 getECPartRange atomic.Int64 + headECPart atomic.Int64 } func (x *testMetrics) AddGetECPartDuration(d time.Duration) { @@ -466,3 +519,7 @@ func (x *testMetrics) AddGetECPartDuration(d time.Duration) { func (x *testMetrics) AddGetECPartRangeDuration(d time.Duration) { x.getECPartRange.Add(int64(d)) } + +func (x *testMetrics) AddHeadECPartDuration(d time.Duration) { + x.headECPart.Add(int64(d)) +} diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 61d729c371..2ac97c4e86 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -19,6 +19,7 @@ type MetricRegister interface { AddListObjectsDuration(d time.Duration) AddGetECPartDuration(d time.Duration) 
AddGetECPartRangeDuration(d time.Duration) + AddHeadECPartDuration(d time.Duration) SetObjectCounter(shardID, objectType string, v uint64) AddToObjectCounter(shardID, objectType string, delta int) diff --git a/pkg/local_object_storage/shard/ec.go b/pkg/local_object_storage/shard/ec.go index 60cee8fd00..698f0e5ed9 100644 --- a/pkg/local_object_storage/shard/ec.go +++ b/pkg/local_object_storage/shard/ec.go @@ -118,3 +118,32 @@ func (s *Shard) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, return pldLen, rc, nil } + +// HeadECPart is similar to [Shard.GetECPart] but returns only the header. +func (s *Shard) HeadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, error) { + partID, err := s.metaBaseIface.ResolveECPart(cnr, parent, pi) + if err != nil { + return object.Object{}, fmt.Errorf("resolve part ID in metabase: %w", err) + } + + partAddr := oid.NewAddress(cnr, partID) + if s.hasWriteCache() { + hdr, err := s.writeCache.Head(partAddr) + if err == nil { + return *hdr, nil + } + + if errors.Is(err, apistatus.ErrObjectNotFound) { + s.log.Debug("EC part object is missing in write-cache, fallback to BLOB storage", zap.Stringer("partAddr", partAddr), zap.Error(err)) + } else { + s.log.Info("failed to get EC part object header from write-cache, fallback to BLOB storage", zap.Stringer("partAddr", partAddr), zap.Error(err)) + } + } + + hdr, err := s.blobStor.Head(partAddr) + if err != nil { + return object.Object{}, fmt.Errorf("get header from BLOB storage by ID %w: %w", ierrors.ObjectID(partID), err) + } + + return *hdr, nil +} diff --git a/pkg/local_object_storage/shard/ec_test.go b/pkg/local_object_storage/shard/ec_test.go index 55d88f36b4..8065a7f112 100644 --- a/pkg/local_object_storage/shard/ec_test.go +++ b/pkg/local_object_storage/shard/ec_test.go @@ -459,6 +459,190 @@ func TestShard_GetECPartRange(t *testing.T) { testGetECPartRangeStream(t, partObj, parentID, pi, s) } +func TestShard_HeadECPart(t *testing.T) { + cnr := cidtest.ID() + parentID := oidtest.ID() + pi := iec.PartInfo{ + RuleIndex: 123, + Index: 456, + } + + var parentObj object.Object + parentObj.SetContainerID(cnr) + parentObj.SetID(parentID) + + partObj, err := iec.FormObjectForECPart(neofscryptotest.Signer(), parentObj, testutil.RandByteSlice(32), pi) + require.NoError(t, err) + + partHdr := *partObj.CutPayload() + + partID := partObj.GetID() + partAddr := oid.NewAddress(cnr, partID) + + mb := mockMetabase{ + resolveECPart: map[resolveECPartKey]resolveECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {id: partID}, + }, + } + bs := mockBLOBStore{ + head: map[oid.Address]headValue{ + partAddr: {hdr: partHdr}, + }, + } + + // metabase errors + for _, tc := range []struct { + name string + err error + assertErr func(t *testing.T, err error) + }{ + {name: "internal error", err: errors.New("internal error"), assertErr: func(t *testing.T, err error) { + require.ErrorContains(t, err, "internal error") + }}, + {name: "object not found", err: apistatus.ErrObjectNotFound, assertErr: func(t *testing.T, err error) { + require.ErrorIs(t, err, apistatus.ErrObjectNotFound) + }}, + {name: "object already removed", err: apistatus.ErrObjectAlreadyRemoved, assertErr: func(t *testing.T, err error) { + require.ErrorIs(t, err, apistatus.ErrObjectAlreadyRemoved) + }}, + {name: "object expired", err: meta.ErrObjectIsExpired, assertErr: func(t *testing.T, err error) { + require.ErrorIs(t, err, meta.ErrObjectIsExpired) + }}, + } { + t.Run("metabase/"+tc.name, func(t *testing.T) { + mdb := mockMetabase{ + 
resolveECPart: map[resolveECPartKey]resolveECPartValue{ + {cnr: cnr, parent: parentID, pi: pi}: {err: tc.err}, + }, + } + + s := newSimpleTestShard(t, unimplementedBLOBStore{}, &mdb, unimplementedWriteCache{}) + + _, err := s.HeadECPart(cnr, parentID, pi) + require.ErrorContains(t, err, "resolve part ID in metabase") + tc.assertErr(t, err) + }) + } + + // BLOB storage errors + for _, tc := range []struct { + name string + err error + }{ + {name: "internal error", err: errors.New("internal error")}, + {name: "object not found", err: apistatus.ErrObjectNotFound}, + } { + t.Run("BLOB storage/"+tc.name, func(t *testing.T) { + bs := mockBLOBStore{ + head: map[oid.Address]headValue{ + partAddr: {err: tc.err}, + }, + } + + s := newSimpleTestShard(t, &bs, &mb, nil) + + _, err := s.HeadECPart(cnr, parentID, pi) + require.ErrorIs(t, err, tc.err) + require.ErrorContains(t, err, fmt.Sprintf("get header from BLOB storage by ID %s", partID)) + + var oidErr ierrors.ObjectID + require.ErrorAs(t, err, &oidErr) + require.EqualValues(t, partID, oidErr) + }) + } + + t.Run("writecache", func(t *testing.T) { + // errors + for _, tc := range []struct { + name string + err error + logMsg testutil.LogEntry + }{ + {name: "internal error", err: errors.New("internal error"), logMsg: testutil.LogEntry{Fields: map[string]any{ + "partAddr": partAddr.String(), + "error": "internal error", + }, Level: zap.InfoLevel, Message: "failed to get EC part object header from write-cache, fallback to BLOB storage"}}, + {name: "object not found", err: fmt.Errorf("wrapped: %w", apistatus.ErrObjectNotFound), logMsg: testutil.LogEntry{Fields: map[string]any{ + "partAddr": partAddr.String(), + "error": "wrapped: " + apistatus.ErrObjectNotFound.Error(), + }, Level: zap.DebugLevel, Message: "EC part object is missing in write-cache, fallback to BLOB storage"}}, + } { + t.Run(tc.name, func(t *testing.T) { + l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) + + wc := mockWriteCache{ + head: map[oid.Address]headValue{ + partAddr: {err: tc.err}, + }, + } + + s := newSimpleTestShard(t, &bs, &mb, &wc) + s.log = l + + hdr, err := s.HeadECPart(cnr, parentID, pi) + require.NoError(t, err) + require.Equal(t, partHdr, hdr) + + lb.AssertSingle(tc.logMsg) + }) + } + + wc := mockWriteCache{ + head: map[oid.Address]headValue{ + partAddr: {hdr: partHdr}, + }, + } + + s := newSimpleTestShard(t, unimplementedBLOBStore{}, &mb, &wc) + + hdr, err := s.HeadECPart(cnr, parentID, pi) + require.NoError(t, err) + require.Equal(t, partHdr, hdr) + }) + + for _, tc := range []struct { + typ object.Type + associate func(*object.Object, oid.ID) + }{ + {typ: object.TypeTombstone, associate: (*object.Object).AssociateDeleted}, + {typ: object.TypeLock, associate: (*object.Object).AssociateLocked}, + } { + t.Run(tc.typ.String(), func(t *testing.T) { + mb := meta.New( + meta.WithPath(filepath.Join(t.TempDir(), "meta")), + meta.WithEpochState(epochState{}), + meta.WithLogger(zaptest.NewLogger(t)), + ) + require.NoError(t, mb.Open(false)) + t.Cleanup(func() { _ = mb.Close() }) + require.NoError(t, mb.Init()) + + sysObj := *newObject(t) + sysObj.SetContainerID(cnr) + tc.associate(&sysObj, oidtest.ID()) + require.NoError(t, mb.Put(&sysObj)) + + bs := mockBLOBStore{ + head: map[oid.Address]headValue{ + objectcore.AddressOf(&sysObj): {hdr: sysObj}, + }, + } + + s := newSimpleTestShard(t, &bs, mb, nil) + + hdr, err := s.HeadECPart(cnr, sysObj.GetID(), pi) + require.NoError(t, err) + require.Equal(t, sysObj, hdr) + }) + } + + s := newSimpleTestShard(t, &bs, &mb, nil) + + hdr, 
err := s.HeadECPart(cnr, parentID, pi) + require.NoError(t, err) + require.Equal(t, partHdr, hdr) +} + func testGetECPartRangeStream(t *testing.T, obj object.Object, parent oid.ID, pi iec.PartInfo, s *Shard) { full := int64(obj.PayloadSize()) for _, rng := range [][2]int64{ diff --git a/pkg/local_object_storage/shard/shard_internal_test.go b/pkg/local_object_storage/shard/shard_internal_test.go index 988ee4bfc8..603970626c 100644 --- a/pkg/local_object_storage/shard/shard_internal_test.go +++ b/pkg/local_object_storage/shard/shard_internal_test.go @@ -80,9 +80,15 @@ type getStreamValue struct { err error } +type headValue struct { + hdr object.Object + err error +} + type mockBLOBStore struct { unimplementedBLOBStore getStream map[oid.Address]getStreamValue + head map[oid.Address]headValue } func (x *mockBLOBStore) GetStream(addr oid.Address) (*object.Object, io.ReadCloser, error) { @@ -96,9 +102,18 @@ func (x *mockBLOBStore) GetStream(addr oid.Address) (*object.Object, io.ReadClos return val.obj.CutPayload(), io.NopCloser(bytes.NewReader(val.obj.Payload())), val.err } +func (x *mockBLOBStore) Head(addr oid.Address) (*object.Object, error) { + val, ok := x.head[addr] + if !ok { + return nil, errors.New("[test] unexpected object requested") + } + return &val.hdr, val.err +} + type mockWriteCache struct { unimplementedWriteCache getStream map[oid.Address]getStreamValue + head map[oid.Address]headValue } func (x *mockWriteCache) GetStream(addr oid.Address) (*object.Object, io.ReadCloser, error) { @@ -112,6 +127,14 @@ func (x *mockWriteCache) GetStream(addr oid.Address) (*object.Object, io.ReadClo return val.obj.CutPayload(), io.NopCloser(bytes.NewReader(val.obj.Payload())), val.err } +func (x *mockWriteCache) Head(addr oid.Address) (*object.Object, error) { + val, ok := x.head[addr] + if !ok { + return nil, errors.New("[test] unexpected object requested") + } + return &val.hdr, val.err +} + type unimplementedBLOBStore struct{} func (unimplementedBLOBStore) Open(bool) error { diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 86f4ac4b5d..dff7e83a34 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -22,6 +22,7 @@ type ( listObjectsDuration prometheus.Histogram getECPartDuration prometheus.Histogram getECPartRangeDuration prometheus.Histogram + headECPartDuration prometheus.Histogram containerSize prometheus.GaugeVec payloadSize prometheus.GaugeVec @@ -130,6 +131,13 @@ func newEngineMetrics() engineMetrics { Name: "get_ec_part__range_time", Help: "Engine 'get EC part range' operations handling time", }) + + headECPartDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: storageNodeNameSpace, + Subsystem: engineSubsystem, + Name: "head_ec_part_time", + Help: "Engine 'head EC part' operations handling time", + }) ) var ( @@ -170,6 +178,7 @@ func newEngineMetrics() engineMetrics { listObjectsDuration: listObjectsDuration, getECPartDuration: getECPartDuration, getECPartRangeDuration: getECPartRangeDuration, + headECPartDuration: headECPartDuration, containerSize: *containerSize, payloadSize: *payloadSize, capacitySize: *capacitySize, @@ -191,6 +200,7 @@ func (m engineMetrics) register() { prometheus.MustRegister(m.listObjectsDuration) prometheus.MustRegister(m.getECPartDuration) prometheus.MustRegister(m.getECPartRangeDuration) + prometheus.MustRegister(m.headECPartDuration) prometheus.MustRegister(m.containerSize) prometheus.MustRegister(m.payloadSize) prometheus.MustRegister(m.capacitySize) @@ -252,6 +262,10 @@ func (m engineMetrics) 
AddGetECPartRangeDuration(d time.Duration) { m.getECPartRangeDuration.Observe(d.Seconds()) } +func (m engineMetrics) AddHeadECPartDuration(d time.Duration) { + m.headECPartDuration.Observe(d.Seconds()) +} + func (m engineMetrics) AddToContainerSize(cnrID string, size int64) { m.containerSize.With( prometheus.Labels{ diff --git a/pkg/services/object/get/ec.go b/pkg/services/object/get/ec.go index 12fabdcd95..bac8a25418 100644 --- a/pkg/services/object/get/ec.go +++ b/pkg/services/object/get/ec.go @@ -50,6 +50,20 @@ func (s *Service) copyLocalECPart(dst ObjectWriter, cnr cid.ID, parent oid.ID, p return nil } +// similar to copyLocalECPart but returns only the header. +func (s *Service) copyLocalECPartHeader(dst internal.HeaderWriter, cnr cid.ID, parent oid.ID, pi iec.PartInfo) error { + hdr, err := s.localObjects.HeadECPart(cnr, parent, pi) + if err != nil { + return fmt.Errorf("get object header from local storage: %w", err) + } + + if err := dst.WriteHeader(&hdr); err != nil { + return fmt.Errorf("write header: %w", err) + } + + return nil +} + func (s *Service) copyECObjectHeader(ctx context.Context, dst internal.HeaderWriter, cnr cid.ID, parent oid.ID, sTok *session.Object, ecRules []iec.Rule, sortedNodeLists [][]netmap.NodeInfo) error { hdr, err := s.getECObjectHeader(ctx, cnr, parent, sTok, ecRules, sortedNodeLists) diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 0bf40b7a51..6e2697fe16 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -125,11 +125,26 @@ func (s *Service) GetRangeHash(ctx context.Context, prm RangeHashPrm) (*RangeHas // Returns ErrNotFound if the header was not received for the call. // Returns SplitInfoError if object is virtual and raw flag is set. func (s *Service) Head(ctx context.Context, prm HeadPrm) error { + pi, err := checkECPartInfoRequest(prm.common.XHeaders()) + if err != nil { + // TODO: track https://github.com/nspcc-dev/neofs-api/issues/269. + return fmt.Errorf("invalid request: %w", err) + } + nodeLists, repRules, ecRules, err := s.neoFSNet.GetNodesForObject(prm.addr) if err != nil { return fmt.Errorf("get nodes for object: %w", err) } + if pi.RuleIndex >= 0 { + if err := checkPartRequestAgainstPolicy(ecRules, pi); err != nil { + // TODO: track https://github.com/nspcc-dev/neofs-api/issues/269. + return fmt.Errorf("invalid request: %w", err) + } + // TODO: deny if node is not in the container? + return s.copyLocalECPartHeader(prm.objWriter, prm.addr.Container(), prm.addr.Object(), pi) + } + if prm.common.LocalOnly() { return s.copyLocalObjectHeader(prm.objWriter, prm.addr.Container(), prm.addr.Object(), prm.raw) } diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index da1f08b767..a1178ed684 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -80,6 +80,8 @@ type cfg struct { // Returns [apistatus.ErrObjectNotFound] if the range is out of payload bounds. GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64) (uint64, io.ReadCloser, error) Head(oid.Address, bool) (*object.Object, error) + // HeadECPart is similar to GetECPart but returns only the header. 
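+		// Like the engine's HeadECPart, it is expected to return
+		// apistatus.ErrObjectNotFound when no local shard stores the part.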
+ HeadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo) (object.Object, error) } localStorage interface { get(*execCtx) (*object.Object, io.ReadCloser, error) diff --git a/pkg/services/object/get/service_test.go b/pkg/services/object/get/service_test.go index f067515db6..715022b687 100644 --- a/pkg/services/object/get/service_test.go +++ b/pkg/services/object/get/service_test.go @@ -190,3 +190,7 @@ func (unimplementedLocalStorage) GetECPart(cid.ID, oid.ID, iec.PartInfo) (object func (unimplementedLocalStorage) Head(oid.Address, bool) (*object.Object, error) { panic("unimplemented") } + +func (unimplementedLocalStorage) HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error) { + panic("unimplemented") +} diff --git a/pkg/services/object/head/remote.go b/pkg/services/object/head/remote.go index 4191b4b19b..f42a8b5381 100644 --- a/pkg/services/object/head/remote.go +++ b/pkg/services/object/head/remote.go @@ -3,11 +3,15 @@ package headsvc import ( "context" "fmt" + "io" + "math" clientcore "github.com/nspcc-dev/neofs-node/pkg/core/client" netmapCore "github.com/nspcc-dev/neofs-node/pkg/core/netmap" "github.com/nspcc-dev/neofs-node/pkg/services/object/util" "github.com/nspcc-dev/neofs-sdk-go/client" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -31,6 +35,10 @@ type RemoteHeadPrm struct { commonHeadPrm *Prm node netmap.NodeInfo + + xs []string + + checkOID bool } // NewRemoteHeader creates, initializes and returns new RemoteHeader instance. @@ -59,6 +67,16 @@ func (p *RemoteHeadPrm) WithObjectAddress(v oid.Address) *RemoteHeadPrm { return p } +// WithXHeaders sets X-headers. +func (p *RemoteHeadPrm) WithXHeaders(xs []string) { + p.xs = xs +} + +// WithIDVerification specifies whether to check object ID match. +func (p *RemoteHeadPrm) WithIDVerification(f bool) { + p.checkOID = f +} + // Head requests object header from the remote node. Returns: // - [apistatus.ErrObjectNotFound] error if the requested object is missing // - [apistatus.ErrNodeUnderMaintenance] error if remote node is currently under maintenance @@ -82,6 +100,10 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob var opts client.PrmObjectHead opts.MarkLocal() + opts.WithXHeaders(prm.xs...) + if !prm.checkOID { + opts.SkipChecksumVerification() + } res, err := c.ObjectHead(ctx, prm.commonHeadPrm.addr.Container(), prm.commonHeadPrm.addr.Object(), user.NewAutoIDSigner(*key), opts) if err != nil { @@ -90,3 +112,68 @@ func (h *RemoteHeader) Head(ctx context.Context, prm *RemoteHeadPrm) (*object.Ob return res, nil } + +// GetRange requests object payload range from the remote node. +func (h *RemoteHeader) GetRange(ctx context.Context, node netmap.NodeInfo, cnr cid.ID, id oid.ID, ln, off uint64, xs []string) (io.ReadCloser, error) { + key, err := h.keyStorage.GetKey(nil) + if err != nil { + return nil, fmt.Errorf("get local SN private key: %w", err) + } + + var info clientcore.NodeInfo + + err = clientcore.NodeInfoFromRawNetmapElement(&info, netmapCore.Node(node)) + if err != nil { + return nil, fmt.Errorf("parse client node info: %w", err) + } + + conn, err := h.clientCache.Get(info) + if err != nil { + return nil, fmt.Errorf("get conn: %w", err) + } + + // TODO: Use GetRange after https://github.com/nspcc-dev/neofs-node/issues/3547. 
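+	// Until then, a full Object GET stream is opened and the requested range is cut
+	// out client-side: the first `off` bytes are discarded and the reader is capped
+	// at `ln` bytes with io.LimitReader (see below).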
+ var opts client.PrmObjectGet + opts.MarkLocal() + opts.SkipChecksumVerification() // TODO: see same place in GET service + opts.WithXHeaders(xs...) + + hdr, rc, err := conn.ObjectGetInit(ctx, cnr, id, user.NewAutoIDSigner(*key), opts) + if err != nil { + return nil, fmt.Errorf("call Get API: %w", err) + } + + if ln == 0 && off == 0 { + return rc, nil + } + + full := hdr.PayloadSize() + if off == 0 && ln == full { + return rc, nil + } + + if off >= full || full-off < ln { + rc.Close() + return nil, apistatus.ErrObjectOutOfRange + } + + if off > math.MaxInt64 || ln > math.MaxInt64 { + rc.Close() + return nil, fmt.Errorf("too big range for this server: off=%d,len=%d", off, ln) + } + + if off > 0 { + if _, err := io.CopyN(io.Discard, rc, int64(off)); err != nil { + rc.Close() + return nil, fmt.Errorf("seek offset in Get API stream: %w", err) + } + } + + return struct { + io.Reader + io.Closer + }{ + Reader: io.LimitReader(rc, int64(ln)), + Closer: rc, + }, nil +} diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 34412ec12b..eed8040508 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -120,7 +120,7 @@ func (p *Policer) processObject(ctx context.Context, addrWithAttrs objectcore.Ad if ecp.RuleIndex >= 0 { if len(ecRules) > 0 { - p.processECPart(ctx, addr, ecp, ecRules, nn[len(repRules):]) + p.processECPart(ctx, addr, selectNodesAddr.Object(), ecp, ecRules, nn[len(repRules):]) return } p.log.Info("object with EC attributes in container without EC rules detected, deleting", @@ -300,7 +300,7 @@ func (p *Policer) processNodes(ctx context.Context, plc *processPlacementContext callCtx, cancel := context.WithTimeout(ctx, headTimeout) - _, err := p.apiConns.headObject(callCtx, nodes[i], plc.object.Address) + _, err := p.apiConns.headObject(callCtx, nodes[i], plc.object.Address, true, nil) cancel() diff --git a/pkg/services/policer/ec.go b/pkg/services/policer/ec.go index 38411be3fd..e96bacb05d 100644 --- a/pkg/services/policer/ec.go +++ b/pkg/services/policer/ec.go @@ -3,17 +3,24 @@ package policer import ( "context" "errors" + "fmt" + "io" "slices" + "strconv" + "sync" "time" iec "github.com/nspcc-dev/neofs-node/internal/ec" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/netmap" + "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + "github.com/panjf2000/ants/v2" "go.uber.org/zap" ) -func (p *Policer) processECPart(ctx context.Context, addr oid.Address, pi iec.PartInfo, ecRules []iec.Rule, nodeLists [][]netmap.NodeInfo) { +func (p *Policer) processECPart(ctx context.Context, addr oid.Address, parent oid.ID, pi iec.PartInfo, ecRules []iec.Rule, nodeLists [][]netmap.NodeInfo) { if pi.RuleIndex >= len(ecRules) { p.log.Warn("local object with invalid EC rule index detected, deleting", zap.Stringer("object", addr), zap.Int("ruleIdx", pi.RuleIndex), zap.Int("totalRules", len(ecRules))) @@ -35,6 +42,8 @@ func (p *Policer) processECPart(ctx context.Context, addr oid.Address, pi iec.Pa return } + p.tryScheduleCheckECPartsTask(ctx, addr.Container(), parent, rule, addr.Object(), pi) + p.processECPartByRule(ctx, rule, addr, pi.Index, nodeLists[pi.RuleIndex]) } @@ -59,7 +68,7 @@ func (p *Policer) processECPartByRule(ctx context.Context, rule iec.Rule, addr o } callCtx, cancel := context.WithTimeout(ctx, headTimeout) - _, err := p.apiConns.headObject(callCtx, nodes[i], addr) + _, err := 
p.apiConns.headObject(callCtx, nodes[i], addr, true, nil) cancel() if err == nil { @@ -133,3 +142,410 @@ func (x *singleReplication) SubmitSuccessfulReplication(node netmap.NodeInfo) { x.done = true x.netAddresses = slices.Collect(node.NetworkEndpoints()) } + +func (p *Policer) tryScheduleCheckECPartsTask(ctx context.Context, cnr cid.ID, parent oid.ID, rule iec.Rule, localPartID oid.ID, localPartInfo iec.PartInfo) { + p.checkECPartsProgressMtx.Lock() + defer p.checkECPartsProgressMtx.Unlock() + + addr := oid.NewAddress(cnr, parent) + if _, ok := p.checkECPartsProgressMap[addr]; ok { + return + } + + err := p.checkECPartsWorkerPool.Submit(func() { + defer func() { + p.checkECPartsProgressMtx.Lock() + delete(p.checkECPartsProgressMap, addr) + p.checkECPartsProgressMtx.Unlock() + }() + + p.checkECParts(ctx, cnr, parent, rule, localPartInfo.RuleIndex, localPartInfo.Index, localPartID) + }) + if err != nil { + if errors.Is(err, ants.ErrPoolOverload) { + p.log.Info("pool of workers for EC parts checking is full, skip the task", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("local_part_id", localPartID)) + return + } + + p.log.Warn("unexpected error returned from pool of workers for EC part checking", zap.Error(err)) + return + } + + p.checkECPartsProgressMap[addr] = struct{}{} +} + +func (p *Policer) checkECParts(ctx context.Context, cnr cid.ID, parent oid.ID, rule iec.Rule, ruleIdx, localPartIdx int, localPartID oid.ID) { + sortedNodeLists, repRules, ecRules, err := p.network.GetNodesForObject(oid.NewAddress(cnr, parent)) + if err != nil { + p.log.Warn("failed to select nodes for EC parent to check its parts", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), + zap.Stringer("rule", rule), zap.Error(err)) + return + } + + if ruleIdx >= len(ecRules) { + p.log.Error("rule index overflows total number of EC rules in policy", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("rule_idx", ruleIdx), zap.Int("total_rules", len(ecRules))) + return + } + + totalParts := int(rule.DataPartNum + rule.ParityPartNum) + if localPartIdx >= totalParts { + p.log.Error("part index overflows total number of parts in the EC rule", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("rule_idx", ruleIdx), zap.Int("total_parts", totalParts)) + return + } + + var missingIdx, skipIdx []int + var parentHdr object.Object + var partLen uint64 + mPartID := make(map[int]oid.ID, totalParts) + ruleIdxAttr := strconv.Itoa(ruleIdx) + headTimeout := p.getHeadTimeout() + sortedNodes := sortedNodeLists[len(repRules)+ruleIdx] + +headNextPart: + for partIdx := range totalParts { + if partIdx == localPartIdx { + mPartID[partIdx] = localPartID + continue + } + + var partIdxAttr string + + for nodeIdx := range iec.NodeSequenceForPart(partIdx, totalParts, len(sortedNodes)) { + var hdr object.Object + local := p.network.IsLocalNodePublicKey(sortedNodes[nodeIdx].PublicKey()) + if local { + hdr, err = p.localStorage.HeadECPart(cnr, parent, iec.PartInfo{RuleIndex: ruleIdx, Index: partIdx}) + } else { + if partIdxAttr == "" { + partIdxAttr = strconv.Itoa(partIdx) + } + hdr, err = p.headECPart(ctx, headTimeout, sortedNodes[nodeIdx], cnr, parent, ruleIdxAttr, partIdxAttr) + } + if err == nil { + if parentHdr.GetID().IsZero() { + ph := hdr.Parent() + if ph == nil { + p.log.Error("missing parent header in received EC part object", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), 
zap.Stringer("rule", rule), + zap.Int("ruleIdx", ruleIdx), zap.Int("partIdx", partIdx), zap.Bool("local", local), + zap.String("node", netmap.StringifyPublicKey(sortedNodes[nodeIdx]))) + + return + } + + parentHdr = *ph + partLen = (parentHdr.PayloadSize() + uint64(rule.DataPartNum) - 1) / uint64(rule.DataPartNum) + } + + if got := hdr.PayloadSize(); got != partLen { + p.log.Error("unexpected payload len of EC part object received", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("ruleIdx", ruleIdx), zap.Int("partIdx", partIdx), zap.Bool("local", local), + zap.String("node", netmap.StringifyPublicKey(sortedNodes[nodeIdx])), zap.Uint64("expected", partLen), + zap.Uint64("got", got)) + return + } + + mPartID[partIdx] = hdr.GetID() + continue headNextPart + } + + switch { + case errors.Is(err, apistatus.ErrObjectAlreadyRemoved): + return + case errors.Is(err, apistatus.ErrNodeUnderMaintenance): + // Server may store the part. We consider it unavailable, but we don't attempt to recreate it. + // Once SN finishes maintenance, the part will likely become available. + if len(missingIdx)+len(skipIdx) >= int(rule.ParityPartNum) { + p.log.Warn("too many EC parts unavailable, recreation is impossible", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("unavailable", len(missingIdx)+len(skipIdx))) + return + } + + skipIdx = append(skipIdx, partIdx) + continue headNextPart + case errors.Is(err, apistatus.ErrObjectNotFound): + default: + p.log.Info("failed to get EC part header", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("ruleIdx", ruleIdx), zap.Int("partIdx", partIdx), zap.Bool("local", local), zap.Error(err)) + } + } + + if len(missingIdx)+len(skipIdx) >= int(rule.ParityPartNum) { + p.log.Warn("too many EC parts unavailable, recreation is impossible", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("unavailable", len(missingIdx)+len(skipIdx))) + return + } + + missingIdx = append(missingIdx, partIdx) + } + + if len(missingIdx) == 0 { + return + } + + parts := make([][]byte, totalParts) + required := make([]bool, totalParts) + +getNextPart: + for partIdx := range totalParts { + if slices.Contains(skipIdx, partIdx) { + continue + } + if slices.Contains(missingIdx, partIdx) { + required[partIdx] = true + continue + } + + partID, ok := mPartID[partIdx] + if !ok { + panic(fmt.Sprintf("missing ID of part#%d after successful HEAD", partIdx)) + } + + if partIdx == localPartIdx { + b, err := p.localStorage.GetRange(oid.NewAddress(cnr, partID), 0, 0) + if err == nil { + parts[partIdx] = b + continue + } + + p.log.Info("failed to RANGE EC part from local storage", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("ruleIdx", ruleIdx), zap.Int("partIdx", partIdx), zap.Stringer("partID", partID), zap.Error(err)) + } + + var partIdxAttr string + var off, ln uint64 + for nodeIdx := range iec.NodeSequenceForPart(partIdx, totalParts, len(sortedNodes)) { + if p.network.IsLocalNodePublicKey(sortedNodes[nodeIdx].PublicKey()) { + if partIdx == localPartIdx { // done above + continue + } + + b, err := p.localStorage.GetRange(oid.NewAddress(cnr, partID), off, ln) + if err != nil { + p.log.Info("failed to RANGE EC part from local storage", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + 
zap.Int("ruleIdx", ruleIdx), zap.Int("partIdx", partIdx), zap.Stringer("partID", partID), zap.Error(err)) + continue + } + + copy(parts[partIdx][off:], b) + continue getNextPart + } + + if partIdxAttr == "" { + partIdxAttr = strconv.Itoa(partIdx) + } + + // TODO: this is the 1st place where we known IDs of EC parts in advance. + // Consider supporting direct requests of EC parts by ID, they are more lightweight. + rc, err := p.apiConns.GetRange(ctx, sortedNodes[nodeIdx], cnr, parent, off, ln, []string{ + iec.AttributeRuleIdx, ruleIdxAttr, + iec.AttributePartIdx, partIdxAttr, + }) + if err != nil { + p.log.Info("failed to open RANGE stream for EC part from remote node", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("ruleIdx", ruleIdx), zap.Int("partIdx", partIdx), zap.Stringer("partID", partID), zap.Error(err)) + continue + } + + defer rc.Close() + + if parts[partIdx] == nil { + parts[partIdx] = make([]byte, partLen) + } + + n, err := io.ReadFull(rc, parts[partIdx][off:]) + if err == nil { + continue getNextPart + } + if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) { + return + } + + if errors.Is(err, io.EOF) { + err = io.ErrUnexpectedEOF + } + + p.log.Info("failed to RANGE EC part from remote node", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("ruleIdx", ruleIdx), zap.Int("partIdx", partIdx), + zap.Stringer("partID", partID), zap.Uint64("off", off), zap.Uint64("len", ln), zap.Error(err)) + + if n > 0 { + off += uint64(n) + ln = partLen - off + } + } + + if len(missingIdx)+len(skipIdx) >= int(rule.ParityPartNum) { + p.log.Warn("too many EC parts unavailable, recreation is impossible", + zap.Stringer("container", cnr), zap.Stringer("parent", parent), zap.Stringer("rule", rule), + zap.Int("unavailable", len(missingIdx)+len(skipIdx))) + return + } + + parts[partIdx] = nil + + // Exists and may be OK later, so do not recreate. + skipIdx = append(skipIdx, partIdx) + } + + p.recreateECParts(ctx, parentHdr, rule, ruleIdx, parts, missingIdx) +} + +// returns part ID and parent header. 
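+// The header is requested via HEAD with EC rule/part index X-headers and without
+// ID verification (presumably because the returned part object carries its own ID);
+// the response is checked to reference the expected parent and to carry matching
+// EC attributes before being returned.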
+func (p *Policer) headECPart(ctx context.Context, timeout time.Duration, node netmap.NodeInfo, cnr cid.ID, parent oid.ID, + ruleIdx, partIdx string) (object.Object, error) { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + hdr, err := p.apiConns.headObject(ctx, node, oid.NewAddress(cnr, parent), false, []string{ + iec.AttributeRuleIdx, ruleIdx, + iec.AttributePartIdx, partIdx, + }) + if err != nil { + return object.Object{}, err + } + + if got := hdr.GetParentID(); got != parent { + return object.Object{}, fmt.Errorf("wrong parent ID in received object %s", got) + } + + if err = checkECAttributesInReceivedObject(hdr, ruleIdx, partIdx); err != nil { + return object.Object{}, err + } + + return hdr, nil +} + +func (p *Policer) recreateECParts(ctx context.Context, parent object.Object, rule iec.Rule, ruleIdx int, parts [][]byte, missingIdx []int) { + sortedNodeLists, _, ecRules, err := p.network.GetNodesForObject(oid.NewAddress(parent.GetContainerID(), parent.GetID())) + if err != nil { + p.log.Error("failed to select nodes for EC parent to recreate its parts", + zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()), + zap.Stringer("rule", rule), zap.Error(err)) + return + } + + if ruleIdx >= len(ecRules) { + p.log.Error("rule index overflows total number of EC rules in policy", + zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()), + zap.Stringer("rule", rule), zap.Int("rule_idx", ruleIdx), zap.Int("total_rules", len(ecRules))) + return + } + + p.recreateECPartsIdx(ctx, parent, rule, ruleIdx, sortedNodeLists[ruleIdx], parts, missingIdx) +} + +func (p *Policer) recreateECPartsIdx(ctx context.Context, parent object.Object, rule iec.Rule, ruleIdx int, sortedNodes []netmap.NodeInfo, + parts [][]byte, missingIdx []int) { + if err := iec.DecodeIndexes(rule, parts, missingIdx); err != nil { // should never happen if we count parts correctly + p.log.Error("missing EC parts cannot be re-calculated", + zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()), + zap.Stringer("rule", rule), zap.Error(err)) + return + } + + var wg sync.WaitGroup + wg.Add(len(missingIdx)) + + for _, partIdx := range missingIdx { + go func(partIdx int) { + defer wg.Done() + + p.recreateECPart(ctx, parent, rule, ruleIdx, partIdx, parts[partIdx], sortedNodes) + }(partIdx) + } + + wg.Wait() +} + +func (p *Policer) recreateECPart(ctx context.Context, parent object.Object, rule iec.Rule, ruleIdx, partIdx int, part []byte, sortedNodes []netmap.NodeInfo) { + partObj, err := iec.FormObjectForECPart(p.signer, parent, part, iec.PartInfo{ + RuleIndex: ruleIdx, + Index: partIdx, + }) + if err != nil { + p.log.Info("failed to form object with restored EC part", + zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()), + zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Error(err)) + return + } + + for i := range iec.NodeSequenceForPart(partIdx, int(rule.DataPartNum)+int(rule.ParityPartNum), len(sortedNodes)) { + if p.network.IsLocalNodePublicKey(sortedNodes[i].PublicKey()) { + err = p.localStorage.Put(&partObj, nil) + if err == nil { + p.log.Info("EC part successfully recreated and put to local storage", + zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()), + zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Stringer("part_id", partObj.GetID())) + return + } + + p.log.Info("failed to put recreated EC part 
to local storage", + zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()), + zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Stringer("part_id", partObj.GetID()), + zap.Error(err)) + continue + } + + err = p.replicator.PutObjectToNode(ctx, partObj, sortedNodes[i]) + if err == nil { + p.log.Info("EC part successfully recreated and put to remote node", + zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()), + zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Stringer("part_id", partObj.GetID()), + zap.String("node_pub", netmap.StringifyPublicKey(sortedNodes[i]))) + return + } + + if errors.Is(err, ctx.Err()) { + return + } + + p.log.Info("failed to put recreated EC part to remote node", + zap.Stringer("container", parent.GetContainerID()), zap.Stringer("parent", parent.GetID()), + zap.Stringer("rule", rule), zap.Int("part_idx", partIdx), zap.Stringer("part_id", partObj.GetID()), + zap.String("node_pub", netmap.StringifyPublicKey(sortedNodes[i])), zap.Error(err)) + } +} + +func checkECAttributesInReceivedObject(hdr object.Object, ruleIdx, partIdx string) error { + // copy-paste from GET service + var found uint8 + const expected = 2 + + attrs := hdr.Attributes() + for i := range attrs { + switch attrs[i].Key() { + default: + continue + case iec.AttributeRuleIdx: + if attrs[i].Value() != ruleIdx { + return fmt.Errorf("wrong EC rule index attribute in received object for part: requested %q, got %q", ruleIdx, attrs[i].Value()) + } + case iec.AttributePartIdx: + if attrs[i].Value() != partIdx { + return fmt.Errorf("wrong EC part index attribute in received object for part: requested %q, got %q", partIdx, attrs[i].Value()) + } + } + + found++ + if found == expected { + return nil + } + } + + return fmt.Errorf("not all EC attributes received: requested %d, got %d", expected, found) +} diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index 804aac77ae..fd3879558d 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -2,6 +2,8 @@ package policer import ( "context" + "fmt" + "io" "sync" "time" @@ -10,6 +12,8 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/engine" headsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/head" "github.com/nspcc-dev/neofs-node/pkg/services/replicator" + cid "github.com/nspcc-dev/neofs-sdk-go/container/id" + neofscrypto "github.com/nspcc-dev/neofs-sdk-go/crypto" netmapsdk "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -26,17 +30,22 @@ type nodeLoader interface { // interface of [replicator.Replicator] used by [Policer] for overriding in tests. type replicatorIface interface { HandleTask(context.Context, replicator.Task, replicator.TaskResult) + PutObjectToNode(context.Context, object.Object, netmapsdk.NodeInfo) error } // interface of [engine.StorageEngine] used by [Policer] for overriding in tests. type localStorage interface { ListWithCursor(uint32, *engine.Cursor, ...string) ([]objectcore.AddressWithAttributes, *engine.Cursor, error) Delete(oid.Address) error + Put(*object.Object, []byte) error + HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error) + GetRange(oid.Address, uint64, uint64) ([]byte, error) } // interface of [headsvc.RemoteHeader] used by [Policer] for overriding in tests. 
type apiConnections interface { - headObject(context.Context, netmapsdk.NodeInfo, oid.Address) (object.Object, error) + headObject(context.Context, netmapsdk.NodeInfo, oid.Address, bool, []string) (object.Object, error) + GetRange(ctx context.Context, node netmapsdk.NodeInfo, cnr cid.ID, id oid.ID, ln, off uint64, xs []string) (io.ReadCloser, error) } type objectsInWork struct { @@ -70,6 +79,12 @@ type Policer struct { *cfg objsInWork *objectsInWork + + signer neofscrypto.Signer + + checkECPartsProgressMtx sync.Mutex + checkECPartsProgressMap map[oid.Address]struct{} + checkECPartsWorkerPool *ants.Pool } // Option is an option for Policer constructor. @@ -144,7 +159,12 @@ func defaultCfg() *cfg { } // New creates, initializes and returns Policer instance. -func New(opts ...Option) *Policer { +func New(signer neofscrypto.Signer, opts ...Option) *Policer { + checkECPartsWorkerPool, err := ants.NewPool(100, ants.WithNonblocking(true)) + if err != nil { + panic(fmt.Errorf("ants.NewPool: %w", err)) + } + c := defaultCfg() for i := range opts { @@ -158,6 +178,9 @@ func New(opts ...Option) *Policer { objsInWork: &objectsInWork{ objs: make(map[oid.Address]struct{}, c.maxCapacity), }, + signer: signer, + checkECPartsProgressMap: make(map[oid.Address]struct{}, checkECPartsWorkerPool.Cap()), + checkECPartsWorkerPool: checkECPartsWorkerPool, } } @@ -189,13 +212,17 @@ func WithLocalStorage(v *engine.StorageEngine) Option { } } -type remoteHeader headsvc.RemoteHeader +type remoteHeader struct { + *headsvc.RemoteHeader +} -func (x *remoteHeader) headObject(ctx context.Context, node netmapsdk.NodeInfo, addr oid.Address) (object.Object, error) { +func (x *remoteHeader) headObject(ctx context.Context, node netmapsdk.NodeInfo, addr oid.Address, checkOID bool, xs []string) (object.Object, error) { var p headsvc.RemoteHeadPrm p.WithNodeInfo(node) p.WithObjectAddress(addr) - hdr, err := (*headsvc.RemoteHeader)(x).Head(ctx, &p) + p.WithXHeaders(xs) + p.WithIDVerification(checkOID) + hdr, err := x.Head(ctx, &p) if err != nil { return object.Object{}, err } @@ -206,7 +233,7 @@ func (x *remoteHeader) headObject(ctx context.Context, node netmapsdk.NodeInfo, // WithRemoteHeader returns option to set object header receiver of Policer. 
func WithRemoteHeader(v *headsvc.RemoteHeader) Option { return func(c *cfg) { - c.apiConns = (*remoteHeader)(v) + c.apiConns = &remoteHeader{v} } } diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index f192daa8af..2cc11ffcb9 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "io" "maps" "slices" "strconv" @@ -22,6 +23,7 @@ import ( apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" + neofscryptotest "github.com/nspcc-dev/neofs-sdk-go/crypto/test" "github.com/nspcc-dev/neofs-sdk-go/netmap" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -506,7 +508,7 @@ func testRepCheck(t *testing.T, rep uint, localObj objectcore.AddressWithAttribu } l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) - p := New( + p := New(neofscryptotest.Signer(), WithPool(wp), WithReplicationCooldown(time.Hour), // any huge time to cancel process repeat WithNodeLoader(nopNodeLoader{}), @@ -941,7 +943,7 @@ func testECCheckWithNetworkAndShortage(t *testing.T, mockNet *mockNetwork, local } l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) - p := New( + p := New(neofscryptotest.Signer(), WithPool(wp), WithReplicationCooldown(time.Hour), // any huge time to cancel process repeat WithNodeLoader(nopNodeLoader{}), @@ -1013,6 +1015,10 @@ func (x *testReplicator) HandleTask(ctx context.Context, task replicator.Task, r close(x.gotTaskCh) } +func (x *testReplicator) PutObjectToNode(context.Context, object.Object, netmap.NodeInfo) error { + panic("unimplemented") +} + type testLocalNode struct { objList []objectcore.AddressWithAttributes @@ -1066,6 +1072,18 @@ func (x *testLocalNode) Delete(addr oid.Address) error { return nil } +func (x *testLocalNode) Put(*object.Object, []byte) error { + panic("unimplemented") +} + +func (x *testLocalNode) HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error) { + panic("unimplemented") +} + +func (x *testLocalNode) GetRange(oid.Address, uint64, uint64) ([]byte, error) { + panic("unimplemented") +} + type getNodesKey struct { cnr cid.ID obj oid.ID @@ -1133,7 +1151,7 @@ func (x *mockAPIConnections) setHeadResult(node netmap.NodeInfo, addr oid.Addres x.head[newConnKey(node, addr)] = err } -func (x *mockAPIConnections) headObject(_ context.Context, node netmap.NodeInfo, addr oid.Address) (object.Object, error) { +func (x *mockAPIConnections) headObject(_ context.Context, node netmap.NodeInfo, addr oid.Address, _ bool, _ []string) (object.Object, error) { v, ok := x.head[newConnKey(node, addr)] if !ok { return object.Object{}, errors.New("[test] unexpected conn/object accessed") @@ -1141,6 +1159,10 @@ func (x *mockAPIConnections) headObject(_ context.Context, node netmap.NodeInfo, return object.Object{}, v } +func (x *mockAPIConnections) GetRange(context.Context, netmap.NodeInfo, cid.ID, oid.ID, uint64, uint64, []string) (io.ReadCloser, error) { + panic("unimplemented") +} + func newMockAPIConnections() *mockAPIConnections { return &mockAPIConnections{ head: make(map[connObjectKey]error), diff --git a/pkg/services/replicator/process.go b/pkg/services/replicator/process.go index 22a86953de..9b875d1540 100644 --- a/pkg/services/replicator/process.go +++ b/pkg/services/replicator/process.go @@ -96,3 +96,16 @@ func (p *Replicator) HandleTask(ctx context.Context, task 
Task, res TaskResult)
 		}
 	}
 }
+
+// PutObjectToNode attempts to put the given object to the specified node
+// within the configured timeout.
+func (p *Replicator) PutObjectToNode(ctx context.Context, obj object.Object, node netmap.NodeInfo) error {
+	ctx, cancel := context.WithTimeout(ctx, p.putTimeout)
+	defer cancel()
+
+	var pp putsvc.RemotePutPrm
+	pp.WithObject(&obj)
+	pp.WithNodeInfo(node)
+
+	return p.remoteSender.PutObject(ctx, &pp)
+}
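
Note (not part of the patch): below is a minimal, self-contained sketch of the fan-out/fallback placement pattern used by recreateECPartsIdx and recreateECPart above. All names here (restoreParts, placePart, putLocal, putRemote, node) are hypothetical stand-ins for the real Policer, localStorage and replicator dependencies; the sketch only illustrates the shape of the logic, not the actual implementation.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// node is a hypothetical stand-in for netmap.NodeInfo.
type node string

// restoreParts launches one goroutine per missing part and waits for all of
// them, mirroring the sync.WaitGroup fan-out in recreateECPartsIdx.
func restoreParts(ctx context.Context, parts [][]byte, missing []int, nodes []node) {
	var wg sync.WaitGroup
	wg.Add(len(missing))

	for _, idx := range missing {
		go func(idx int) {
			defer wg.Done()
			placePart(ctx, idx, parts[idx], nodes)
		}(idx)
	}

	wg.Wait()
}

// placePart tries the local node first and then the remaining candidates in
// order, giving up early once the context is done, similar to recreateECPart.
func placePart(ctx context.Context, idx int, part []byte, nodes []node) {
	for _, n := range nodes {
		var err error
		if n == "local" {
			err = putLocal(idx, part)
		} else {
			err = putRemote(ctx, n, idx, part)
		}
		if err == nil {
			fmt.Printf("part %d stored on %s\n", idx, n)
			return
		}
		if errors.Is(err, ctx.Err()) {
			return // context cancelled or timed out, stop trying further nodes
		}
		fmt.Printf("part %d failed on %s: %v, trying next node\n", idx, n, err)
	}
}

// Hypothetical storage hooks; the real code calls localStorage.Put and
// replicator.PutObjectToNode instead.
func putLocal(int, []byte) error { return errors.New("local storage unavailable") }

func putRemote(context.Context, node, int, []byte) error { return nil }

func main() {
	parts := [][]byte{[]byte("p0"), nil, []byte("p2"), nil} // parts 1 and 3 are missing
	restoreParts(context.Background(), parts, []int{1, 3}, []node{"local", "remote-1", "remote-2"})
}

Running the sketch stores each missing part on the first candidate that accepts it and stops retrying once the context is done, which corresponds to the early return on ctx.Err() in recreateECPart.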