Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[no-release-notes] go: statspro/jobqueue: Create a SerialQueue, which can perform asynchronous work on a worker thread. #8856

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go/go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -404,8 +404,6 @@ github.com/envoyproxy/protoc-gen-validate v0.10.1 h1:c0g45+xCJhdgFGw7a5QAfdS4byA
github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss=
github.com/envoyproxy/protoc-gen-validate v1.0.2 h1:QkIBuU5k+x7/QXPvPPnWXWlCdaBFApVqftFV6k087DA=
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
github.com/esote/minmaxheap v1.0.0 h1:rgA7StnXXpZG6qlM0S7pUmEv1KpWe32rYT4x8J8ntaA=
github.com/esote/minmaxheap v1.0.0/go.mod h1:Ln8+i7fS1k3PLgZI2JAo0iA1as95QnIYiGCrqSJ5FZk=
github.com/fogleman/gg v1.3.0 h1:/7zJX8F6AaYQc57WQCyN9cAIz+4bCJGO9B+dyW29am8=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db h1:gb2Z18BhTPJPpLQWj4T+rfKHYCHxRHCtRxhKKjRidVw=
Expand Down Expand Up @@ -732,6 +730,7 @@ google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQ
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/cheggaaa/pb.v1 v1.0.25 h1:Ev7yu1/f6+d+b3pi5vPdRPc6nNtP1umSfcWiEfRqv6I=
gopkg.in/errgo.v2 v2.1.0 h1:0vLT13EuvQ0hNvakwLuFZ/jYrLp5F3kcWHXdRggjCE8=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/gcfg.v1 v1.2.3 h1:m8OOJ4ccYHnx2f4gQwpno8nAX5OGOh7RLaaz0pj3Ogs=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
package reliable

import (
"github.com/dolthub/dolt/go/libraries/doltcore/remotestorage/internal/circular"
"github.com/dolthub/dolt/go/libraries/utils/circular"
)

// A reliable.Chan is a type of channel transformer which can be used to build
Expand Down
366 changes: 366 additions & 0 deletions go/libraries/doltcore/sqle/statspro/jobqueue/serialqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,366 @@
// Copyright 2025 Dolthub, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package jobqueue

import (
"context"
"errors"
"sync"
"sync/atomic"

"github.com/dolthub/dolt/go/libraries/utils/circular"
)

// A SerialQueue is a job queue which runs one job at a time. Jobs are
// run in the order they are submitted, with the exception that every
// interrupt job is run before any normal priority job.
//
// A SerialQueue can be paused, in which case it will accept new
// submissions, but will not run them until it is started again.
//
// A SerialQueue can be purged, which deletes any pending jobs from
// it.
//
// A SerialQueue can be stopped, in which case it will not accept new
// submissions and no pending work will be run. Stopping a queue does
// not purge it, but it is easy for a caller to stop and purge the
// queue.
//
// A stopped or paused SerialQueue can be started, which will cause it
// to start running submitted jobs again, including any unpurged jobs
// which were pending when it was stopped or paused.
//
// A SerialQueue runs background threads to coordinate its
// behavior. These background threads are launched with a `Context`
// supplied to its |Run| method. If that `Context` ever becomes
// `Done`, the SerialQueue terminally enters a completed state.
//
// In general, jobs running on the queue should not block indefinitely
// and should be very careful about any synchronization. It is safe
// for jobs within the queue to call DoAsync, InterruptAsync, Stop,
// Pause, Purge and Start on the queue itself. It is a deadlock for a
// job within the queue to perform a DoSync or InterruptSync on the
// queue itself, although that deadlock may be resolved if the
// provided |ctx| ends up |Done|.
type SerialQueue struct {
	// Flipped from false to true by |Run|; used to detect a second
	// call to |Run|, which panics.
	running atomic.Bool

	// If the queue is terminally completed, this will be closed.
	// Submissions to the queue scheduler select on this channel
	// to return errors if the scheduler is no longer accepting
	// work.
	completed chan struct{}

	// Unbuffered channel on which the scheduler hands the next job
	// to the runner goroutine.
	runnerCh chan work
	// Unbuffered channel on which the public methods deliver their
	// requests (enqueue, purge, start, pause, stop) to the scheduler.
	schedCh chan schedReq
}

// Sentinel errors returned by SerialQueue methods.
var (
	// ErrStoppedQueue is returned when work is submitted to a queue
	// which is currently stopped.
	ErrStoppedQueue = errors.New("stopped queue: cannot submit work to a stopped queue.")

	// ErrCompletedQueue is returned when a request is made against a
	// queue which has terminally completed.
	ErrCompletedQueue = errors.New("completed queue: the queue is no longer running.")
)

// NewSerialQueue returns a freshly allocated SerialQueue. Its channels
// are all unbuffered, and nothing services them until |Run| is called,
// so every method on the returned queue blocks indefinitely until then.
func NewSerialQueue() *SerialQueue {
	q := new(SerialQueue)
	q.completed = make(chan struct{})
	q.runnerCh = make(chan work)
	q.schedCh = make(chan schedReq)
	return q
}

// Run the serial queue's background threads with this |ctx| and block
// until both of them have exited. If the |ctx| ever becomes |Done|,
// the queue enters a terminal completed state. It is an error to call
// this function more than once; a second call panics.
//
// The queue is marked completed as soon as the scheduler goroutine
// exits, not when |Run| returns. The scheduler is the only receiver of
// requests, so once it is gone no request can ever be served. If the
// completion signal waited for the runner goroutine as well, a job
// that was still running when |ctx| became |Done| and then called
// DoAsync, InterruptAsync, Stop, Pause, Purge or Start on this queue
// would block forever: nobody would receive its request, and the
// completion signal could not fire until that same job returned.
func (s *SerialQueue) Run(ctx context.Context) {
	if !s.running.CompareAndSwap(false, true) {
		panic("Cannot run a SerialQueue more than once.")
	}
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		// Runs before wg.Done(): unblock every pending and future
		// request the moment the scheduler stops receiving.
		defer close(s.completed)
		s.runScheduler(ctx)
	}()
	go func() {
		defer wg.Done()
		s.runRunner(ctx)
	}()
	wg.Wait()
}

// Start puts the queue into the running state, in which pending jobs
// are handed to the runner. It may be called in any state, including
// when the queue is already started. Returns ErrCompletedQueue if the
// queue has completed.
func (s *SerialQueue) Start() error {
	req := schedReq{reqType: schedReqType_Start}
	// Buffered so the scheduler's reply never blocks.
	req.resp = make(chan schedResp, 1)
	return s.makeReq(req)
}

// Pause puts the queue into the paused state: new submissions are
// still accepted, but no pending job is handed to the runner. It may
// be called in any state, including when the queue is already paused.
// Pausing does not wait for a currently running job to finish. To
// pause with a guarantee that nothing is running at that moment, use:
//
//	s.InterruptSync(context.Background(), func() { s.Pause() })
//
// Returns ErrCompletedQueue if the queue has completed.
func (s *SerialQueue) Pause() error {
	req := schedReq{reqType: schedReqType_Pause}
	// Buffered so the scheduler's reply never blocks.
	req.resp = make(chan schedResp, 1)
	return s.makeReq(req)
}

// Stop puts the queue into the stopped state: new submissions are
// rejected with ErrStoppedQueue and no pending job is handed to the
// runner. It may be called in any state, including when the queue is
// already stopped. Stopping does not wait for a currently running job
// to finish. Returns ErrCompletedQueue if the queue has completed.
func (s *SerialQueue) Stop() error {
	req := schedReq{reqType: schedReqType_Stop}
	// Buffered so the scheduler's reply never blocks.
	req.resp = make(chan schedResp, 1)
	return s.makeReq(req)
}

// Purge drops every pending job, both high and normal priority, from
// the queue. The queue's state (running, paused or stopped) is left
// unchanged. Returns ErrCompletedQueue if the queue has completed.
func (s *SerialQueue) Purge() error {
	req := schedReq{reqType: schedReqType_Purge}
	// Buffered so the scheduler's reply never blocks.
	req.resp = make(chan schedResp, 1)
	return s.makeReq(req)
}

// InterruptSync submits |f| as a high priority job and blocks until
// it has finished running. Against a paused queue this can block
// indefinitely; the wait is bounded by |ctx|.
//
// Returns the submission error if the job was not accepted, nil once
// the job has run, the cause of |ctx| if it becomes done first, or
// ErrCompletedQueue if the queue completes first.
func (s *SerialQueue) InterruptSync(ctx context.Context, f func()) error {
	submitted, submitErr := s.submitWork(schedPriority_High, f)
	if submitErr != nil {
		return submitErr
	}
	var result error
	select {
	case <-s.completed:
		result = ErrCompletedQueue
	case <-ctx.Done():
		result = context.Cause(ctx)
	case <-submitted.done:
		// The job ran to completion; |result| stays nil.
	}
	return result
}

// DoSync submits |f| as a normal priority job and blocks until it has
// finished running. Against a paused queue this can block
// indefinitely; the wait is bounded by |ctx|.
//
// Returns the submission error if the job was not accepted, nil once
// the job has run, the cause of |ctx| if it becomes done first, or
// ErrCompletedQueue if the queue completes first.
func (s *SerialQueue) DoSync(ctx context.Context, f func()) error {
	submitted, submitErr := s.submitWork(schedPriority_Normal, f)
	if submitErr != nil {
		return submitErr
	}
	var result error
	select {
	case <-s.completed:
		result = ErrCompletedQueue
	case <-ctx.Done():
		result = context.Cause(ctx)
	case <-submitted.done:
		// The job ran to completion; |result| stays nil.
	}
	return result
}

// InterruptAsync submits |f| as a high priority job without waiting
// for it to run. It returns as soon as the scheduler has accepted or
// rejected the job; a non-nil error means the job was not enqueued.
func (s *SerialQueue) InterruptAsync(f func()) error {
	_, err := s.submitWork(schedPriority_High, f)
	return err
}

// DoAsync submits |f| as a normal priority job without waiting for it
// to run. It returns as soon as the scheduler has accepted or rejected
// the job; a non-nil error means the job was not enqueued.
func (s *SerialQueue) DoAsync(f func()) error {
	_, err := s.submitWork(schedPriority_Normal, f)
	return err
}

// submitWork wraps |f| in a |work| item and asks the scheduler to
// enqueue it at priority |pri|. On success it returns the enqueued
// item, whose |done| channel is closed after |f| has run. On failure
// it returns a zero |work| and the error.
func (s *SerialQueue) submitWork(pri schedPriority, f func()) (work, error) {
	job := work{f: f, done: make(chan struct{})}
	req := schedReq{
		reqType: schedReqType_Enqueue,
		pri:     pri,
		work:    job,
		// Buffered so the scheduler's reply never blocks.
		resp: make(chan schedResp, 1),
	}
	if err := s.makeReq(req); err != nil {
		return work{}, err
	}
	return job, nil
}

// makeReq delivers |req| to the scheduler and returns the error from
// the scheduler's response. If the queue completes before the request
// can be delivered, it returns ErrCompletedQueue instead.
func (s *SerialQueue) makeReq(req schedReq) error {
	select {
	case <-s.completed:
		return ErrCompletedQueue
	case s.schedCh <- req:
		// Delivered; fall through to wait for the reply.
	}
	return (<-req.resp).err
}

// runScheduler is the scheduler loop. It receives requests from
// |schedCh|, maintains the high and normal priority queues of pending
// work and, while the scheduler is in the running state, offers the
// next pending job to the runner over |runnerCh|. High priority work
// is always offered before normal priority work. It returns when
// |ctx| is done.
func (s *SerialQueue) runScheduler(ctx context.Context) {
	var (
		mode    = schedState_Running
		regular = circular.NewBuff[work](16)
		urgent  = circular.NewBuff[work](16)
	)
	// Each handled request is answered with one send on its |resp|
	// channel; requesters create that channel with a buffer of one.
	reply := func(req schedReq, err error) {
		req.resp <- schedResp{err: err}
	}
	for {
		// These keep their zero values unless there is a job to hand
		// off. A send on a nil channel never proceeds, so the send
		// case below is disabled whenever |offerCh| is nil.
		var (
			offerCh chan work
			offer   work
			popSent func()
		)
		if mode == schedState_Running {
			switch {
			case urgent.Len() > 0:
				offerCh, offer, popSent = s.runnerCh, urgent.Front(), urgent.Pop
			case regular.Len() > 0:
				offerCh, offer, popSent = s.runnerCh, regular.Front(), regular.Pop
			}
		}

		select {
		case <-ctx.Done():
			return
		case offerCh <- offer:
			// The runner took the job; remove it from the queue it
			// was read from.
			popSent()
		case req := <-s.schedCh:
			switch req.reqType {
			case schedReqType_Start:
				mode = schedState_Running
				reply(req, nil)
			case schedReqType_Pause:
				mode = schedState_Paused
				reply(req, nil)
			case schedReqType_Stop:
				mode = schedState_Stopped
				reply(req, nil)
			case schedReqType_Purge:
				// Swap in empty buffers of the same capacity.
				urgent = circular.NewBuff[work](urgent.Cap())
				regular = circular.NewBuff[work](regular.Cap())
				reply(req, nil)
			case schedReqType_Enqueue:
				switch {
				case mode == schedState_Stopped:
					reply(req, ErrStoppedQueue)
				case req.pri == schedPriority_High:
					urgent.Push(req.work)
					reply(req, nil)
				default:
					regular.Push(req.work)
					reply(req, nil)
				}
			}
		}
	}
}

// runRunner is the worker loop. It receives jobs from |runnerCh| one
// at a time, calls each job's function and then closes the job's
// |done| channel. It returns when |ctx| is done.
func (s *SerialQueue) runRunner(ctx context.Context) {
	cancelled := ctx.Done()
	for {
		select {
		case <-cancelled:
			return
		case job := <-s.runnerCh:
			job.f()
			close(job.done)
		}
	}
}

// |work| represents work to be run on the runner goroutine. It is
// created when a job is submitted, held by the scheduler while
// pending, and passed to the runner over the runner channel.
type work struct {
	// The function to call.
	f func()
	// The channel to close after the work is run. The synchronous
	// submission methods wait on this channel.
	done chan struct{}
}

// schedState is the scheduler's current mode. It controls whether new
// work is accepted and whether pending work is given to the runner.
type schedState int

const (
	// When scheduler is running, it is willing to accept new work
	// and to give work to the work thread. This is the state the
	// scheduler starts in.
	schedState_Running schedState = iota
	// When scheduler is paused, it is willing to accept new work
	// but it does not give work to the work thread.
	schedState_Paused
	// When scheduler is stopped, it does not accept new work
	// and it does not give work to the work thread.
	schedState_Stopped
)

// schedReqType identifies what a |schedReq| asks the scheduler to do.
type schedReqType int

const (
	// Add the request's work to the queue for its priority. Rejected
	// with ErrStoppedQueue when the scheduler is stopped.
	schedReqType_Enqueue schedReqType = iota
	// Drop all pending work from both queues.
	schedReqType_Purge
	// Move the scheduler to the running state.
	schedReqType_Start
	// Move the scheduler to the paused state.
	schedReqType_Pause
	// Move the scheduler to the stopped state.
	schedReqType_Stop
)

// schedPriority selects which of the scheduler's two queues an
// enqueued job is placed on.
type schedPriority int

const (
	// Normal priority work runs only when no high priority work is
	// pending.
	schedPriority_Normal schedPriority = iota
	// High priority ("interrupt") work is given to the runner before
	// any pending normal priority work.
	schedPriority_High
)

// Incoming message for the scheduler thread.
type schedReq struct {
	// What the scheduler is being asked to do.
	reqType schedReqType
	// Always set, the scheduler's response is
	// sent through this channel. The send
	// must never block, so requesters create
	// it with a buffer of one.
	resp chan schedResp
	// Set when |reqType| is Enqueue
	pri schedPriority
	// Set when |reqType| is Enqueue
	work work
}

// schedResp is the scheduler's reply to a |schedReq|, sent on the
// request's |resp| channel.
type schedResp struct {
	// nil if the request was carried out; otherwise the reason it was
	// rejected (ErrStoppedQueue for an enqueue on a stopped queue).
	err error
}
Loading
Loading