Skip to content

Commit fc68583

Browse files
committed
wip fsm
1 parent 6a1873f commit fc68583

File tree

8 files changed

+614
-56
lines changed

8 files changed

+614
-56
lines changed

apis/go/mlops/scheduler/storage.pb.go

Lines changed: 94 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

apis/mlops/scheduler/storage.proto

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,3 +20,8 @@ message ExperimentSnapshot {
2020
// on restart, which would guard against lost events in communication.
2121
bool deleted = 2;
2222
}
23+
24+
message ModelSnapshot{
25+
repeated ModelVersionStatus versions = 1;
26+
bool deleted = 2;
27+
}
Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,20 @@
1+
/*
2+
Copyright (c) 2024 Seldon Technologies Ltd.
3+
4+
Use of this software is governed BY
5+
(1) the license included in the LICENSE file or
6+
(2) if the license included in the LICENSE file is the Business Source License 1.1,
7+
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
8+
*/
9+
110
package fsm
211

312
import (
413
"context"
514
"fmt"
615

716
pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
17+
"github.com/seldonio/seldon-core/scheduler/v2/pkg/fsm/statemachine"
818
)
919

1020
// Concrete event types
@@ -23,36 +33,29 @@ func (e *LoadModelEvent) Marshal() ([]byte, error) {
2333

2434
type LoadModelEventHandler struct {
2535
store KVStore
36+
fsm statemachine.FSM
2637
}
2738

2839
func NewLoadModelEventHandler(store KVStore) *LoadModelEventHandler {
29-
return &LoadModelEventHandler{store: store}
40+
return &LoadModelEventHandler{store: store, fsm: statemachine.NewStateMachine()}
3041
}
3142

3243
// Handle implementations (business logic goes here)
3344
func (e *LoadModelEventHandler) Handle(ctx context.Context, event Event) ([]OutputEvent, error) {
3445
loadEvent, ok := event.(*LoadModelEvent)
3546
if !ok {
36-
return nil, fmt.Errorf("invalid event type")
47+
return nil, fmt.Errorf("invalid event type, expected LoadModelEvent")
3748
}
3849

39-
// save new model to storage if new
40-
41-
// if not new update model storage
42-
43-
// generate model event status for other services to ingest
50+
// 1. Load current cluster state from KVStore
4451

45-
// schedule model into server
52+
// 2. Call pure business logic from state machine statemachine.ApplyLoadModel
4653

47-
// retrieve servers and their models
54+
// 3. Get resulting state in cluster
4855

49-
// filter out the server
56+
// 4. convert domain events into infrastructure events
5057

51-
// find a suitable server to schedule
52-
53-
// generate agent schedule event
54-
55-
// generate event to update pipeline status if pipeline is affected by model
58+
// 5. save cluster state and fan out events
5659

5760
/*
5861
Errors:
@@ -69,12 +72,5 @@ func (e *LoadModelEventHandler) Handle(ctx context.Context, event Event) ([]Outp
6972
7073
*/
7174

72-
// TODO: Implement business logic
73-
// - Validate request
74-
// - Update model state in KVStore
75-
// - Generate output events (e.g., "ModelLoadStarted", "NotifyServer", etc.)
76-
77-
_ = loadEvent // use the event
78-
7975
return []OutputEvent{}, nil
8076
}

scheduler/pkg/fsm/fsm.go

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
/*
2+
Copyright (c) 2024 Seldon Technologies Ltd.
3+
4+
Use of this software is governed BY
5+
(1) the license included in the LICENSE file or
6+
(2) if the license included in the LICENSE file is the Business Source License 1.1,
7+
the Change License after the Change Date as each is defined in accordance with the LICENSE file.
8+
*/
9+
110
package fsm
211

312
import (
@@ -11,26 +20,11 @@ import (
1120
type KVStore interface {
1221
// Get retrieves value and current version
1322
Get(ctx context.Context, key string) (value []byte, version int64, err error)
14-
// Set with version check (returns ErrVersionConflict if version doesn't match)
15-
SetWithVersion(ctx context.Context, key string, value []byte, expectedVersion int64) (newVersion int64, err error)
1623
// Set without version check (for initial writes)
1724
Set(ctx context.Context, key string, value []byte) (int64, error)
1825
Delete(ctx context.Context, key string) error
1926
}
2027

21-
// ErrVersionConflict indicates an optimistic locking conflict
22-
var ErrVersionConflict = fmt.Errorf("version conflict")
23-
24-
// EventLog interface for append-only event storage
25-
type EventLog interface {
26-
// Append writes an event to the log and returns its sequence number
27-
Append(ctx context.Context, event Event) (int64, error)
28-
// MarkCommitted marks an event as successfully processed
29-
MarkCommitted(ctx context.Context, seqNum int64) error
30-
// GetUncommitted returns all uncommitted events (for crash recovery)
31-
GetUncommitted(ctx context.Context) ([]Event, error)
32-
}
33-
3428
// Event represents an incoming event to the FSM
3529
type Event interface {
3630
Type() EventType
@@ -53,19 +47,17 @@ type OutputEvent interface {
5347
type Fsm struct {
5448
name string
5549
store KVStore
56-
log EventLog
5750
handlers map[EventType]Handler
5851
}
5952

6053
type Handler interface {
6154
Handle(ctx context.Context, ev Event) ([]OutputEvent, error)
6255
}
6356

64-
func NewFSM(name string, store KVStore, log EventLog) *Fsm {
57+
func NewFSM(name string, store KVStore) *Fsm {
6558
fsm := &Fsm{
6659
name: name,
6760
store: store,
68-
log: log,
6961
handlers: make(map[EventType]Handler),
7062
}
7163

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package statemachine
2+
3+
import pb "github.com/seldonio/seldon-core/apis/go/v2/mlops/scheduler"
4+
5+
// ClusterState represents the state of things in the cluster needed for an event to be applied
6+
type ClusterState struct {
7+
Model map[string]*pb.ModelSnapshot
8+
Pipelines map[string]*pb.PipelineSnapshot
9+
Experiments map[string]*pb.ExperimentSnapshot
10+
}

0 commit comments

Comments
 (0)