Skip to content

Commit

Permalink
feat: add raft to cron service
Browse files Browse the repository at this point in the history
  • Loading branch information
worstell committed Feb 4, 2025
1 parent 78dc581 commit 002c87f
Show file tree
Hide file tree
Showing 13 changed files with 673 additions and 30 deletions.
65 changes: 65 additions & 0 deletions backend/cron/integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//go:build integration

package cron

import (
"context"
"testing"
"time"

"github.com/alecthomas/assert/v2"
timelinepb "github.com/block/ftl/backend/protos/xyz/block/ftl/timeline/v1"
in "github.com/block/ftl/internal/integration"
)

func TestCronIntegration(t *testing.T) {
in.Run(t,
in.WithRaft(),
in.WithLanguages("go"),
in.CopyModule("cron"),
in.Deploy("cron"),

in.Sleep(3*time.Second),
in.VerifyTimeline(100, []*timelinepb.GetTimelineRequest_Filter{
{
Filter: &timelinepb.GetTimelineRequest_Filter_EventTypes{
EventTypes: &timelinepb.GetTimelineRequest_EventTypeFilter{
EventTypes: []timelinepb.EventType{timelinepb.EventType_EVENT_TYPE_CRON_SCHEDULED},
},
},
},
}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) {
assert.True(t, len(events) > 0, "Expected at least 1 cron scheduled event")

for _, event := range events {
scheduled, ok := event.Entry.(*timelinepb.Event_CronScheduled)
assert.True(t, ok, "expected scheduled event")
assert.Equal(t, "cron", scheduled.CronScheduled.VerbRef.Module)
assert.Equal(t, "job", scheduled.CronScheduled.VerbRef.Name)
}
}),

in.Sleep(4*time.Second),
in.VerifyTimeline(100, []*timelinepb.GetTimelineRequest_Filter{
{
Filter: &timelinepb.GetTimelineRequest_Filter_EventTypes{
EventTypes: &timelinepb.GetTimelineRequest_EventTypeFilter{
EventTypes: []timelinepb.EventType{timelinepb.EventType_EVENT_TYPE_CRON_SCHEDULED},
},
},
},
}, func(ctx context.Context, t testing.TB, events []*timelinepb.Event) {
assert.True(t, len(events) > 0, "Expected at least 1 cron scheduled event")

for _, event := range events {
scheduled, ok := event.Entry.(*timelinepb.Event_CronScheduled)
assert.True(t, ok, "expected scheduled event")
assert.Equal(t, "cron", scheduled.CronScheduled.VerbRef.Module)
assert.Equal(t, "job", scheduled.CronScheduled.VerbRef.Name)
}

firstCount := len(events)
assert.True(t, firstCount > 0, "should have accumulated events")
}),
)
}
192 changes: 166 additions & 26 deletions backend/cron/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package cron

import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"net/url"
"sort"
"time"
Expand All @@ -14,9 +16,11 @@ import (
"github.com/block/ftl/common/cron"
"github.com/block/ftl/common/schema"
"github.com/block/ftl/common/slices"
"github.com/block/ftl/internal/eventstream"
"github.com/block/ftl/internal/key"
"github.com/block/ftl/internal/log"
"github.com/block/ftl/internal/model"
"github.com/block/ftl/internal/raft"
"github.com/block/ftl/internal/routing"
"github.com/block/ftl/internal/rpc/headers"
"github.com/block/ftl/internal/schema/schemaeventsource"
Expand All @@ -32,9 +36,14 @@ type cronJob struct {
next time.Time
}

// Key returns the identifier used for this job in the replicated cron state:
// the module name and the verb name joined by a dot.
func (c cronJob) Key() string {
	const separator = "."
	verbName := c.verb.Name
	return c.module + separator + verbName
}

// Config holds the settings of the cron service. The struct tags carry the
// flag name, help text, environment variable and default for each field
// (presumably consumed by the CLI/flag parser used by the binary — confirm
// against the caller).
type Config struct {
// SchemaServiceEndpoint is the URL of the schema service; tagged with the
// name "ftl-endpoint" and the FTL_SCHEMA_ENDPOINT environment variable.
SchemaServiceEndpoint *url.URL `name:"ftl-endpoint" help:"Schema Service endpoint." env:"FTL_SCHEMA_ENDPOINT" default:"http://127.0.0.1:8897"`
// TimelineEndpoint is the URL of the timeline service
// (FTL_TIMELINE_ENDPOINT).
TimelineEndpoint *url.URL `help:"Timeline endpoint." env:"FTL_TIMELINE_ENDPOINT" default:"http://127.0.0.1:8894"`
// ShardCount is the number of Raft shards (RAFT_SHARD_COUNT, default 1).
// Start creates one event view per shard and getJobHash picks a shard for
// each job by reducing a hash of its key modulo this value, so it is
// expected to be at least 1.
ShardCount int `help:"Number of Raft shards to distribute jobs across" env:"RAFT_SHARD_COUNT" default:"1"`
}

func (c cronJob) String() string {
Expand All @@ -46,10 +55,85 @@ func (c cronJob) String() string {
return desc + next
}

// CronView represents the state of scheduled cron jobs.
//
// It is the view type of the event views that Start registers with Raft (one
// per shard) and is updated by CronEvent.Handle. It is serialised as JSON by
// MarshalBinary/UnmarshalBinary, so the exported field names are part of the
// stored format.
type CronView struct {
// Map of job key (see cronJob.Key) to the time the job was last executed,
// as recorded in CronEvent.ExecutedAt.
LastExecutions map[string]time.Time
// Map of job key to the next scheduled execution time, as recorded in
// CronEvent.NextExecution.
NextExecutions map[string]time.Time
}

// MarshalBinary encodes the view as JSON. Any encoding failure is wrapped and
// returned with a nil byte slice.
func (v CronView) MarshalBinary() ([]byte, error) {
	encoded, err := json.Marshal(v)
	if err == nil {
		return encoded, nil
	}
	return nil, fmt.Errorf("failed to marshal cron view: %w", err)
}

// UnmarshalBinary decodes JSON-encoded data into the receiver. A decoding
// failure is wrapped and returned.
func (v *CronView) UnmarshalBinary(data []byte) error {
	decodeErr := json.Unmarshal(data, v)
	if decodeErr == nil {
		return nil
	}
	return fmt.Errorf("failed to unmarshal cron view: %w", decodeErr)
}

// CronEvent represents changes to the cron state.
//
// executeJob publishes one CronEvent to the job's shard before calling the
// verb; Handle folds it into the CronView. It is serialised as JSON by
// MarshalBinary/UnmarshalBinary, so the exported field names are part of the
// stored format.
type CronEvent struct {
// Key of the job that was executed (see cronJob.Key).
JobKey string
// When the job was executed; executeJob sets this to time.Now() at the
// moment the event is created.
ExecutedAt time.Time
// Next scheduled execution, as computed from the job's cron pattern.
NextExecution time.Time
}

// MarshalBinary encodes the event as JSON. Any encoding failure is wrapped and
// returned with a nil byte slice.
func (e CronEvent) MarshalBinary() ([]byte, error) {
	encoded, err := json.Marshal(e)
	if err == nil {
		return encoded, nil
	}
	return nil, fmt.Errorf("failed to marshal cron event: %w", err)
}

// UnmarshalBinary decodes JSON-encoded data into the receiver. A decoding
// failure is wrapped and returned.
func (e *CronEvent) UnmarshalBinary(data []byte) error {
	decodeErr := json.Unmarshal(data, e)
	if decodeErr == nil {
		return nil
	}
	return fmt.Errorf("failed to unmarshal cron event: %w", decodeErr)
}

// Handle folds the event into view and returns the updated view. The last and
// next execution times for e.JobKey are overwritten; either map is allocated
// first if it is nil. Existing maps are updated in place, so they remain
// shared with the view that was passed in. The returned error is always nil.
func (e CronEvent) Handle(view CronView) (CronView, error) {
	last, upcoming := view.LastExecutions, view.NextExecutions
	if last == nil {
		last = map[string]time.Time{}
	}
	if upcoming == nil {
		upcoming = map[string]time.Time{}
	}

	last[e.JobKey] = e.ExecutedAt
	upcoming[e.JobKey] = e.NextExecution

	view.LastExecutions = last
	view.NextExecutions = upcoming
	return view, nil
}

// Start the cron service. Blocks until the context is cancelled.
func Start(ctx context.Context, eventSource schemaeventsource.EventSource, client routing.CallClient, timelineClient *timelineclient.Client) error {
func Start(ctx context.Context, config *Config, raftConfig *raft.RaftConfig, eventSource schemaeventsource.EventSource, client routing.CallClient, timelineClient *timelineclient.Client) error {
logger := log.FromContext(ctx).Scope("cron")
ctx = log.ContextWithLogger(ctx, logger)

raftBuilder := raft.NewBuilder(raftConfig)
shardViews := make([]eventstream.EventView[CronView, CronEvent], config.ShardCount)
for i := range shardViews {
shardViews[i] = raft.AddEventView[CronView, *CronView, CronEvent, *CronEvent](ctx, raftBuilder, uint64(i))
}
cluster := raftBuilder.Build(ctx)
defer cluster.Stop(ctx)

if err := cluster.Start(ctx); err != nil {
return fmt.Errorf("failed to start Raft cluster: %w", err)
}

// Map of cron jobs for each module.
cronJobs := map[string][]cronJob{}
// Cron jobs ordered by next execution.
Expand All @@ -61,11 +145,25 @@ func Start(ctx context.Context, eventSource schemaeventsource.EventSource, clien
next, ok := scheduleNext(ctx, cronQueue, timelineClient)
var nextCh <-chan time.Time
if ok {
if next == 0 {
// Execute immediately
select {
case <-ctx.Done():
return fmt.Errorf("cron service stopped: %w", ctx.Err())
default:
if err := executeJob(ctx, shardViews, config.ShardCount, client, &cronQueue[0]); err != nil {
logger.Errorf(err, "Failed to execute job")
}
orderQueue(cronQueue)
continue
}
}
logger.Debugf("Next cron job scheduled in %s", next)
nextCh = time.After(next)
} else {
logger.Debugf("No cron jobs scheduled")
}

select {
case <-ctx.Done():
return fmt.Errorf("cron service stopped: %w", ctx.Err())
Expand All @@ -79,37 +177,65 @@ func Start(ctx context.Context, eventSource schemaeventsource.EventSource, clien

// Execute scheduled cron job
case <-nextCh:
job := cronQueue[0]
logger.Debugf("Executing cron job %s", job)

nextRun, err := cron.Next(job.pattern, false)
if err != nil {
logger.Errorf(err, "Failed to calculate next run time")
if err := executeJob(ctx, shardViews, config.ShardCount, client, &cronQueue[0]); err != nil {
logger.Errorf(err, "Failed to execute job")
continue
}
job.next = nextRun
cronQueue[0] = job
orderQueue(cronQueue)

cronModel := model.CronJob{
// TODO: We don't have the runner key available here.
Key: key.NewCronJobKey(job.module, job.verb.Name),
Verb: schema.Ref{Module: job.module, Name: job.verb.Name},
Schedule: job.pattern.String(),
StartTime: time.Now(),
NextExecution: job.next,
}
observability.Cron.JobStarted(ctx, cronModel)
if err := callCronJob(ctx, client, job); err != nil {
observability.Cron.JobFailed(ctx, cronModel)
logger.Errorf(err, "Failed to execute cron job")
} else {
observability.Cron.JobSuccess(ctx, cronModel)
}
}
}
}

// executeJob runs a due cron job unless the replicated state shows that this
// run has already been executed.
//
// The job's key is hashed with getJobHash to pick one of shardViews. If that
// shard's CronView records a last execution at or after job.next, the run is
// skipped. Otherwise a CronEvent is published to the shard before the verb is
// called, so a run whose call fails is still recorded as executed.
//
// job is modified in place: job.next is advanced to the next run computed from
// the cron pattern on both the skip path and the execute path. Advancing on
// the skip path matters because the caller re-queues the job by job.next; a
// skipped job left with a due time in the past would be picked up again
// immediately and skipped again in a tight loop.
//
// NOTE(review): the view read and the publish are separate steps, so two
// callers that read the view at the same moment can presumably both proceed;
// confirm whether the event stream provides any stronger guarantee.
//
// An error is returned when the shard configuration is invalid, the view
// cannot be read, the next run time cannot be computed, the event cannot be
// published, or the verb call fails. job.next is left unchanged when the error
// occurs before the event has been published.
func executeJob(ctx context.Context, shardViews []eventstream.EventView[CronView, CronEvent], shardCount int, client routing.CallClient, job *cronJob) error {
	logger := log.FromContext(ctx).Scope("cron")
	logger.Debugf("Executing cron job %s", job)

	// Reject shard settings that would otherwise panic (modulo by zero in the
	// hash, or an index outside shardViews).
	if shardCount <= 0 {
		return fmt.Errorf("invalid shard count %d", shardCount)
	}
	shardIdx := getJobHash(job, shardCount)
	if shardIdx < 0 || shardIdx >= len(shardViews) {
		return fmt.Errorf("shard index %d out of range for %d shard views", shardIdx, len(shardViews))
	}
	shard := shardViews[shardIdx]
	jobKey := job.Key()

	view, err := shard.View(ctx)
	if err != nil {
		return fmt.Errorf("failed to get job state: %w", err)
	}

	nextRun, err := cron.Next(job.pattern, false)
	if err != nil {
		return fmt.Errorf("failed to calculate next run time: %w", err)
	}

	lastExec, hasLast := view.LastExecutions[jobKey]
	if hasLast && !job.next.After(lastExec) {
		logger.Debugf("Skipping already executed job %s", jobKey)
		// Move the job forward so the scheduler does not see it as overdue
		// again on its next pass.
		job.next = nextRun
		return nil
	}

	event := CronEvent{
		JobKey:        jobKey,
		ExecutedAt:    time.Now(),
		NextExecution: nextRun,
	}
	if err := shard.Publish(ctx, event); err != nil {
		return fmt.Errorf("failed to claim job execution: %w", err)
	}

	job.next = nextRun

	cronModel := model.CronJob{
		Key:           key.NewCronJobKey(job.module, job.verb.Name),
		Verb:          schema.Ref{Module: job.module, Name: job.verb.Name},
		Schedule:      job.pattern.String(),
		StartTime:     event.ExecutedAt,
		NextExecution: job.next,
	}
	observability.Cron.JobStarted(ctx, cronModel)
	if err := callCronJob(ctx, client, *job); err != nil {
		observability.Cron.JobFailed(ctx, cronModel)
		return fmt.Errorf("failed to execute cron job: %w", err)
	}
	observability.Cron.JobSuccess(ctx, cronModel)
	return nil
}

func callCronJob(ctx context.Context, verbClient routing.CallClient, cronJob cronJob) error {
logger := log.FromContext(ctx).Scope("cron")
ref := schema.Ref{Module: cronJob.module, Name: cronJob.verb.Name}
Expand Down Expand Up @@ -141,13 +267,20 @@ func scheduleNext(ctx context.Context, cronQueue []cronJob, timelineClient *time
if len(cronQueue) == 0 {
return 0, false
}

// If next execution is in the past, schedule immediately
next := time.Until(cronQueue[0].next)
if next < 0 {
next = 0
}

timelineClient.Publish(ctx, timelineclient.CronScheduled{
DeploymentKey: cronQueue[0].deployment,
Verb: schema.Ref{Module: cronQueue[0].module, Name: cronQueue[0].verb.Name},
ScheduledAt: cronQueue[0].next,
Schedule: cronQueue[0].pattern.String(),
})
return time.Until(cronQueue[0].next), true
return next, true
}

func updateCronJobs(ctx context.Context, cronJobs map[string][]cronJob, change schemaeventsource.Event) error {
Expand Down Expand Up @@ -215,3 +348,10 @@ func extractCronJobs(module *schema.Module) ([]cronJob, error) {
}
return cronJobs, nil
}

// getJobHash returns the shard index for the job: the 32-bit FNV-1 hash of
// job.Key() reduced modulo shardCount, so the result is always in
// [0, shardCount) for a positive shardCount.
//
// The reduction is done in unsigned arithmetic. Converting the 32-bit hash to
// int first can yield a negative value where int is 32 bits wide, which would
// produce a negative index. A non-positive shardCount maps every job to shard
// 0 instead of panicking with a division by zero.
func getJobHash(job *cronJob, shardCount int) int {
	if shardCount <= 0 {
		return 0
	}
	h := fnv.New32()
	// hash.Hash documents that Write never returns an error.
	_, _ = h.Write([]byte(job.Key()))
	return int(uint64(h.Sum32()) % uint64(shardCount))
}
Loading

0 comments on commit 002c87f

Please sign in to comment.