diff --git a/apis/go/mlops/scheduler/storage.pb.go b/apis/go/mlops/scheduler/storage.pb.go index a36ddb1861..d73872a8fb 100644 --- a/apis/go/mlops/scheduler/storage.pb.go +++ b/apis/go/mlops/scheduler/storage.pb.go @@ -158,6 +158,61 @@ func (x *ExperimentSnapshot) GetDeleted() bool { return false } +type ModelSnapshot struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Versions []*ModelVersionStatus `protobuf:"bytes,1,rep,name=versions,proto3" json:"versions,omitempty"` + Deleted bool `protobuf:"varint,2,opt,name=deleted,proto3" json:"deleted,omitempty"` +} + +func (x *ModelSnapshot) Reset() { + *x = ModelSnapshot{} + if protoimpl.UnsafeEnabled { + mi := &file_mlops_scheduler_storage_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ModelSnapshot) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ModelSnapshot) ProtoMessage() {} + +func (x *ModelSnapshot) ProtoReflect() protoreflect.Message { + mi := &file_mlops_scheduler_storage_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ModelSnapshot.ProtoReflect.Descriptor instead. 
+func (*ModelSnapshot) Descriptor() ([]byte, []int) { + return file_mlops_scheduler_storage_proto_rawDescGZIP(), []int{2} +} + +func (x *ModelSnapshot) GetVersions() []*ModelVersionStatus { + if x != nil { + return x.Versions + } + return nil +} + +func (x *ModelSnapshot) GetDeleted() bool { + if x != nil { + return x.Deleted + } + return false +} + var File_mlops_scheduler_storage_proto protoreflect.FileDescriptor var file_mlops_scheduler_storage_proto_rawDesc = []byte{ @@ -184,11 +239,19 @@ var file_mlops_scheduler_storage_proto_rawDesc = []byte{ 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x2e, 0x45, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x52, 0x0a, 0x65, 0x78, 0x70, 0x65, 0x72, 0x69, 0x6d, 0x65, 0x6e, 0x74, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, - 0x07, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x42, 0x3c, 0x5a, 0x3a, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x6c, 0x64, 0x6f, 0x6e, 0x69, 0x6f, 0x2f, - 0x73, 0x65, 0x6c, 0x64, 0x6f, 0x6e, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x69, 0x73, - 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x32, 0x2f, 0x6d, 0x6c, 0x6f, 0x70, 0x73, 0x2f, 0x73, 0x63, 0x68, - 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x07, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x22, 0x71, 0x0a, 0x0d, 0x4d, 0x6f, 0x64, 0x65, + 0x6c, 0x53, 0x6e, 0x61, 0x70, 0x73, 0x68, 0x6f, 0x74, 0x12, 0x46, 0x0a, 0x08, 0x76, 0x65, 0x72, + 0x73, 0x69, 0x6f, 0x6e, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x2a, 0x2e, 0x73, 0x65, + 0x6c, 0x64, 0x6f, 0x6e, 0x2e, 0x6d, 0x6c, 0x6f, 0x70, 0x73, 0x2e, 0x73, 0x63, 0x68, 0x65, 0x64, + 0x75, 0x6c, 0x65, 0x72, 0x2e, 0x4d, 0x6f, 0x64, 0x65, 0x6c, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, + 0x6e, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x08, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, + 0x73, 0x12, 0x18, 0x0a, 0x07, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x18, 0x02, 
0x20, 0x01, + 0x28, 0x08, 0x52, 0x07, 0x64, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x64, 0x42, 0x3c, 0x5a, 0x3a, 0x67, + 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x6c, 0x64, 0x6f, 0x6e, + 0x69, 0x6f, 0x2f, 0x73, 0x65, 0x6c, 0x64, 0x6f, 0x6e, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, + 0x70, 0x69, 0x73, 0x2f, 0x67, 0x6f, 0x2f, 0x76, 0x32, 0x2f, 0x6d, 0x6c, 0x6f, 0x70, 0x73, 0x2f, + 0x73, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, } var ( @@ -203,21 +266,24 @@ func file_mlops_scheduler_storage_proto_rawDescGZIP() []byte { return file_mlops_scheduler_storage_proto_rawDescData } -var file_mlops_scheduler_storage_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_mlops_scheduler_storage_proto_msgTypes = make([]protoimpl.MessageInfo, 3) var file_mlops_scheduler_storage_proto_goTypes = []any{ (*PipelineSnapshot)(nil), // 0: seldon.mlops.scheduler.PipelineSnapshot (*ExperimentSnapshot)(nil), // 1: seldon.mlops.scheduler.ExperimentSnapshot - (*PipelineWithState)(nil), // 2: seldon.mlops.scheduler.PipelineWithState - (*Experiment)(nil), // 3: seldon.mlops.scheduler.Experiment + (*ModelSnapshot)(nil), // 2: seldon.mlops.scheduler.ModelSnapshot + (*PipelineWithState)(nil), // 3: seldon.mlops.scheduler.PipelineWithState + (*Experiment)(nil), // 4: seldon.mlops.scheduler.Experiment + (*ModelVersionStatus)(nil), // 5: seldon.mlops.scheduler.ModelVersionStatus } var file_mlops_scheduler_storage_proto_depIdxs = []int32{ - 2, // 0: seldon.mlops.scheduler.PipelineSnapshot.versions:type_name -> seldon.mlops.scheduler.PipelineWithState - 3, // 1: seldon.mlops.scheduler.ExperimentSnapshot.experiment:type_name -> seldon.mlops.scheduler.Experiment - 2, // [2:2] is the sub-list for method output_type - 2, // [2:2] is the sub-list for method input_type - 2, // [2:2] is the sub-list for extension type_name - 2, // [2:2] is the sub-list for extension extendee - 0, // [0:2] is the sub-list for field type_name 
+ 3, // 0: seldon.mlops.scheduler.PipelineSnapshot.versions:type_name -> seldon.mlops.scheduler.PipelineWithState + 4, // 1: seldon.mlops.scheduler.ExperimentSnapshot.experiment:type_name -> seldon.mlops.scheduler.Experiment + 5, // 2: seldon.mlops.scheduler.ModelSnapshot.versions:type_name -> seldon.mlops.scheduler.ModelVersionStatus + 3, // [3:3] is the sub-list for method output_type + 3, // [3:3] is the sub-list for method input_type + 3, // [3:3] is the sub-list for extension type_name + 3, // [3:3] is the sub-list for extension extendee + 0, // [0:3] is the sub-list for field type_name } func init() { file_mlops_scheduler_storage_proto_init() } @@ -251,6 +317,18 @@ func file_mlops_scheduler_storage_proto_init() { return nil } } + file_mlops_scheduler_storage_proto_msgTypes[2].Exporter = func(v any, i int) any { + switch v := v.(*ModelSnapshot); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -258,7 +336,7 @@ func file_mlops_scheduler_storage_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_mlops_scheduler_storage_proto_rawDesc, NumEnums: 0, - NumMessages: 2, + NumMessages: 3, NumExtensions: 0, NumServices: 0, }, diff --git a/apis/mlops/scheduler/storage.proto b/apis/mlops/scheduler/storage.proto index f262a4c055..2353290813 100644 --- a/apis/mlops/scheduler/storage.proto +++ b/apis/mlops/scheduler/storage.proto @@ -20,3 +20,9 @@ message ExperimentSnapshot { // on restart, which would guard against lost events in communication. 
bool deleted = 2;
 }
+
+
+message ModelSnapshot{
+  repeated ModelVersionStatus versions = 1;
+  bool deleted = 2;
+}
\ No newline at end of file
diff --git a/scheduler/pkg/store/manager.go b/scheduler/pkg/store/manager.go
new file mode 100644
index 0000000000..895bd1514c
--- /dev/null
+++ b/scheduler/pkg/store/manager.go
@@ -0,0 +1,97 @@
+/*
+Copyright (c) 2024 Seldon Technologies Ltd.
+
+Use of this software is governed BY
+(1) the license included in the LICENSE file or
+(2) if the license included in the LICENSE file is the Business Source License 1.1,
+the Change License after the Change Date as each is defined in accordance with the LICENSE file.
+*/
+
+package store
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
+	"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator"
+	"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/utils"
+	log "github.com/sirupsen/logrus"
+)
+
+// ModelServerStore is the persistence used by manager: it reads and writes one
+// ModelSnapshot per key (UpdateModel uses the model name as the key).
+type ModelServerStore interface {
+	GetModel(ctx context.Context, key string) (*pb.ModelSnapshot, error)
+	PutModel(ctx context.Context, key string, value *pb.ModelSnapshot) error
+}
+
+// ServerStore is presumably the server-side counterpart of ModelServerStore.
+// NOTE(review): GetServer takes no key and returns nothing yet - confirm the intended signature.
+type ServerStore interface {
+	GetServer(ctx context.Context)
+}
+
+// manager applies model updates to storage; mu serialises UpdateModel calls.
+type manager struct {
+	mu       sync.RWMutex
+	storage  ModelServerStore
+	logger   log.FieldLogger
+	eventHub *coordinator.EventHub
+}
+
+// ModelServerManager is the model-update API that manager implements.
+type ModelServerManager interface {
+	UpdateModel(ctx context.Context, req *pb.LoadModelRequest) error
+}
+
+var _ ModelServerManager = (*manager)(nil)
+
+// UpdateModel validates the model name in req and persists the resulting
+// snapshot under that name:
+//   - no stored snapshot: a new single-version snapshot is created;
+//   - snapshot marked deleted: rejected while ModelInactive reports false,
+//     otherwise replaced by a new single-version snapshot;
+//   - otherwise: the next model version is appended to the stored snapshot.
+//
+// The manager lock is held for the whole read-modify-write.
+func (m *manager) UpdateModel(ctx context.Context, req *pb.LoadModelRequest) error {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+
+	model := req.GetModel()
+	modelName := model.GetMeta().GetName()
+	if !utils.CheckName(modelName) {
+		return fmt.Errorf(
+			"Model %s does not have a valid name - it must be alphanumeric and not contains dots (.)",
+			modelName,
+		)
+	}
+
+	modelSnap, err := m.storage.GetModel(ctx, modelName)
+	if err != nil {
+		return fmt.Errorf("could not update model %s: %w", modelName, err)
+	}
+
+	switch {
+	case modelSnap == nil:
+		modelSnap = NewModelSnapshot(model)
+	case modelSnap.GetDeleted():
+		// Recreating is only allowed once the deletion has finished, i.e. the
+		// deleted model is inactive; until then the request is rejected.
+		if !ModelInactive(modelSnap) {
+			return fmt.Errorf("model %s is in process of deletion - new model can not be created", modelName)
+		}
+		modelSnap = NewModelSnapshot(model)
+	default:
+		// TODO: run a model equality check first so identical requests do not
+		// add a version.
+		modelSnap = CreateNextModelVersion(modelSnap, model)
+	}
+
+	if err := m.storage.PutModel(ctx, modelName, modelSnap); err != nil {
+		return fmt.Errorf("could not create a new model %s: %w", modelName, err)
+	}
+	return nil
+}
diff --git a/scheduler/pkg/store/mesh.go b/scheduler/pkg/store/mesh.go
index 31ca15fbb4..5cdadd9bf9 100644
--- a/scheduler/pkg/store/mesh.go
+++ b/scheduler/pkg/store/mesh.go
@@ -67,7 +67,7 @@ type ModelStatus struct {
 	ModelGwReason       string
 	AvailableReplicas   uint32
 	UnavailableReplicas uint32
-	DrainingReplicas    uint32
+	DrainingReplicas    uint32 //this field is never used and was never added to proto
 	Timestamp           time.Time
 }
 
@@ -90,6 +90,50 @@ func NewDefaultModelVersion(model *pb.Model, version uint32) *ModelVersion {
 	}
 }
 
+// newModelVersionStatus builds the initial status entry for one version of
+// model: the given version number, an empty replica-state map, model state
+// ModelStateUnknown and model-gateway state ModelCreate. All other fields keep
+// their zero values.
+func newModelVersionStatus(model *pb.Model, version uint32) *pb.ModelVersionStatus {
+	return &pb.ModelVersionStatus{
+		Version:           version,
+		ModelReplicaState: make(map[int32]*pb.ModelReplicaStatus),
+		State: &pb.ModelStatus{
+			State:        pb.ModelStatus_ModelStateUnknown,
+			ModelGwState: pb.ModelStatus_ModelCreate,
+		},
+		ModelDefn: model,
+	}
+}
+
+// NewModelSnapshot returns a non-deleted snapshot holding a single initial
+// version of model. The version number is max(1, uint32(generation)), where
+// generation is the model's Kubernetes generation.
+func NewModelSnapshot(model *pb.Model) *pb.ModelSnapshot {
+	generation := model.GetMeta().GetKubernetesMeta().GetGeneration()
+	version := max(uint32(1), uint32(generation))
+
+	return &pb.ModelSnapshot{
+		Versions: []*pb.ModelVersionStatus{newModelVersionStatus(model, version)},
+	}
+}
+
+// CreateNextModelVersion appends a new version of model to snapshot, numbered
+// one above the current latest version, and returns snapshot (which is
+// modified in place). A nil or empty snapshot yields NewModelSnapshot(model).
+func CreateNextModelVersion(snapshot *pb.ModelSnapshot, model *pb.Model) *pb.ModelSnapshot {
+	versions := snapshot.GetVersions()
+	if len(versions) == 0 {
+		return NewModelSnapshot(model)
+	}
+
+	// The next version must be strictly greater than the latest one; reusing
+	// the latest number would give two entries the same version.
+	next := versions[len(versions)-1].Version + 1
+	snapshot.Versions = append(versions, newModelVersionStatus(model, next))
+	return snapshot
+}
+
 // TODO: remove deleted from here and reflect in callers
 // This is only used in tests, thus we don't need to worry about modelGWState
 func NewModelVersion(model *pb.Model, version uint32, server string, replicas map[int]ReplicaStatus, deleted bool, state ModelState) *ModelVersion {
@@ -396,6 +440,30 @@ func (m *Model) Inactive() bool {
 	return m.Latest().Inactive()
 }
 
+// ModelInactive reports whether the latest version in snap has no replicas
+// left to wait for. A nil snapshot, or one without versions, is inactive.
+//
+// NOTE(review): this is deliberately conservative - any recorded replica makes
+// the model count as active, because per-replica state is not inspected here.
+// TODO: treat replicas in a terminal (unloaded/failed) state as inactive.
+func ModelInactive(snap *pb.ModelSnapshot) bool {
+	latest := ModelLatest(snap)
+	if latest == nil {
+		return true
+	}
+	return len(latest.ModelReplicaState) == 0
+}
+
+// ModelLatest returns the last entry of snap's version list, or nil when snap
+// is nil or has no versions.
+func ModelLatest(snap *pb.ModelSnapshot) *pb.ModelVersionStatus {
+	versions := snap.GetVersions()
+	if len(versions) == 0 {
+		return nil
+	}
+	return versions[len(versions)-1]
+}
+
 func (m *Model) IsDeleted() bool {
 	return m.deleted.Load()
 }