Skip to content

Commit

Permalink
fix: status aggregation & status based tree runner scheduling
Browse files Browse the repository at this point in the history
  • Loading branch information
fabiankachlock committed Jul 23, 2024
1 parent f3dd268 commit 4a2e1a2
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 18 deletions.
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"request": "launch",
"mode": "debug",
"program": "./cmd/zwooc/main.go",
"args": ["build", "-to", "-q", "dev"]
"args": ["exec", "print"]
}
]
}
5 changes: 4 additions & 1 deletion pkg/runner/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ func (t *TreeStatusNode) GetDirectChildren() []*TreeStatusNode {
children := []*TreeStatusNode{}
children = append(children, t.PreNodes...)
children = append(children, t.PostNodes...)
children = append(children, t)
return children
}

Expand All @@ -95,6 +94,10 @@ func (t *TreeStatusNode) IsDone() bool {

func (t *TreeStatusNode) Update() {
children := t.GetDirectChildren()
if len(children) == 0 {
t.AggregatedStatus = t.Status
return
}

// applies the status based on children by precedence -> the order of the if statements matters
if someChildWithStatus(children, StatusError) {
Expand Down
20 changes: 10 additions & 10 deletions pkg/runner/tree_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package runner

import (
"errors"
"fmt"
"sync"
"sync/atomic"

Expand Down Expand Up @@ -78,7 +77,7 @@ func (r *TaskTreeRunner) Cancel() error {
func (r *TaskTreeRunner) updateTaskStatus(node *tasks.TaskTreeNode, status TaskStatus) {
r.mutex.Lock()
statusNode := findStatus(r.statusTree, node)
statusNode.AggregatedStatus = status
statusNode.Status = status
statusNode.Update()
r.updates <- statusNode
r.mutex.Unlock()
Expand Down Expand Up @@ -173,8 +172,8 @@ func (r *TaskTreeRunner) Start() error {
case <-r.cancel:
// run was canceled - forward cancel to all tasks
r.wasCanceled.Store(true)
for id, cancel := range r.forwardCancel {
fmt.Println("cancel", id)
for _, cancel := range r.forwardCancel {
// fmt.Println("cancel", id)
cancel <- true
}
return
Expand All @@ -186,6 +185,7 @@ func (r *TaskTreeRunner) Start() error {

// start scheduling
wg.Add(1)

startingNodes := getStartingNodes(r.root)
for _, node := range startingNodes {
r.scheduledNodes <- node
Expand All @@ -207,16 +207,16 @@ func (r *TaskTreeRunner) scheduleNext(node *tasks.TaskTreeNode) {
}

statusNode := findStatus(r.statusTree, node)
if statusNode.IsPre() && allChildrenWithStatus(statusNode.PreNodes, StatusDone) {
r.scheduledNodes <- node
} else if !statusNode.IsPost() {
if statusNode.IsPre() && allChildrenWithStatus(statusNode.Parent.PreNodes, StatusDone) {
r.scheduledNodes <- node.Parent
} else if allDone(r.statusTree) {
close(r.scheduledNodes)
} else if !statusNode.IsPre() && !statusNode.IsPost() {
for _, post := range node.Post {
for _, scheduled := range getStartingNodes(post) {
r.scheduledNodes <- scheduled
}
}
} else if allDone(r.statusTree) {
close(r.scheduledNodes)
}
}

Expand Down Expand Up @@ -260,7 +260,7 @@ func buildStatus(root *tasks.TaskTreeNode) *TreeStatusNode {
}

func allDone(status *TreeStatusNode) bool {
if status.AggregatedStatus != StatusPending && status.AggregatedStatus != StatusRunning {
if status.Status == StatusPending || status.Status == StatusRunning || status.Status == StatusScheduled {
return false
}

Expand Down
21 changes: 21 additions & 0 deletions pkg/ui/tree_progress.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"

tea "github.com/charmbracelet/bubbletea"
"github.com/zwoo-hq/zwooc/pkg/helper"
"github.com/zwoo-hq/zwooc/pkg/tasks"
)

Expand All @@ -18,6 +19,8 @@ type TreeProgressView struct {
provider SimpleStatusProvider
mu sync.RWMutex
wasCanceled bool
c int
ups []TreeProgressUpdateMsg
}

type TreeProgressUpdateMsg StatusUpdate
Expand All @@ -29,6 +32,7 @@ func NewTreeProgressView(forest tasks.Collection, status SimpleStatusProvider, o
provider: status,
status: map[string]TaskStatus{},
outputs: map[string]*tasks.CommandCapturer{},
ups: []TreeProgressUpdateMsg{},
}

model.setupDefaultStatus()
Expand All @@ -45,6 +49,8 @@ func NewTreeProgressView(forest tasks.Collection, status SimpleStatusProvider, o

func (m *TreeProgressView) Init() tea.Cmd {
m.provider.Start()
// TODO: dispatch start as command
// TODO: dispatch wait for done as command
return tea.Batch(m.listenToUpdates)
}

Expand Down Expand Up @@ -72,6 +78,9 @@ func (m *TreeProgressView) Update(message tea.Msg) (tea.Model, tea.Cmd) {
m.mu.Lock()
m.status[msg.NodeID] = msg.Status
m.mu.Unlock()
m.c++
m.ups = append(m.ups, msg)
return m, m.listenToUpdates
}

return m, nil
Expand All @@ -81,8 +90,20 @@ func (m *TreeProgressView) listenToUpdates() tea.Msg {
return TreeProgressUpdateMsg(<-m.provider.status)
}

type X struct {
N string
S TaskStatus
}

func (m *TreeProgressView) View() (s string) {
s += zwoocBranding + "\n"
s += "Updates received: " + fmt.Sprint(m.c) + "\n"
s += "U:" + fmt.Sprintf("%+v", helper.MapTo(m.ups, func(in TreeProgressUpdateMsg) X {
return X{
N: in.NodeID,
S: in.Status,
}
})) + "\n"
for _, tree := range m.tasks {
s += tree.Name + "\n"
tree.Iterate(func(node *tasks.TaskTreeNode) {
Expand Down
12 changes: 6 additions & 6 deletions pkg/zwooc/ui_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,12 @@ func createSimpleForestRunner(forest tasks.Collection, maxConcurrency int) ui.Si
return currentRunner.Start()
})
}

// collect done
go func() {
err := errs.Wait()
statusProvider.Done(err)
}()
})

// forward cancel
Expand All @@ -46,12 +52,6 @@ func createSimpleForestRunner(forest tasks.Collection, maxConcurrency int) ui.Si
}()
}

// collect done
go func() {
err := errs.Wait()
statusProvider.Done(err)
}()

return statusProvider
}

Expand Down

0 comments on commit 4a2e1a2

Please sign in to comment.