Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 94 additions & 16 deletions apis/go/mlops/scheduler/storage.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions apis/mlops/scheduler/storage.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,9 @@ message ExperimentSnapshot {
// on restart, which would guard against lost events in communication.
bool deleted = 2;
}


// ModelSnapshot is the stored record for a single model: the status of each
// of its versions plus a deletion marker.
message ModelSnapshot{
// Status entries for the model's versions. The Go helpers that consume this
// message read the last element as the latest version.
repeated ModelVersionStatus versions = 1;
// Set when the model has been marked as deleted.
bool deleted = 2;
}
86 changes: 86 additions & 0 deletions scheduler/pkg/store/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
Copyright (c) 2024 Seldon Technologies Ltd.

Use of this software is governed BY
(1) the license included in the LICENSE file or
(2) if the license included in the LICENSE file is the Business Source License 1.1,
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
*/

package store

import (
"context"
"fmt"
"sync"

pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator"
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/utils"
log "github.com/sirupsen/logrus"
)

// ModelServerStore is the storage backend the manager uses to read and write
// model snapshots addressed by a string key (UpdateModel uses the model name).
type ModelServerStore interface {
// GetModel fetches the snapshot stored under key. UpdateModel treats a nil
// snapshot returned together with a nil error as "no model stored yet".
GetModel(ctx context.Context, key string) (*pb.ModelSnapshot, error)
// PutModel stores value under key.
// NOTE(review): presumably overwrites any existing entry - confirm against
// the concrete implementation.
PutModel(ctx context.Context, key string, value *pb.ModelSnapshot) error
}

// ServerStore is the server-side counterpart of ModelServerStore.
// NOTE(review): GetServer currently takes no key and returns nothing, so this
// looks like a placeholder signature - confirm the intended shape.
type ServerStore interface {
GetServer(ctx context.Context)
}

// manager implements ModelServerManager on top of a ModelServerStore.
type manager struct {
// mu serialises model updates; UpdateModel holds the write lock for its
// whole duration.
mu sync.RWMutex
// storage is where model snapshots are read from and written to.
storage ModelServerStore
// logger and eventHub are not used by the code visible in this file yet.
logger log.FieldLogger
eventHub *coordinator.EventHub
}

// ModelServerManager is the interface exposed for applying model changes to
// the store.
type ModelServerManager interface {
// UpdateModel applies a load-model request, returning an error when the
// model name is invalid or the underlying storage call fails.
UpdateModel(ctx context.Context, req *pb.LoadModelRequest) error
}

// UpdateModel creates or updates the stored snapshot for the model carried by
// req, using the model name as the storage key.
//
// Behaviour by stored state:
//   - no snapshot stored: a fresh snapshot is written;
//   - snapshot marked deleted but still active: the request is rejected,
//     because the previous deletion has not finished yet;
//   - snapshot marked deleted and inactive: the next model version is appended,
//     the deleted marker is cleared, and the result is written;
//   - otherwise: a fresh snapshot is written (equality check still to do).
//
// It returns an error when the model name is invalid, when the model is still
// being deleted, or when a storage call fails. Storage errors are wrapped with
// %w so callers can inspect them with errors.Is / errors.As.
func (m *manager) UpdateModel(ctx context.Context, req *pb.LoadModelRequest) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	model := req.GetModel()
	modelName := model.GetMeta().GetName()
	if !utils.CheckName(modelName) {
		return fmt.Errorf(
			"Model %s does not have a valid name - it must be alphanumeric and not contains dots (.)",
			modelName,
		)
	}

	modelSnap, err := m.storage.GetModel(ctx, modelName)
	if err != nil {
		return fmt.Errorf("could not update model %s: %w", modelName, err)
	}

	switch {
	case modelSnap == nil:
		// First time this model is seen.
		modelSnap = NewModelSnapshot(model)

	case modelSnap.GetDeleted():
		// A deleted model may only be re-created once it is inactive; while it
		// is still active the deletion is in flight. The previous code had this
		// condition inverted relative to the error message.
		if !ModelInactive(modelSnap) {
			return fmt.Errorf("model %s is in process of deletion - new model can not be created", modelName)
		}
		// Persist the snapshot returned here; previously it was computed and
		// then discarded in favour of a brand new snapshot.
		modelSnap = CreateNextModelVersion(modelSnap, model)
		// The model is being re-created, so it must no longer be flagged deleted.
		modelSnap.Deleted = false

	default:
		// todo: do Model EqualityCheck
		modelSnap = NewModelSnapshot(model)
	}

	if err := m.storage.PutModel(ctx, modelName, modelSnap); err != nil {
		return fmt.Errorf("could not create a new model %s: %w", modelName, err)
	}
	return nil
}
74 changes: 73 additions & 1 deletion scheduler/pkg/store/mesh.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type ModelStatus struct {
ModelGwReason string
AvailableReplicas uint32
UnavailableReplicas uint32
DrainingReplicas uint32
DrainingReplicas uint32 //this field is never used and was never added to proto
Timestamp time.Time
}

Expand All @@ -90,6 +90,66 @@ func NewDefaultModelVersion(model *pb.Model, version uint32) *ModelVersion {
}
}

// NewModelSnapshot returns a snapshot holding exactly one version entry for
// model, with the deleted marker unset.
//
// The entry's version number is the model's Kubernetes generation converted to
// uint32, but never lower than 1. The entry starts with an empty (non-nil)
// replica-state map, model state ModelStateUnknown and model-gateway state
// ModelCreate; every other field is left at its zero value.
func NewModelSnapshot(model *pb.Model) *pb.ModelSnapshot {
	initialVersion := max(
		uint32(1),
		uint32(model.GetMeta().GetKubernetesMeta().GetGeneration()),
	)

	first := &pb.ModelVersionStatus{
		Version:           initialVersion,
		ModelReplicaState: map[int32]*pb.ModelReplicaStatus{},
		State: &pb.ModelStatus{
			State:        pb.ModelStatus_ModelStateUnknown,
			ModelGwState: pb.ModelStatus_ModelCreate,
		},
		ModelDefn: model,
	}

	return &pb.ModelSnapshot{
		Versions: []*pb.ModelVersionStatus{first},
	}
}

// CreateNextModelVersion appends a new version entry for model to snapshot and
// returns the same snapshot, which is modified in place.
//
// When snapshot is nil or has no versions, a fresh snapshot is returned via
// NewModelSnapshot instead. Otherwise the new entry is numbered one higher
// than the current last entry, so version numbers stay unique and increasing
// (the previous code reused the last version number). The new entry starts
// with an empty replica-state map, model state ModelStateUnknown and
// model-gateway state ModelCreate.
func CreateNextModelVersion(snapshot *pb.ModelSnapshot, model *pb.Model) *pb.ModelSnapshot {
	if snapshot == nil || len(snapshot.Versions) == 0 {
		return NewModelSnapshot(model)
	}

	// GetVersion is nil-safe, so a nil last entry counts as version 0.
	nextVersion := snapshot.Versions[len(snapshot.Versions)-1].GetVersion() + 1

	snapshot.Versions = append(snapshot.Versions, &pb.ModelVersionStatus{
		Version:           nextVersion,
		ServerName:        "",
		KubernetesMeta:    nil,
		ModelReplicaState: make(map[int32]*pb.ModelReplicaStatus),
		State: &pb.ModelStatus{
			State:               pb.ModelStatus_ModelStateUnknown,
			Reason:              "",
			AvailableReplicas:   0,
			UnavailableReplicas: 0,
			LastChangeTimestamp: nil,
			ModelGwState:        pb.ModelStatus_ModelCreate,
			ModelGwReason:       "",
		},
		ModelDefn: model,
	})

	return snapshot
}

// TODO: remove deleted from here and reflect in callers
// This is only used in tests, thus we don't need to worry about modelGWState
func NewModelVersion(model *pb.Model, version uint32, server string, replicas map[int]ReplicaStatus, deleted bool, state ModelState) *ModelVersion {
Expand Down Expand Up @@ -396,6 +456,18 @@ func (m *Model) Inactive() bool {
return m.Latest().Inactive()
}

// ModelInactive reports whether the latest version in snap has no replica that
// is still in use. The previous body was empty, which does not compile for a
// function returning bool.
//
// A nil snapshot, a snapshot without versions, or a latest version without
// replicas is reported as inactive.
//
// NOTE(review): the replica states counted as inactive (Unloaded,
// UnloadFailed, ModelReplicaStateUnknown, LoadFailed) and the generated enum
// constant names are assumptions - confirm against the scheduler proto.
func ModelInactive(snap *pb.ModelSnapshot) bool {
	versions := snap.GetVersions()
	if len(versions) == 0 {
		return true
	}

	latest := versions[len(versions)-1]
	for _, replica := range latest.GetModelReplicaState() {
		switch replica.GetState() {
		case pb.ModelReplicaStatus_Unloaded,
			pb.ModelReplicaStatus_UnloadFailed,
			pb.ModelReplicaStatus_ModelReplicaStateUnknown,
			pb.ModelReplicaStatus_LoadFailed:
			// Replica is not serving and not on its way to serving.
		default:
			return false
		}
	}
	return true
}

// ModelLatest returns the last entry of snap.Versions, which callers treat as
// the latest model version. It returns nil when snap is nil or holds no
// versions (a nil snap previously caused a nil-pointer panic).
func ModelLatest(snap *pb.ModelSnapshot) *pb.ModelVersionStatus {
	if snap == nil || len(snap.Versions) == 0 {
		return nil
	}
	return snap.Versions[len(snap.Versions)-1]
}

// IsDeleted reports the current value of the model's deleted flag, read
// through the flag's Load method.
func (m *Model) IsDeleted() bool {
	deleted := m.deleted.Load()
	return deleted
}
Expand Down
Loading