Commit 08fa6e7

[ENH]: Add Rust task client and execution operators (#5686)

## Description of changes

This change is the second in a series implementing the TaskRunner. This PR does the actual running of tasks and makes the required modifications to the compaction operators for this to work.

- Improvements & Bug fixes
- New functionality
  - Added `TaskUuid`, `NonceUuid`, and `JobUuid` types to work alongside `CollectionUuid` in the compactor code.
  - Created new operators for task execution:
    - `PrepareTaskOperator`: prepares a task for execution, resolves a `TaskUuid` to a `Task` object, and updates `next_nonce` if needed.
    - `ExecuteTaskOperator`: executes task logic on input records.
    - `FinishTaskOperator`: updates task state after execution.
  - Added a `TaskExecutor` trait and a `CountTask` implementation (a rough sketch follows below).
  - Enhanced `GetCollectionAndSegmentsOperator` to fetch both input and output collections.
  - Updated `RegisterOperator` to handle task-based compactions.

## Test plan

- [ ] Tests pass locally with `pytest` for python, `yarn test` for js, `cargo test` for rust

## Migration plan

No migrations needed. The changes maintain backward compatibility with existing code paths while adding new functionality for task execution.

## Observability plan

## Documentation Changes

No user-facing API changes requiring documentation updates.
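
To make the new pieces concrete, here is a minimal, hypothetical sketch of the `TaskExecutor` trait and the `CountTask` implementation named above. The trait shape, the `LogRecord` type, and the method signatures are assumptions for illustration only; the actual definitions live in the Rust worker code added by this PR and may differ.

```rust
use uuid::Uuid;

// Newtype IDs mirroring the TaskUuid / NonceUuid added in this PR (shapes assumed).
#[derive(Clone, Copy, Debug)]
pub struct TaskUuid(pub Uuid);
#[derive(Clone, Copy, Debug)]
pub struct NonceUuid(pub Uuid);

// Illustrative stand-in for a log record handed to ExecuteTaskOperator.
pub struct LogRecord {
    pub offset: u64,
}

// Assumed shape of the TaskExecutor trait: consume the records for one run
// and produce an output plus the completion offset to report back.
pub trait TaskExecutor {
    type Output;
    fn execute(&self, records: &[LogRecord]) -> (Self::Output, u64);
}

// CountTask sketched as "count the records seen in this run".
pub struct CountTask;

impl TaskExecutor for CountTask {
    type Output = u64;
    fn execute(&self, records: &[LogRecord]) -> (u64, u64) {
        let count = records.len() as u64;
        let completion_offset = records.last().map(|r| r.offset + 1).unwrap_or(0);
        (count, completion_offset)
    }
}

fn main() {
    let records = vec![LogRecord { offset: 10 }, LogRecord { offset: 11 }];
    let (count, completion_offset) = CountTask.execute(&records);
    println!("counted {count} records, completion_offset now {completion_offset}");
}
```

Roughly speaking, `PrepareTaskOperator` would resolve the task and its nonce before a call like this, and `FinishTaskOperator` would persist the returned completion offset back to the sysdb.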
1 parent 7ebd06f commit 08fa6e7

File tree

20 files changed: +2041 −147 lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
@@ -51,7 +51,7 @@ tracing = { version = "0.1" }
 tracing-bunyan-formatter = "0.3"
 tracing-opentelemetry = "0.28.0"
 tracing-subscriber = { version = "0.3", features = ["env-filter"] }
-uuid = { version = "1.11.0", features = ["v4", "fast-rng", "macro-diagnostics", "serde"] }
+uuid = { version = "1.11.0", features = ["v4", "v7", "fast-rng", "macro-diagnostics", "serde"] }
 utoipa = { version = "5", features = ["macros", "axum_extras", "debug", "uuid"] }
 sqlx = { version = "0.8.3", features = ["runtime-tokio", "sqlite", "postgres", "chrono"] }
 sha2 = "0.10.8"
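
The only change here is enabling the `v7` feature on the `uuid` crate. A plausible reason, given the new nonce types in this PR, is that UUIDv7 values embed a millisecond timestamp and therefore sort roughly in creation order; a tiny sketch of the API this unlocks (its use for nonces here is an assumption):

```rust
use uuid::Uuid;

fn main() {
    // Requires uuid with the "v7" feature, as enabled in this Cargo.toml change.
    let first = Uuid::now_v7();
    let second = Uuid::now_v7();

    // v7 UUIDs lead with a millisecond timestamp, so values generated later
    // generally compare greater (ordering within the same millisecond is not
    // guaranteed by now_v7 alone).
    println!("first:  {first}");
    println!("second: {second}");
    println!("second >= first (usually): {}", second >= first);
}
```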

Tiltfile

Lines changed: 1 addition & 1 deletion
@@ -237,7 +237,7 @@ k8s_resource(
 k8s_resource('postgres', resource_deps=['k8s_setup'], labels=["infrastructure"], port_forwards='5432:5432')
 # Jobs are suffixed with the image tag to ensure they are unique. In this context, the image tag is defined in k8s/distributed-chroma/values.yaml.
 k8s_resource('sysdb-migration-latest', resource_deps=['postgres'], labels=["infrastructure"])
-k8s_resource('rust-log-service', labels=["chroma"], port_forwards='50054:50051', resource_deps=['minio-deployment'])
+k8s_resource('rust-log-service', labels=["chroma"], port_forwards=['50054:50051', '50052:50052'], resource_deps=['minio-deployment'])
 k8s_resource('sysdb', resource_deps=['sysdb-migration-latest'], labels=["chroma"], port_forwards='50051:50051')
 k8s_resource('rust-frontend-service', resource_deps=['sysdb', 'rust-log-service'], labels=["chroma"], port_forwards='8000:8000')
 k8s_resource('query-service', resource_deps=['sysdb'], labels=["chroma"], port_forwards='50053:50051')

go/pkg/sysdb/coordinator/task.go

Lines changed: 45 additions & 4 deletions
@@ -2,6 +2,7 @@ package coordinator

 import (
     "context"
+    "math"
     "strings"
     "time"

@@ -195,6 +196,15 @@ func (s *Coordinator) GetTaskByName(ctx context.Context, req *coordinatorpb.GetT
     }
 }

+// Validate completion_offset is non-negative before converting to uint64
+if task.CompletionOffset < 0 {
+    log.Error("GetTaskByName: invalid completion_offset",
+        zap.String("task_id", task.ID.String()),
+        zap.Int64("completion_offset", task.CompletionOffset))
+    return nil, status.Errorf(codes.Internal,
+        "task has invalid completion_offset: %d", task.CompletionOffset)
+}
+
 // Convert task to response with nested Task message
 taskProto := &coordinatorpb.Task{
     TaskId: task.ID.String(),
@@ -203,13 +213,15 @@ func (s *Coordinator) GetTaskByName(ctx context.Context, req *coordinatorpb.GetT
     InputCollectionId: task.InputCollectionID,
     OutputCollectionName: task.OutputCollectionName,
     Params: paramsStruct,
-    CompletionOffset: task.CompletionOffset,
+    CompletionOffset: uint64(task.CompletionOffset),
     MinRecordsForTask: uint64(task.MinRecordsForTask),
     TenantId: task.TenantID,
     DatabaseId: task.DatabaseID,
     NextRunAt: uint64(task.NextRun.UnixMicro()),
     LowestLiveNonce: "",
     NextNonce: task.NextNonce.String(),
+    CreatedAt: uint64(task.CreatedAt.UnixMicro()),
+    UpdatedAt: uint64(task.UpdatedAt.UnixMicro()),
 }
 // Add lowest_live_nonce if it's set
 if task.LowestLiveNonce != nil {
@@ -269,6 +281,15 @@ func (s *Coordinator) GetTaskByUuid(ctx context.Context, req *coordinatorpb.GetT
     }
 }

+// Validate completion_offset is non-negative before converting to uint64
+if task.CompletionOffset < 0 {
+    log.Error("GetTaskByUuid: invalid completion_offset",
+        zap.String("task_id", task.ID.String()),
+        zap.Int64("completion_offset", task.CompletionOffset))
+    return nil, status.Errorf(codes.Internal,
+        "task has invalid completion_offset: %d", task.CompletionOffset)
+}
+
 // Convert task to response with nested Task message
 taskProto := &coordinatorpb.Task{
     TaskId: task.ID.String(),
@@ -277,13 +298,15 @@ func (s *Coordinator) GetTaskByUuid(ctx context.Context, req *coordinatorpb.GetT
     InputCollectionId: task.InputCollectionID,
     OutputCollectionName: task.OutputCollectionName,
     Params: paramsStruct,
-    CompletionOffset: task.CompletionOffset,
+    CompletionOffset: uint64(task.CompletionOffset),
     MinRecordsForTask: uint64(task.MinRecordsForTask),
     TenantId: task.TenantID,
     DatabaseId: task.DatabaseID,
     NextRunAt: uint64(task.NextRun.UnixMicro()),
     LowestLiveNonce: "",
     NextNonce: task.NextNonce.String(),
+    CreatedAt: uint64(task.CreatedAt.UnixMicro()),
+    UpdatedAt: uint64(task.UpdatedAt.UnixMicro()),
 }
 // Add lowest_live_nonce if it's set
 if task.LowestLiveNonce != nil {
@@ -480,16 +503,34 @@ func (s *Coordinator) AdvanceTask(ctx context.Context, req *coordinatorpb.Advanc
     return nil, status.Errorf(codes.InvalidArgument, "invalid task_run_nonce: %v", err)
 }

-advanceTask, err := s.catalog.metaDomain.TaskDb(ctx).AdvanceTask(taskID, taskRunNonce, *req.CompletionOffset, *req.NextRunDelaySecs)
+// Validate completion_offset fits in int64 before storing in database
+if *req.CompletionOffset > uint64(math.MaxInt64) {
+    log.Error("AdvanceTask: completion_offset too large",
+        zap.Uint64("completion_offset", *req.CompletionOffset))
+    return nil, status.Errorf(codes.InvalidArgument,
+        "completion_offset too large: %d", *req.CompletionOffset)
+}
+completionOffsetInt64 := int64(*req.CompletionOffset)
+
+advanceTask, err := s.catalog.metaDomain.TaskDb(ctx).AdvanceTask(taskID, taskRunNonce, completionOffsetInt64, *req.NextRunDelaySecs)
 if err != nil {
     log.Error("AdvanceTask failed", zap.Error(err), zap.String("task_id", taskID.String()))
     return nil, err
 }

+// Validate completion_offset from database is non-negative before converting to uint64
+if advanceTask.CompletionOffset < 0 {
+    log.Error("AdvanceTask: invalid completion_offset from database",
+        zap.String("task_id", taskID.String()),
+        zap.Int64("completion_offset", advanceTask.CompletionOffset))
+    return nil, status.Errorf(codes.Internal,
+        "task has invalid completion_offset: %d", advanceTask.CompletionOffset)
+}
+
 return &coordinatorpb.AdvanceTaskResponse{
     NextRunNonce: advanceTask.NextNonce.String(),
     NextRunAt: uint64(advanceTask.NextRun.UnixMilli()),
-    CompletionOffset: advanceTask.CompletionOffset,
+    CompletionOffset: uint64(advanceTask.CompletionOffset),
 }, nil
 }
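
The hunks above guard every int64/uint64 boundary for `completion_offset` now that the proto field is unsigned while the database column stays signed. The same discipline applies on the Rust side of the task client; a small sketch (the helper names are invented for illustration):

```rust
// Checked conversions between the unsigned proto field and a signed storage type.
fn offset_to_db(offset: u64) -> Result<i64, String> {
    i64::try_from(offset).map_err(|_| format!("completion_offset too large: {offset}"))
}

fn offset_from_db(offset: i64) -> Result<u64, String> {
    u64::try_from(offset).map_err(|_| format!("task has invalid completion_offset: {offset}"))
}

fn main() {
    assert_eq!(offset_to_db(42), Ok(42));
    assert!(offset_to_db(u64::MAX).is_err()); // would overflow an int64 column
    assert_eq!(offset_from_db(42), Ok(42));
    assert!(offset_from_db(-1).is_err()); // a negative offset in the DB is corrupt state
}
```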

go/pkg/sysdb/grpc/collection_service.go

Lines changed: 17 additions & 2 deletions
@@ -3,6 +3,7 @@ package grpc
 import (
     "context"
     "encoding/json"
+    "math"

     "github.com/chroma-core/chroma/go/pkg/grpcutils"

@@ -607,6 +608,14 @@ func (s *Server) FlushCollectionCompactionAndTask(ctx context.Context, req *coor
     return nil, grpcutils.BuildInternalGrpcError(err.Error())
 }

+// Validate completion_offset fits in int64 before storing in database
+if taskUpdate.CompletionOffset > uint64(math.MaxInt64) {
+    log.Error("FlushCollectionCompactionAndTask: completion_offset too large",
+        zap.Uint64("completion_offset", taskUpdate.CompletionOffset))
+    return nil, grpcutils.BuildInternalGrpcError("completion_offset too large")
+}
+completionOffsetSigned := int64(taskUpdate.CompletionOffset)
+
 segmentCompactionInfo := make([]*model.FlushSegmentCompaction, 0, len(flushReq.SegmentCompactionInfo))
 for _, flushSegmentCompaction := range flushReq.SegmentCompactionInfo {
     segmentID, err := types.ToUniqueID(&flushSegmentCompaction.SegmentId)
@@ -640,7 +649,7 @@ func (s *Server) FlushCollectionCompactionAndTask(ctx context.Context, req *coor
     flushCollectionCompaction,
     taskID,
     taskRunNonce,
-    taskUpdate.CompletionOffset,
+    completionOffsetSigned,
 )
 if err != nil {
     log.Error("FlushCollectionCompactionAndTask failed", zap.Error(err), zap.String("collection_id", flushReq.CollectionId), zap.String("task_id", taskUpdate.TaskId))
@@ -667,7 +676,13 @@ func (s *Server) FlushCollectionCompactionAndTask(ctx context.Context, req *coor
     res.NextRun = timestamppb.New(*flushCollectionInfo.TaskNextRun)
 }
 if flushCollectionInfo.TaskCompletionOffset != nil {
-    res.CompletionOffset = *flushCollectionInfo.TaskCompletionOffset
+    // Validate completion_offset is non-negative before converting to uint64
+    if *flushCollectionInfo.TaskCompletionOffset < 0 {
+        log.Error("FlushCollectionCompactionAndTask: invalid completion_offset",
+            zap.Int64("completion_offset", *flushCollectionInfo.TaskCompletionOffset))
+        return nil, grpcutils.BuildInternalGrpcError("task has invalid completion_offset")
+    }
+    res.CompletionOffset = uint64(*flushCollectionInfo.TaskCompletionOffset)
 }

 return res, nil

go/pkg/sysdb/metastore/db/dao/task.go

Lines changed: 1 addition & 1 deletion
@@ -138,7 +138,7 @@ func (s *taskDb) AdvanceTask(taskID uuid.UUID, taskRunNonce uuid.UUID, completio
 // Bump next_nonce to mark a new run, but don't touch lowest_live_nonce yet
 // lowest_live_nonce will be updated later by finish_task when verification completes
 next_run := now.Add(time.Duration(nextRunDelaySecs) * time.Second)
-result := s.db.Model(&dbmodel.Task{}).Where("task_id = ?", taskID).Where("is_deleted = false").Where("next_nonce = ?", taskRunNonce).Where("completion_offset < ?", completionOffset).UpdateColumns(map[string]interface{}{
+result := s.db.Model(&dbmodel.Task{}).Where("task_id = ?", taskID).Where("is_deleted = false").Where("next_nonce = ?", taskRunNonce).Where("completion_offset <= ?", completionOffset).UpdateColumns(map[string]interface{}{
     "completion_offset": completionOffset,
     "next_run": next_run,
     "last_run": now,

idl/chromadb/proto/coordinator.proto

Lines changed: 7 additions & 5 deletions
@@ -325,7 +325,7 @@ message FlushCollectionCompactionResponse {
 message TaskUpdateInfo {
   string task_id = 1;
   string task_run_nonce = 2;
-  int64 completion_offset = 3;
+  uint64 completion_offset = 3;
 }

 // Combined request to flush collection compaction and update task atomically in a single transaction
@@ -341,7 +341,7 @@ message FlushCollectionCompactionAndTaskResponse {
   // Updated task fields from database (authoritative)
   string next_nonce = 4;
   google.protobuf.Timestamp next_run = 5;
-  int64 completion_offset = 6;
+  uint64 completion_offset = 6;
 }

 // Used for serializing contents in collection version history file.
@@ -588,13 +588,15 @@ message Task {
   string output_collection_name = 5;
   optional string output_collection_id = 6;
   optional google.protobuf.Struct params = 7;
-  int64 completion_offset = 8;
+  uint64 completion_offset = 8;
   uint64 min_records_for_task = 9;
   string tenant_id = 10;
   string database_id = 11;
   uint64 next_run_at = 12;
   string lowest_live_nonce = 13;
   string next_nonce = 14;
+  uint64 created_at = 15;
+  uint64 updated_at = 16;
 }

 message GetTaskByNameResponse {
@@ -623,14 +625,14 @@ message AdvanceTaskRequest {
   optional string collection_id = 1;
   optional string task_id = 2;
   optional string task_run_nonce = 3;
-  optional int64 completion_offset = 4;
+  optional uint64 completion_offset = 4;
   optional uint64 next_run_delay_secs = 5;
 }

 message AdvanceTaskResponse {
   string next_run_nonce = 1;
   uint64 next_run_at = 2;
-  int64 completion_offset = 3;
+  uint64 completion_offset = 3;
 }

 message FinishTaskRequest {
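
Changing these fields from `int64` to `uint64` is wire-compatible in the sense that both encode as plain protobuf varints, but a negative value written by older code would decode as a huge unsigned number, which is exactly why the Go handlers above validate the range at every conversion. A small illustration of the reinterpretation:

```rust
fn main() {
    // int64 and uint64 share the varint wire format; the bits of -1 decode
    // as u64::MAX when read back through the new unsigned field.
    let old_signed: i64 = -1;
    let reinterpreted = old_signed as u64;
    assert_eq!(reinterpreted, u64::MAX);
    println!("-1 read back through a uint64 field would be {reinterpreted}");
}
```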

rust/s3heap-service/src/lib.rs

Lines changed: 1 addition & 1 deletion
@@ -269,7 +269,7 @@ impl HeapTender {
             let schedule = Schedule {
                 triggerable,
                 next_scheduled,
-                nonce: s.task_run_nonce,
+                nonce: s.task_run_nonce.0,
             };
             Ok(Some(schedule))
         } else {

rust/s3heap-service/src/scheduler.rs

Lines changed: 1 addition & 1 deletion
@@ -75,7 +75,7 @@ impl HeapScheduler for SysDbScheduler {
                     partitioning: schedule.collection_id.0.into(),
                     scheduling: schedule.task_id.into(),
                 },
-                nonce: schedule.task_run_nonce,
+                nonce: schedule.task_run_nonce.0,
                 next_scheduled: when_to_run,
             });
         }
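
The `.0` added in the two Rust hunks above follows from `task_run_nonce` now being a newtype (the `NonceUuid` introduced in this PR) rather than a bare `Uuid`. A minimal sketch of that pattern; the surrounding `Schedule` field types are assumed for illustration:

```rust
use uuid::Uuid;

// Newtype wrapper, as with NonceUuid in this PR.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct NonceUuid(pub Uuid);

// Downstream struct that still expects a raw Uuid.
struct Schedule {
    nonce: Uuid,
}

fn main() {
    let task_run_nonce = NonceUuid(Uuid::new_v4());
    // `.0` unwraps the newtype where the raw Uuid is required.
    let schedule = Schedule { nonce: task_run_nonce.0 };
    assert_eq!(schedule.nonce, task_run_nonce.0);
}
```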

rust/sysdb/src/bin/chroma-task-manager.rs

Lines changed: 2 additions & 1 deletion
@@ -169,6 +169,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
     println!("Operator: {:?}", task.operator_name);
     println!("Input Collection: {:?}", task.input_collection_id);
     println!("Output Collection Name: {:?}", task.output_collection_name);
+    println!("Output Collection ID: {:?}", task.output_collection_id);
     println!("Params: {:?}", task.params);
     println!("Completion Offset: {:?}", task.completion_offset);
     println!("Min Records: {:?}", task.min_records_for_task);
@@ -198,7 +199,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
         collection_id: Some(collection_id),
         task_id: Some(task_id),
         task_run_nonce: Some(task_run_nonce),
-        completion_offset: Some(completion_offset.try_into().unwrap()),
+        completion_offset: Some(completion_offset),
         next_run_delay_secs: Some(next_run_delay_secs),
     };
