diff --git a/go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue.go b/go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue.go index 5aa63055c3..15d28e2115 100644 --- a/go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue.go +++ b/go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue.go @@ -103,16 +103,10 @@ func (s *SerialQueue) Run(ctx context.Context) { // Start the queue. The queue can be in any state, including already started. func (s *SerialQueue) Start() error { - resp := make(chan schedResp, 1) - select { - case s.schedCh <- schedReq{ - reqType: schedMsgType_Start, - resp: resp, - }: - return (<-resp).err - case <-s.completed: - return ErrCompletedQueue - } + return s.makeReq(schedReq{ + reqType: schedReqType_Start, + resp: make(chan schedResp, 1), + }) } // Pause the queue. The queue can be in any state, including already @@ -120,48 +114,30 @@ func (s *SerialQueue) Start() error { // currently running job to complete. A pattern to pause the queue // with a guarantee that nothing is currently running is: // -// s.InterruptSync(context.Background(), func() { q.Pause() }) +// s.InterruptSync(context.Background(), func() { s.Pause() }) func (s *SerialQueue) Pause() error { - resp := make(chan schedResp, 1) - select { - case s.schedCh <- schedReq{ - reqType: schedMsgType_Pause, - resp: resp, - }: - return (<-resp).err - case <-s.completed: - return ErrCompletedQueue - } + return s.makeReq(schedReq{ + reqType: schedReqType_Pause, + resp: make(chan schedResp, 1), + }) } // Stop the queue. The queue can be in any state, including already // stopped. Note that stopping the queue does not block on any // currently running job to complete. func (s *SerialQueue) Stop() error { - resp := make(chan schedResp, 1) - select { - case s.schedCh <- schedReq{ - reqType: schedMsgType_Stop, - resp: resp, - }: - return (<-resp).err - case <-s.completed: - return ErrCompletedQueue - } + return s.makeReq(schedReq{ + reqType: schedReqType_Stop, + resp: make(chan schedResp, 1), + }) } // Purge the queue. All pending jobs will be dropped. func (s *SerialQueue) Purge() error { - resp := make(chan schedResp, 1) - select { - case s.schedCh <- schedReq{ - reqType: schedMsgType_Purge, - resp: resp, - }: - return (<-resp).err - case <-s.completed: - return ErrCompletedQueue - } + return s.makeReq(schedReq{ + reqType: schedReqType_Purge, + resp: make(chan schedResp, 1), + }) } // Run a high priority job on the SerialQueue, blocking for its completion. @@ -222,27 +198,30 @@ func (s *SerialQueue) DoAsync(f func()) error { // Helper function to submit work. Returns the work submitted, if it // was successful, and an error otherwise. func (s *SerialQueue) submitWork(pri schedPriority, f func()) (work, error) { - resp := make(chan schedResp, 1) w := work{ f: f, done: make(chan struct{}), } - select { - case s.schedCh <- schedReq{ - reqType: schedMsgType_Enqueue, + err := s.makeReq(schedReq{ + reqType: schedReqType_Enqueue, pri: pri, work: w, - resp: resp, - }: - r := <-resp - if r.err != nil { - return work{}, r.err - } - return w, nil - case <-s.completed: - return work{}, ErrCompletedQueue + resp: make(chan schedResp, 1), + }) + if err != nil { + return work{}, err } + return w, nil +} +func (s *SerialQueue) makeReq(req schedReq) error { + select { + case s.schedCh <- req: + resp := <-req.resp + return resp.err + case <-s.completed: + return ErrCompletedQueue + } } // Read off the input channels and maintain queues of pending work. @@ -260,27 +239,22 @@ func (s *SerialQueue) runScheduler(ctx context.Context) { if highQ.Len() > 0 { sendWorkCh = s.runnerCh sendWork = highQ.Front() - sentWorkCallback = func() { - highQ.Pop() - } + sentWorkCallback = highQ.Pop } else if normalQ.Len() > 0 { sendWorkCh = s.runnerCh sendWork = normalQ.Front() - sentWorkCallback = func() { - normalQ.Pop() - } + sentWorkCallback = normalQ.Pop } } select { case msg := <-s.schedCh: switch msg.reqType { - case schedMsgType_Enqueue: + case schedReqType_Enqueue: if state == schedState_Stopped { msg.resp <- schedResp{ err: ErrStoppedQueue, } - close(msg.resp) } else { if msg.pri == schedPriority_High { highQ.Push(msg.work) @@ -290,33 +264,28 @@ func (s *SerialQueue) runScheduler(ctx context.Context) { msg.resp <- schedResp{ err: nil, } - close(msg.resp) } - case schedMsgType_Purge: + case schedReqType_Purge: highQ = circular.NewBuff[work](highQ.Cap()) normalQ = circular.NewBuff[work](normalQ.Cap()) msg.resp <- schedResp{ err: nil, } - close(msg.resp) - case schedMsgType_Start: + case schedReqType_Start: state = schedState_Running msg.resp <- schedResp{ err: nil, } - close(msg.resp) - case schedMsgType_Pause: + case schedReqType_Pause: state = schedState_Paused msg.resp <- schedResp{ err: nil, } - close(msg.resp) - case schedMsgType_Stop: + case schedReqType_Stop: state = schedState_Stopped msg.resp <- schedResp{ err: nil, } - close(msg.resp) } case sendWorkCh <- sendWork: // Pop from queue the work came from. @@ -328,15 +297,12 @@ func (s *SerialQueue) runScheduler(ctx context.Context) { } // Read off the runner channel and run the submitted work. -// Returns when func (s *SerialQueue) runRunner(ctx context.Context) { for { select { case w := <-s.runnerCh: w.f() - if w.done != nil { - close(w.done) - } + close(w.done) case <-ctx.Done(): return } @@ -347,8 +313,7 @@ func (s *SerialQueue) runRunner(ctx context.Context) { type work struct { // The function to call. f func() - // If non-nil, the channel to close after the work - // is run. + // The channel to close after the work is run. done chan struct{} } @@ -369,11 +334,11 @@ const ( type schedReqType int const ( - schedMsgType_Enqueue schedReqType = iota - schedMsgType_Purge - schedMsgType_Start - schedMsgType_Pause - schedMsgType_Stop + schedReqType_Enqueue schedReqType = iota + schedReqType_Purge + schedReqType_Start + schedReqType_Pause + schedReqType_Stop ) type schedPriority int @@ -386,9 +351,14 @@ const ( // Incoming message for the scheduler thread. type schedReq struct { reqType schedReqType - resp chan schedResp - pri schedPriority - work work + // Always set, the scheduler's response is + // sent through this channel. The send + // must never block. + resp chan schedResp + // Set when |reqType| is Enqueue + pri schedPriority + // Set when |reqType| is Enqueue + work work } type schedResp struct { diff --git a/go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue_test.go b/go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue_test.go index 761c7f8961..dd603cc790 100644 --- a/go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue_test.go +++ b/go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue_test.go @@ -204,14 +204,17 @@ func TestSerialQueue(t *testing.T) { }() assert.NoError(t, queue.Pause()) var cnt int + didRun := make(chan struct{}) for i := 0; i < 16; i++ { err := queue.DoAsync(func() { cnt += 1 assert.NoError(t, queue.Purge()) + close(didRun) }) assert.NoError(t, err) } assert.NoError(t, queue.Start()) + <-didRun assert.NoError(t, queue.DoSync(context.Background(), func() {})) assert.Equal(t, cnt, 1) cancel()