Skip to content
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (t *collectClusterCapacityTask) updateCellCapacity(
return err
}

createdAt := time.Now()

var capacities []storage.ClusterCapacity
for _, info := range capacityInfos {
capacities = append(capacities, storage.ClusterCapacity{
Expand All @@ -151,6 +153,7 @@ func (t *collectClusterCapacityTask) updateCellCapacity(
Kind: info.DiskKind,
FreeBytes: info.FreeBytes,
TotalBytes: info.TotalBytes,
CreatedAt: createdAt,
})
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,29 @@ import (

////////////////////////////////////////////////////////////////////////////////

func matchClusterCapacities(t *testing.T, want []cells_storage.ClusterCapacity) interface{} {
return mock.MatchedBy(func(actual []cells_storage.ClusterCapacity) bool {
if len(want) != len(actual) {
return false
}

for i := range want {
expected := want[i]
got := actual[i]

if expected.FreeBytes != got.FreeBytes ||
expected.TotalBytes != got.TotalBytes ||
expected.CellID != got.CellID || expected.Kind != got.Kind ||
expected.ZoneID != got.ZoneID {
return false
}
}
return true
})
}

////////////////////////////////////////////////////////////////////////////////

func TestCollectClusterCapacityTask(t *testing.T) {
ctx := newContext()
execCtx := tasks_mocks.NewExecutionContextMock()
Expand Down Expand Up @@ -56,41 +79,50 @@ func TestCollectClusterCapacityTask(t *testing.T) {

storage.On("UpdateClusterCapacities",
mock.Anything, // ctx.
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a-cell1",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
matchClusterCapacities(
t,
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a-cell1",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
},
},
},
),
deleteOlderThanExpectation,
).Return(nil).Once()
storage.On("UpdateClusterCapacities",
mock.Anything, // ctx.
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
matchClusterCapacities(
t,
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
},
},
},
),
deleteOlderThanExpectation,
).Return(nil).Once()
storage.On("UpdateClusterCapacities",
mock.Anything, // ctx.
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-b",
CellID: "zone-b",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
matchClusterCapacities(
t,
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-b",
CellID: "zone-b",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
},
},
},
),
deleteOlderThanExpectation,
).Return(nil).Once()

Expand Down Expand Up @@ -163,15 +195,18 @@ func TestCollectClusterCapacityFailureNbsReturnsError(t *testing.T) {
// Only the successful cell should be updated
storage.On("UpdateClusterCapacities",
mock.Anything, // ctx.
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
matchClusterCapacities(
t,
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
},
},
},
),
mock.Anything, // deleteOlderThan.
).Return(nil).Once()

Expand Down Expand Up @@ -232,28 +267,34 @@ func TestCollectClusterCapacityFailureStorageReturnsError(t *testing.T) {

storage.On("UpdateClusterCapacities",
mock.Anything, // ctx.
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
matchClusterCapacities(
t,
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
},
},
},
),
mock.Anything, // deleteOlderThan.
).Return(nil).Once()
storage.On("UpdateClusterCapacities",
mock.Anything,
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a-cell1",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
matchClusterCapacities(
t,
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a-cell1",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
},
},
},
),
mock.Anything, // deleteOlderThan.
).Return(assert.AnError).Once()

Expand Down Expand Up @@ -313,15 +354,18 @@ func TestCollectClusterCapacityOneCellHasAlreadyBeenProcessed(t *testing.T) {

storage.On("UpdateClusterCapacities",
mock.Anything, // ctx.
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
matchClusterCapacities(
t,
[]cells_storage.ClusterCapacity{
{
ZoneID: "zone-a",
CellID: "zone-a",
Kind: types.DiskKind_DISK_KIND_SSD,
FreeBytes: 1024,
TotalBytes: 2048,
},
},
},
),
mock.Anything, // deleteOlderThan.
).Return(nil).Once()

Expand Down
58 changes: 19 additions & 39 deletions cloud/disk_manager/internal/pkg/cells/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,42 +19,17 @@ type ClusterCapacity struct {
Kind types.DiskKind
TotalBytes uint64
FreeBytes uint64
}

////////////////////////////////////////////////////////////////////////////////

type clusterCapacityState struct {
ZoneID string
CellID string
Kind string
TotalBytes uint64
FreeBytes uint64
CreatedAt time.Time
}

////////////////////////////////////////////////////////////////////////////////

func (c *clusterCapacityState) toClusterCapacity() (ClusterCapacity, error) {
kind, err := common.DiskKindFromString(c.Kind)
if err != nil {
return ClusterCapacity{}, err
}

return ClusterCapacity{
ZoneID: c.ZoneID,
CellID: c.CellID,
Kind: kind,
TotalBytes: c.TotalBytes,
FreeBytes: c.FreeBytes,
}, nil
}

// Returns ydb entity of the node object.
func (c *clusterCapacityState) structValue() persistence.Value {
func (c *ClusterCapacity) structValue() persistence.Value {
return persistence.StructValue(
persistence.StructFieldValue("zone_id", persistence.UTF8Value(c.ZoneID)),
persistence.StructFieldValue("cell_id", persistence.UTF8Value(c.CellID)),
persistence.StructFieldValue("kind", persistence.UTF8Value(c.Kind)),
persistence.StructFieldValue("kind", persistence.UTF8Value(common.DiskKindToString(c.Kind))),
persistence.StructFieldValue("total_bytes", persistence.Uint64Value(c.TotalBytes)),
persistence.StructFieldValue("free_bytes", persistence.Uint64Value(c.FreeBytes)),
persistence.StructFieldValue("created_at", persistence.TimestampValue(c.CreatedAt)),
Expand All @@ -63,7 +38,7 @@ func (c *clusterCapacityState) structValue() persistence.Value {

////////////////////////////////////////////////////////////////////////////////

func clusterCapacityStateStructTypeString() string {
func clusterCapacityStructTypeString() string {
return `Struct<
zone_id: Utf8,
cell_id: Utf8,
Expand All @@ -73,7 +48,7 @@ func clusterCapacityStateStructTypeString() string {
created_at: Timestamp>`
}

func clusterCapacityStateTableDescription() persistence.CreateTableDescription {
func clusterCapacityTableDescription() persistence.CreateTableDescription {
return persistence.NewCreateTableDescription(
persistence.WithColumn("zone_id", persistence.Optional(persistence.TypeUTF8)),
persistence.WithColumn("cell_id", persistence.Optional(persistence.TypeUTF8)),
Expand All @@ -89,17 +64,27 @@ func clusterCapacityStateTableDescription() persistence.CreateTableDescription {

func scanClusterCapacity(
res persistence.Result,
) (capacity clusterCapacityState, err error) {
) (capacity ClusterCapacity, err error) {

var kind string
err = res.ScanNamed(
persistence.OptionalWithDefault("zone_id", &capacity.ZoneID),
persistence.OptionalWithDefault("cell_id", &capacity.CellID),
persistence.OptionalWithDefault("kind", &capacity.Kind),
persistence.OptionalWithDefault("kind", &kind),
persistence.OptionalWithDefault("total_bytes", &capacity.TotalBytes),
persistence.OptionalWithDefault("free_bytes", &capacity.FreeBytes),
persistence.OptionalWithDefault("created_at", &capacity.CreatedAt),
)
if err != nil {
return ClusterCapacity{}, err
}

capacity.Kind, err = common.DiskKindFromString(kind)
if err != nil {
return ClusterCapacity{}, err
}

return capacity, err
return capacity, nil
}

func scanClusterCapacities(
Expand All @@ -111,12 +96,7 @@ func scanClusterCapacities(

for res.NextResultSet(ctx) {
for res.NextRow() {
capacityState, err := scanClusterCapacity(res)
if err != nil {
return nil, err
}

capacity, err := capacityState.toClusterCapacity()
capacity, err := scanClusterCapacity(res)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -163,7 +143,7 @@ func CreateYDBTables(
ctx,
config.GetStorageFolder(),
"cluster_capacity",
clusterCapacityStateTableDescription(),
clusterCapacityTableDescription(),
dropUnusedColumns,
)
if err != nil {
Expand Down
Loading
Loading