Skip to content

Commit ad3767e

Browse files
committed
Add models and migration for Create/Delete/Get Tasks
1 parent b02d299 commit ad3767e

35 files changed

+1755
-58
lines changed

Tiltfile

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ if config.tilt_subcommand == "ci":
2020
custom_build(
2121
'rust-log-service',
2222
'docker image tag rust-log-service:ci $EXPECTED_REF',
23-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
23+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
2424
disable_push=True
2525
)
2626
else:
2727
docker_build(
2828
'rust-log-service',
2929
'.',
30-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
30+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
3131
dockerfile='./rust/Dockerfile',
3232
target='log_service'
3333
)
@@ -68,14 +68,14 @@ if config.tilt_subcommand == "ci":
6868
custom_build(
6969
'rust-frontend-service',
7070
'docker image tag rust-frontend-service:ci $EXPECTED_REF',
71-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
71+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
7272
disable_push=True
7373
)
7474
else:
7575
docker_build(
7676
'rust-frontend-service',
7777
'.',
78-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
78+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
7979
dockerfile='./rust/Dockerfile',
8080
target='cli'
8181
)
@@ -84,14 +84,14 @@ if config.tilt_subcommand == "ci":
8484
custom_build(
8585
'query-service',
8686
'docker image tag query-service:ci $EXPECTED_REF',
87-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
87+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
8888
disable_push=True
8989
)
9090
else:
9191
docker_build(
9292
'query-service',
9393
'.',
94-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
94+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
9595
dockerfile='./rust/Dockerfile',
9696
target='query_service'
9797
)
@@ -100,14 +100,14 @@ if config.tilt_subcommand == "ci":
100100
custom_build(
101101
'compaction-service',
102102
'docker image tag compactor-service:ci $EXPECTED_REF',
103-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
103+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
104104
disable_push=True
105105
)
106106
else:
107107
docker_build(
108108
'compaction-service',
109109
'.',
110-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
110+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
111111
dockerfile='./rust/Dockerfile',
112112
target='compaction_service'
113113
)
@@ -116,14 +116,14 @@ if config.tilt_subcommand == "ci":
116116
custom_build(
117117
'garbage-collector',
118118
'docker image tag garbage-collector:ci $EXPECTED_REF',
119-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
119+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
120120
disable_push=True
121121
)
122122
else:
123123
docker_build(
124124
'garbage-collector',
125125
'.',
126-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
126+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
127127
dockerfile='./rust/Dockerfile',
128128
target='garbage_collector'
129129
)
@@ -132,14 +132,14 @@ if config.tilt_subcommand == "ci":
132132
custom_build(
133133
'load-service',
134134
'docker image tag load-service:ci $EXPECTED_REF',
135-
['./rust/', './idl/', './Cargo.toml', './Cargo.lock'],
135+
['./rust/', './idl/', './Cargo.toml', './Cargo.lock', './go/pkg/sysdb/metastore/db/dbmodel/constants.go'],
136136
disable_push=True
137137
)
138138
else:
139139
docker_build(
140140
'load-service',
141141
'.',
142-
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock"],
142+
only=["rust/", "idl/", "Cargo.toml", "Cargo.lock", "go/pkg/sysdb/metastore/db/dbmodel/constants.go"],
143143
dockerfile='./rust/Dockerfile',
144144
target='load_service'
145145
)

go/pkg/common/errors.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,14 @@ var (
4848
// Segment metadata errors
4949
ErrUnknownSegmentMetadataType = errors.New("segment metadata value type not supported")
5050

51+
// Task errors
52+
ErrTaskAlreadyExists = errors.New("the task that was being created already exists for this collection")
53+
ErrTaskNotFound = errors.New("the requested task was not found")
54+
ErrInvalidTaskName = errors.New("task name cannot start with reserved prefix '_deleted_'")
55+
56+
// Operator errors
57+
ErrOperatorNotFound = errors.New("operator not found")
58+
5159
// Others
5260
ErrCompactionOffsetSomehowAhead = errors.New("system invariant was violated. Compaction offset in sysdb should always be behind or equal to offset in log")
5361
)

go/pkg/sysdb/coordinator/task.go

Lines changed: 264 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,264 @@
1+
package coordinator
2+
3+
import (
4+
"context"
5+
"strings"
6+
"time"
7+
8+
"github.com/chroma-core/chroma/go/pkg/common"
9+
"github.com/chroma-core/chroma/go/pkg/model"
10+
"github.com/chroma-core/chroma/go/pkg/proto/coordinatorpb"
11+
"github.com/chroma-core/chroma/go/pkg/sysdb/metastore/db/dbmodel"
12+
"github.com/chroma-core/chroma/go/pkg/types"
13+
"github.com/google/uuid"
14+
"github.com/pingcap/log"
15+
"go.uber.org/zap"
16+
"google.golang.org/grpc/codes"
17+
"google.golang.org/grpc/status"
18+
"google.golang.org/protobuf/proto"
19+
)
20+
21+
// CreateTask creates a new task in the database
22+
func (s *Coordinator) CreateTask(ctx context.Context, req *coordinatorpb.CreateTaskRequest) (*coordinatorpb.CreateTaskResponse, error) {
23+
// Validate task name doesn't start with soft-deletion reserved prefix
24+
if strings.HasPrefix(req.Name, "_deleted_") {
25+
log.Error("CreateTask: task name cannot start with _deleted_")
26+
return nil, common.ErrInvalidTaskName
27+
}
28+
29+
var taskID uuid.UUID
30+
31+
// Execute all database operations in a transaction
32+
err := s.catalog.txImpl.Transaction(ctx, func(txCtx context.Context) error {
33+
// Check if task already exists
34+
existingTask, err := s.catalog.metaDomain.TaskDb(txCtx).GetByName(req.InputCollectionId, req.Name)
35+
if err != nil {
36+
log.Error("CreateTask: failed to check task", zap.Error(err))
37+
return err
38+
}
39+
if existingTask != nil {
40+
log.Info("CreateTask: task already exists, returning existing")
41+
taskID = existingTask.ID
42+
return nil
43+
}
44+
45+
// Generate new task UUID
46+
taskID = uuid.New()
47+
outputCollectionName := req.OutputCollectionName
48+
49+
// Look up database_id from databases table using database name and tenant
50+
databases, err := s.catalog.metaDomain.DatabaseDb(txCtx).GetDatabases(req.TenantId, req.Database)
51+
if err != nil {
52+
log.Error("CreateTask: failed to get database", zap.Error(err))
53+
return err
54+
}
55+
if len(databases) == 0 {
56+
log.Error("CreateTask: database not found")
57+
return common.ErrDatabaseNotFound
58+
}
59+
60+
// Look up operator by name from the operators table
61+
operator, err := s.catalog.metaDomain.OperatorDb(txCtx).GetByName(req.OperatorName)
62+
if err != nil {
63+
log.Error("CreateTask: failed to get operator", zap.Error(err))
64+
return err
65+
}
66+
if operator == nil {
67+
log.Error("CreateTask: operator not found", zap.String("operator_name", req.OperatorName))
68+
return common.ErrOperatorNotFound
69+
}
70+
operatorID := operator.OperatorID
71+
72+
// Generate UUIDv7 for time-ordered nonce
73+
nextNonce, err := uuid.NewV7()
74+
if err != nil {
75+
return err
76+
}
77+
78+
// TODO(tanujnay112): Can combine the two collection checks into one
79+
// Check if input collection exists
80+
collections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections([]string{req.InputCollectionId}, nil, req.TenantId, req.Database, nil, nil, false)
81+
if err != nil {
82+
log.Error("CreateTask: failed to get input collection", zap.Error(err))
83+
return err
84+
}
85+
if len(collections) == 0 {
86+
log.Error("CreateTask: input collection not found")
87+
return common.ErrCollectionNotFound
88+
}
89+
90+
// Check if output collection already exists
91+
existingOutputCollections, err := s.catalog.metaDomain.CollectionDb(txCtx).GetCollections(nil, &outputCollectionName, req.TenantId, req.Database, nil, nil, false)
92+
if err != nil {
93+
log.Error("CreateTask: failed to check output collection", zap.Error(err))
94+
return err
95+
}
96+
if len(existingOutputCollections) > 0 {
97+
log.Error("CreateTask: output collection already exists")
98+
return common.ErrCollectionUniqueConstraintViolation
99+
}
100+
101+
now := time.Now()
102+
task := &dbmodel.Task{
103+
ID: taskID,
104+
Name: req.Name,
105+
TenantID: req.TenantId,
106+
DatabaseID: databases[0].ID,
107+
InputCollectionID: req.InputCollectionId,
108+
OutputCollectionName: req.OutputCollectionName,
109+
OperatorID: operatorID,
110+
OperatorParams: req.Params,
111+
CompletionOffset: 0,
112+
LastRun: nil,
113+
NextRun: nil, // Will be set to zero initially, scheduled by task scheduler
114+
MinRecordsForTask: int64(req.MinRecordsForTask),
115+
CurrentAttempts: 0,
116+
CreatedAt: now,
117+
UpdatedAt: now,
118+
NextNonce: nextNonce,
119+
OldestWrittenNonce: nil,
120+
}
121+
122+
// Try to insert task into database
123+
err = s.catalog.metaDomain.TaskDb(txCtx).Insert(task)
124+
if err != nil {
125+
// Check if it's a unique constraint violation (concurrent creation)
126+
if err == common.ErrTaskAlreadyExists {
127+
log.Error("CreateTask: task already exists")
128+
return common.ErrTaskAlreadyExists
129+
}
130+
log.Error("CreateTask: failed to insert task", zap.Error(err))
131+
return err
132+
}
133+
134+
log.Info("Task created successfully", zap.String("task_id", taskID.String()), zap.String("name", req.Name), zap.String("output_collection_name", outputCollectionName))
135+
return nil
136+
})
137+
138+
if err != nil {
139+
return nil, err
140+
}
141+
142+
return &coordinatorpb.CreateTaskResponse{
143+
TaskId: taskID.String(),
144+
}, nil
145+
}
146+
147+
// GetTaskByName retrieves a task by name from the database
148+
func (s *Coordinator) GetTaskByName(ctx context.Context, req *coordinatorpb.GetTaskByNameRequest) (*coordinatorpb.GetTaskByNameResponse, error) {
149+
// Can do both calls with a JOIN
150+
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByName(req.InputCollectionId, req.TaskName)
151+
if err != nil {
152+
return nil, err
153+
}
154+
155+
// If task not found, return empty response
156+
if task == nil {
157+
return nil, common.ErrTaskNotFound
158+
}
159+
160+
// Look up operator name from operators table
161+
operator, err := s.catalog.metaDomain.OperatorDb(ctx).GetByID(task.OperatorID)
162+
if err != nil {
163+
log.Error("GetTaskByName: failed to get operator", zap.Error(err))
164+
return nil, err
165+
}
166+
if operator == nil {
167+
log.Error("GetTaskByName: operator not found", zap.String("operator_id", task.OperatorID.String()))
168+
return nil, common.ErrOperatorNotFound
169+
}
170+
171+
// Debug logging
172+
log.Info("Found task", zap.String("task_id", task.ID.String()), zap.String("name", task.Name), zap.String("input_collection_id", task.InputCollectionID), zap.String("output_collection_name", task.OutputCollectionName))
173+
174+
// Convert task to response
175+
response := &coordinatorpb.GetTaskByNameResponse{
176+
TaskId: proto.String(task.ID.String()),
177+
Name: proto.String(task.Name),
178+
OperatorName: proto.String(operator.OperatorName),
179+
InputCollectionId: proto.String(task.InputCollectionID),
180+
OutputCollectionName: proto.String(task.OutputCollectionName),
181+
Params: proto.String(task.OperatorParams),
182+
CompletionOffset: proto.Int64(task.CompletionOffset),
183+
MinRecordsForTask: proto.Uint64(uint64(task.MinRecordsForTask)),
184+
}
185+
// Add output_collection_id if it's set
186+
if task.OutputCollectionID != nil {
187+
response.OutputCollectionId = task.OutputCollectionID
188+
}
189+
return response, nil
190+
}
191+
192+
// DeleteTask soft deletes a task by name
193+
func (s *Coordinator) DeleteTask(ctx context.Context, req *coordinatorpb.DeleteTaskRequest) (*coordinatorpb.DeleteTaskResponse, error) {
194+
// First get the task to check if we need to delete the output collection
195+
task, err := s.catalog.metaDomain.TaskDb(ctx).GetByName(req.InputCollectionId, req.TaskName)
196+
if err != nil {
197+
log.Error("DeleteTask: failed to get task", zap.Error(err))
198+
return nil, err
199+
}
200+
if task == nil {
201+
log.Error("DeleteTask: task not found")
202+
return nil, status.Errorf(codes.NotFound, "task not found")
203+
}
204+
205+
// If delete_output is true and output_collection_id is set, soft-delete the output collection
206+
if req.DeleteOutput && task.OutputCollectionID != nil && *task.OutputCollectionID != "" {
207+
collectionUUID, err := types.ToUniqueID(task.OutputCollectionID)
208+
if err != nil {
209+
log.Error("DeleteTask: invalid output_collection_id", zap.Error(err))
210+
return nil, status.Errorf(codes.InvalidArgument, "invalid output_collection_id: %v", err)
211+
}
212+
213+
deleteCollection := &model.DeleteCollection{
214+
ID: collectionUUID,
215+
TenantID: task.TenantID,
216+
DatabaseName: task.DatabaseID,
217+
}
218+
219+
err = s.SoftDeleteCollection(ctx, deleteCollection)
220+
if err != nil {
221+
// Log but don't fail - we still want to delete the task
222+
log.Warn("DeleteTask: failed to delete output collection", zap.Error(err), zap.String("collection_id", *task.OutputCollectionID))
223+
} else {
224+
log.Info("DeleteTask: deleted output collection", zap.String("collection_id", *task.OutputCollectionID))
225+
}
226+
}
227+
228+
// Now soft-delete the task
229+
err = s.catalog.metaDomain.TaskDb(ctx).SoftDelete(req.InputCollectionId, req.TaskName)
230+
if err != nil {
231+
log.Error("DeleteTask failed", zap.Error(err))
232+
return nil, err
233+
}
234+
235+
log.Info("Task deleted", zap.String("input_collection_id", req.InputCollectionId), zap.String("task_name", req.TaskName))
236+
237+
return &coordinatorpb.DeleteTaskResponse{
238+
Success: true,
239+
}, nil
240+
}
241+
242+
// GetOperators retrieves all operators from the database
243+
func (s *Coordinator) GetOperators(ctx context.Context, req *coordinatorpb.GetOperatorsRequest) (*coordinatorpb.GetOperatorsResponse, error) {
244+
operators, err := s.catalog.metaDomain.OperatorDb(ctx).GetAll()
245+
if err != nil {
246+
log.Error("GetOperators failed", zap.Error(err))
247+
return nil, err
248+
}
249+
250+
// Convert to proto response
251+
protoOperators := make([]*coordinatorpb.Operator, len(operators))
252+
for i, op := range operators {
253+
protoOperators[i] = &coordinatorpb.Operator{
254+
Id: op.OperatorID.String(),
255+
Name: op.OperatorName,
256+
}
257+
}
258+
259+
log.Info("GetOperators succeeded", zap.Int("count", len(operators)))
260+
261+
return &coordinatorpb.GetOperatorsResponse{
262+
Operators: protoOperators,
263+
}, nil
264+
}

0 commit comments

Comments
 (0)