Skip to content

Commit 8f5422b

Browse files
committed
experiment with proto storage
1 parent 9c4cd51 commit 8f5422b

File tree

4 files changed

+259
-17
lines changed

4 files changed

+259
-17
lines changed

apis/go/mlops/scheduler/storage.pb.go

Lines changed: 94 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apis/mlops/scheduler/storage.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,9 @@ message ExperimentSnapshot {
2020
// on restart, which would guard against lost events in communication.
2121
bool deleted = 2;
2222
}
23+
24+
25+
// ModelSnapshot is the storage record for a single model: the status of each
// of its versions together with a deletion marker.
message ModelSnapshot {
  // Per-version status entries for the model.
  repeated ModelVersionStatus versions = 1;
  // True once the model has been marked as deleted.
  bool deleted = 2;
}

scheduler/pkg/store/manager.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/*
2+
Copyright (c) 2024 Seldon Technologies Ltd.
3+
4+
Use of this software is governed BY
5+
(1) the license included in the LICENSE file or
6+
(2) if the license included in the LICENSE file is the Business Source License 1.1,
7+
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
8+
*/
9+
10+
package store
11+
12+
import (
13+
"context"
14+
"fmt"
15+
"sync"
16+
17+
pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
18+
"github.com/seldonio/seldon-core/scheduler/v2/pkg/coordinator"
19+
"github.com/seldonio/seldon-core/scheduler/v2/pkg/store/utils"
20+
log "github.com/sirupsen/logrus"
21+
)
22+
23+
type ModelServerStore interface {
24+
GetModel(ctx context.Context, key string) (*pb.ModelSnapshot, error)
25+
PutModel(ctx context.Context, key string, value *pb.ModelSnapshot) error
26+
}
27+
28+
// ServerStore is the server-side counterpart of ModelServerStore.
//
// NOTE(review): GetServer currently has no result values, so it looks like a
// stub to be filled in later — confirm before implementing against it.
type ServerStore interface {
	GetServer(ctx context.Context)
}
31+
32+
type manager struct {
33+
mu sync.RWMutex
34+
storage ModelServerStore
35+
logger log.FieldLogger
36+
eventHub *coordinator.EventHub
37+
}
38+
39+
type ModelServerManager interface {
40+
UpdateModel(ctx context.Context, req *pb.LoadModelRequest) error
41+
}
42+
43+
func (m *manager) UpdateModel(ctx context.Context, req *pb.LoadModelRequest) error {
44+
m.mu.Lock()
45+
defer m.mu.Unlock()
46+
47+
modelName := req.GetModel().GetMeta().GetName()
48+
validName := utils.CheckName(modelName)
49+
if !validName {
50+
return fmt.Errorf(
51+
"Model %s does not have a valid name - it must be alphanumeric and not contains dots (.)",
52+
modelName,
53+
)
54+
}
55+
56+
modelSnap, err := m.storage.GetModel(ctx, modelName)
57+
if err != nil {
58+
return fmt.Errorf("could not update model %s: %v", modelName, err)
59+
}
60+
61+
if modelSnap == nil {
62+
err = m.storage.PutModel(ctx, modelName, NewModelSnapshot(req.GetModel()))
63+
if err != nil {
64+
return fmt.Errorf("could not create a new model %s: %v", modelName, err)
65+
}
66+
return nil
67+
}
68+
69+
if modelSnap.GetDeleted() {
70+
if ModelInactive(modelSnap) {
71+
return fmt.Errorf("model %s is in process of deletion - new model can not be created", modelName)
72+
}
73+
74+
modelSnap = CreateNextModelVersion(modelSnap, req.GetModel())
75+
76+
}
77+
78+
// todo: do Model EqualityCheck
79+
80+
err = m.storage.PutModel(ctx, modelName, NewModelSnapshot(req.GetModel()))
81+
if err != nil {
82+
return fmt.Errorf("could not create a new model %s: %v", modelName, err)
83+
}
84+
return nil
85+
86+
}

scheduler/pkg/store/mesh.go

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ type ModelStatus struct {
6767
ModelGwReason string
6868
AvailableReplicas uint32
6969
UnavailableReplicas uint32
70-
DrainingReplicas uint32
70+
DrainingReplicas uint32 //this field is never used and was never added to proto
7171
Timestamp time.Time
7272
}
7373

@@ -90,6 +90,66 @@ func NewDefaultModelVersion(model *pb.Model, version uint32) *ModelVersion {
9090
}
9191
}
9292

93+
func NewModelSnapshot(model *pb.Model) *pb.ModelSnapshot {
94+
95+
generation := model.GetMeta().GetKubernetesMeta().GetGeneration()
96+
version := max(uint32(1), uint32(generation))
97+
98+
var versions []*pb.ModelVersionStatus
99+
versions = append(versions, &pb.ModelVersionStatus{
100+
Version: version,
101+
ServerName: "",
102+
KubernetesMeta: nil,
103+
ModelReplicaState: make(map[int32]*pb.ModelReplicaStatus),
104+
State: &pb.ModelStatus{
105+
State: pb.ModelStatus_ModelStateUnknown,
106+
Reason: "",
107+
AvailableReplicas: 0,
108+
UnavailableReplicas: 0,
109+
LastChangeTimestamp: nil,
110+
ModelGwState: pb.ModelStatus_ModelCreate,
111+
ModelGwReason: "",
112+
},
113+
ModelDefn: model,
114+
})
115+
116+
return &pb.ModelSnapshot{
117+
Versions: versions,
118+
Deleted: false,
119+
}
120+
}
121+
122+
func CreateNextModelVersion(snapshot *pb.ModelSnapshot, model *pb.Model) *pb.ModelSnapshot {
123+
if snapshot == nil {
124+
return NewModelSnapshot(model)
125+
}
126+
127+
if len(snapshot.Versions) < 1 {
128+
return NewModelSnapshot(model)
129+
}
130+
131+
version := snapshot.Versions[len(snapshot.Versions)-1].Version
132+
133+
snapshot.Versions = append(snapshot.Versions, &pb.ModelVersionStatus{
134+
Version: version,
135+
ServerName: "",
136+
KubernetesMeta: nil,
137+
ModelReplicaState: make(map[int32]*pb.ModelReplicaStatus),
138+
State: &pb.ModelStatus{
139+
State: pb.ModelStatus_ModelStateUnknown,
140+
Reason: "",
141+
AvailableReplicas: 0,
142+
UnavailableReplicas: 0,
143+
LastChangeTimestamp: nil,
144+
ModelGwState: pb.ModelStatus_ModelCreate,
145+
ModelGwReason: "",
146+
},
147+
ModelDefn: model,
148+
})
149+
150+
return snapshot
151+
}
152+
93153
// TODO: remove deleted from here and reflect in callers
94154
// This is only used in tests, thus we don't need to worry about modelGWState
95155
func NewModelVersion(model *pb.Model, version uint32, server string, replicas map[int]ReplicaStatus, deleted bool, state ModelState) *ModelVersion {
@@ -396,6 +456,18 @@ func (m *Model) Inactive() bool {
396456
return m.Latest().Inactive()
397457
}
398458

459+
func ModelInactive(snap *pb.ModelSnapshot) bool {
460+
461+
}
462+
463+
func ModelLatest(snap *pb.ModelSnapshot) *pb.ModelVersionStatus {
464+
if len(snap.Versions) > 0 {
465+
return snap.Versions[len(snap.Versions)-1]
466+
} else {
467+
return nil
468+
}
469+
}
470+
399471
func (m *Model) IsDeleted() bool {
400472
return m.deleted.Load()
401473
}

0 commit comments

Comments
 (0)