Skip to content

Commit

Permalink
go: statspro/jobqueue: A bit of cleanup, fix a flakey test.
Browse files Browse the repository at this point in the history
  • Loading branch information
reltuk committed Feb 12, 2025
1 parent 7a35c3d commit 468dafc
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 85 deletions.
140 changes: 55 additions & 85 deletions go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,65 +103,41 @@ func (s *SerialQueue) Run(ctx context.Context) {

// Start the queue. The queue can be in any state, including already started.
func (s *SerialQueue) Start() error {
resp := make(chan schedResp, 1)
select {
case s.schedCh <- schedReq{
reqType: schedMsgType_Start,
resp: resp,
}:
return (<-resp).err
case <-s.completed:
return ErrCompletedQueue
}
return s.makeReq(schedReq{
reqType: schedReqType_Start,
resp: make(chan schedResp, 1),
})
}

// Pause the queue. The queue can be in any state, including already
// paused. Note that pausing the queue does not block on any
// currently running job to complete. A pattern to pause the queue
// with a guarantee that nothing is currently running is:
//
// s.InterruptSync(context.Background(), func() { q.Pause() })
// s.InterruptSync(context.Background(), func() { s.Pause() })
func (s *SerialQueue) Pause() error {
resp := make(chan schedResp, 1)
select {
case s.schedCh <- schedReq{
reqType: schedMsgType_Pause,
resp: resp,
}:
return (<-resp).err
case <-s.completed:
return ErrCompletedQueue
}
return s.makeReq(schedReq{
reqType: schedReqType_Pause,
resp: make(chan schedResp, 1),
})
}

// Stop the queue. The queue can be in any state, including already
// stopped. Note that stopping the queue does not block on any
// currently running job to complete.
func (s *SerialQueue) Stop() error {
resp := make(chan schedResp, 1)
select {
case s.schedCh <- schedReq{
reqType: schedMsgType_Stop,
resp: resp,
}:
return (<-resp).err
case <-s.completed:
return ErrCompletedQueue
}
return s.makeReq(schedReq{
reqType: schedReqType_Stop,
resp: make(chan schedResp, 1),
})
}

// Purge the queue. All pending jobs will be dropped.
func (s *SerialQueue) Purge() error {
resp := make(chan schedResp, 1)
select {
case s.schedCh <- schedReq{
reqType: schedMsgType_Purge,
resp: resp,
}:
return (<-resp).err
case <-s.completed:
return ErrCompletedQueue
}
return s.makeReq(schedReq{
reqType: schedReqType_Purge,
resp: make(chan schedResp, 1),
})
}

// Run a high priority job on the SerialQueue, blocking for its completion.
Expand Down Expand Up @@ -222,27 +198,30 @@ func (s *SerialQueue) DoAsync(f func()) error {
// Helper function to submit work. Returns the work submitted, if it
// was successful, and an error otherwise.
func (s *SerialQueue) submitWork(pri schedPriority, f func()) (work, error) {
resp := make(chan schedResp, 1)
w := work{
f: f,
done: make(chan struct{}),
}
select {
case s.schedCh <- schedReq{
reqType: schedMsgType_Enqueue,
err := s.makeReq(schedReq{
reqType: schedReqType_Enqueue,
pri: pri,
work: w,
resp: resp,
}:
r := <-resp
if r.err != nil {
return work{}, r.err
}
return w, nil
case <-s.completed:
return work{}, ErrCompletedQueue
resp: make(chan schedResp, 1),
})
if err != nil {
return work{}, err
}
return w, nil
}

func (s *SerialQueue) makeReq(req schedReq) error {
select {
case s.schedCh <- req:
resp := <-req.resp
return resp.err
case <-s.completed:
return ErrCompletedQueue
}
}

// Read off the input channels and maintain queues of pending work.
Expand All @@ -260,27 +239,22 @@ func (s *SerialQueue) runScheduler(ctx context.Context) {
if highQ.Len() > 0 {
sendWorkCh = s.runnerCh
sendWork = highQ.Front()
sentWorkCallback = func() {
highQ.Pop()
}
sentWorkCallback = highQ.Pop
} else if normalQ.Len() > 0 {
sendWorkCh = s.runnerCh
sendWork = normalQ.Front()
sentWorkCallback = func() {
normalQ.Pop()
}
sentWorkCallback = normalQ.Pop
}
}

select {
case msg := <-s.schedCh:
switch msg.reqType {
case schedMsgType_Enqueue:
case schedReqType_Enqueue:
if state == schedState_Stopped {
msg.resp <- schedResp{
err: ErrStoppedQueue,
}
close(msg.resp)
} else {
if msg.pri == schedPriority_High {
highQ.Push(msg.work)
Expand All @@ -290,33 +264,28 @@ func (s *SerialQueue) runScheduler(ctx context.Context) {
msg.resp <- schedResp{
err: nil,
}
close(msg.resp)
}
case schedMsgType_Purge:
case schedReqType_Purge:
highQ = circular.NewBuff[work](highQ.Cap())
normalQ = circular.NewBuff[work](normalQ.Cap())
msg.resp <- schedResp{
err: nil,
}
close(msg.resp)
case schedMsgType_Start:
case schedReqType_Start:
state = schedState_Running
msg.resp <- schedResp{
err: nil,
}
close(msg.resp)
case schedMsgType_Pause:
case schedReqType_Pause:
state = schedState_Paused
msg.resp <- schedResp{
err: nil,
}
close(msg.resp)
case schedMsgType_Stop:
case schedReqType_Stop:
state = schedState_Stopped
msg.resp <- schedResp{
err: nil,
}
close(msg.resp)
}
case sendWorkCh <- sendWork:
// Pop from queue the work came from.
Expand All @@ -328,15 +297,12 @@ func (s *SerialQueue) runScheduler(ctx context.Context) {
}

// Read off the runner channel and run the submitted work.
// Returns when
func (s *SerialQueue) runRunner(ctx context.Context) {
for {
select {
case w := <-s.runnerCh:
w.f()
if w.done != nil {
close(w.done)
}
close(w.done)
case <-ctx.Done():
return
}
Expand All @@ -347,8 +313,7 @@ func (s *SerialQueue) runRunner(ctx context.Context) {
type work struct {
// The function to call.
f func()
// If non-nil, the channel to close after the work
// is run.
// The channel to close after the work is run.
done chan struct{}
}

Expand All @@ -369,11 +334,11 @@ const (
type schedReqType int

const (
schedMsgType_Enqueue schedReqType = iota
schedMsgType_Purge
schedMsgType_Start
schedMsgType_Pause
schedMsgType_Stop
schedReqType_Enqueue schedReqType = iota
schedReqType_Purge
schedReqType_Start
schedReqType_Pause
schedReqType_Stop
)

type schedPriority int
Expand All @@ -386,9 +351,14 @@ const (
// Incoming message for the scheduler thread.
type schedReq struct {
reqType schedReqType
resp chan schedResp
pri schedPriority
work work
// Always set, the scheduler's response is
// sent through this channel. The send
// must never block.
resp chan schedResp
// Set when |reqType| is Enqueue
pri schedPriority
// Set when |reqType| is Enqueue
work work
}

type schedResp struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,17 @@ func TestSerialQueue(t *testing.T) {
}()
assert.NoError(t, queue.Pause())
var cnt int
didRun := make(chan struct{})
for i := 0; i < 16; i++ {
err := queue.DoAsync(func() {
cnt += 1
assert.NoError(t, queue.Purge())
close(didRun)
})
assert.NoError(t, err)
}
assert.NoError(t, queue.Start())
<-didRun
assert.NoError(t, queue.DoSync(context.Background(), func() {}))
assert.Equal(t, cnt, 1)
cancel()
Expand Down

0 comments on commit 468dafc

Please sign in to comment.