Skip to content

Commit a3e9c86

Browse files
author
Test
committed
feat: add watcher mode for continuous execution
Watcher: - Monitors files for changes - Re-runs task on file changes - Configurable watch paths and interval - Stop/resume capability TaskQueue: - Queue management for batch tasks - Track completed/failed tasks - Statistics and summary BatchExecutor: - Execute multiple tasks concurrently - Configurable concurrency - Detailed results with success rate
1 parent 75b6b25 commit a3e9c86

1 file changed

Lines changed: 306 additions & 0 deletions

File tree

internal/autonomous/watcher.go

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
package autonomous
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
"path/filepath"
8+
"strings"
9+
"sync"
10+
"time"
11+
)
12+
13+
type Watcher struct {
14+
cwd string
15+
paths []string
16+
interval time.Duration
17+
onFileChange func(path string)
18+
executor *Executor
19+
stopCh chan struct{}
20+
}
21+
22+
func NewWatcher(cwd string, paths []string, interval time.Duration) *Watcher {
23+
if len(paths) == 0 {
24+
paths = []string{"."}
25+
}
26+
return &Watcher{
27+
cwd: cwd,
28+
paths: paths,
29+
interval: interval,
30+
stopCh: make(chan struct{}),
31+
}
32+
}
33+
34+
func (w *Watcher) SetExecutor(executor *Executor) {
35+
w.executor = executor
36+
}
37+
38+
func (w *Watcher) OnFileChange(fn func(path string)) {
39+
w.onFileChange = fn
40+
}
41+
42+
func (w *Watcher) Start(ctx context.Context, task string) error {
43+
fmt.Printf("[WATCH] Starting watcher on: %s\n", strings.Join(w.paths, ", "))
44+
fmt.Printf("[WATCH] Interval: %v\n", w.interval)
45+
fmt.Printf("[WATCH] Task: %s\n\n", task)
46+
47+
// Track file modifications
48+
modTimes := make(map[string]time.Time)
49+
for _, path := range w.paths {
50+
w.updateModTimes(path, modTimes)
51+
}
52+
53+
ticker := time.NewTicker(w.interval)
54+
defer ticker.Stop()
55+
56+
// Run initial execution
57+
fmt.Println("[WATCH] Running initial task...")
58+
if err := w.runTask(ctx, task); err != nil {
59+
fmt.Printf("[WATCH] Initial task failed: %v\n", err)
60+
}
61+
62+
for {
63+
select {
64+
case <-ctx.Done():
65+
fmt.Println("[WATCH] Context cancelled, stopping...")
66+
return nil
67+
case <-w.stopCh:
68+
fmt.Println("[WATCH] Stopped by user")
69+
return nil
70+
case <-ticker.C:
71+
if w.checkChanges(modTimes) {
72+
fmt.Println("[WATCH] File changes detected, running task...")
73+
if err := w.runTask(ctx, task); err != nil {
74+
fmt.Printf("[WATCH] Task failed: %v\n", err)
75+
}
76+
w.updateModTimes(w.paths[0], modTimes)
77+
}
78+
}
79+
}
80+
}
81+
82+
func (w *Watcher) Stop() {
83+
close(w.stopCh)
84+
}
85+
86+
func (w *Watcher) checkChanges(modTimes map[string]time.Time) bool {
87+
for _, path := range w.paths {
88+
changed := w.hasChanges(path, modTimes)
89+
if changed {
90+
return true
91+
}
92+
}
93+
return false
94+
}
95+
96+
func (w *Watcher) hasChanges(path string, modTimes map[string]time.Time) bool {
97+
absPath := w.cwd
98+
if !strings.HasPrefix(path, "/") {
99+
absPath = filepath.Join(w.cwd, path)
100+
}
101+
102+
info, err := os.Stat(absPath)
103+
if err != nil {
104+
return false
105+
}
106+
107+
if info.IsDir() {
108+
entries, err := os.ReadDir(absPath)
109+
if err != nil {
110+
return false
111+
}
112+
for _, entry := range entries {
113+
if entry.IsDir() {
114+
continue
115+
}
116+
fullPath := filepath.Join(absPath, entry.Name())
117+
if w.hasChanges(fullPath, modTimes) {
118+
return true
119+
}
120+
}
121+
return false
122+
}
123+
124+
modTime, ok := modTimes[absPath]
125+
if !ok || info.ModTime().After(modTime) {
126+
return true
127+
}
128+
return false
129+
}
130+
131+
func (w *Watcher) updateModTimes(path string, modTimes map[string]time.Time) {
132+
absPath := w.cwd
133+
if !strings.HasPrefix(path, "/") {
134+
absPath = filepath.Join(w.cwd, path)
135+
}
136+
137+
info, err := os.Stat(absPath)
138+
if err != nil {
139+
return
140+
}
141+
142+
if info.IsDir() {
143+
entries, err := os.ReadDir(absPath)
144+
if err != nil {
145+
return
146+
}
147+
for _, entry := range entries {
148+
if entry.IsDir() {
149+
w.updateModTimes(filepath.Join(absPath, entry.Name()), modTimes)
150+
} else {
151+
fullPath := filepath.Join(absPath, entry.Name())
152+
info, _ := os.Stat(fullPath)
153+
if info != nil {
154+
modTimes[fullPath] = info.ModTime()
155+
}
156+
}
157+
}
158+
return
159+
}
160+
161+
modTimes[absPath] = info.ModTime()
162+
}
163+
164+
func (w *Watcher) runTask(ctx context.Context, task string) error {
165+
if w.executor == nil {
166+
return fmt.Errorf("no executor configured")
167+
}
168+
return w.executor.Execute(ctx, task)
169+
}
170+
171+
type TaskQueue struct {
172+
tasks []string
173+
pending []string
174+
completed []string
175+
failed []string
176+
mu int
177+
}
178+
179+
func NewTaskQueue(tasks []string) *TaskQueue {
180+
return &TaskQueue{
181+
tasks: tasks,
182+
pending: make([]string, len(tasks)),
183+
completed: []string{},
184+
failed: []string{},
185+
}
186+
}
187+
188+
func (q *TaskQueue) Next() string {
189+
if len(q.tasks) == 0 {
190+
return ""
191+
}
192+
task := q.tasks[0]
193+
q.tasks = q.tasks[1:]
194+
return task
195+
}
196+
197+
func (q *TaskQueue) MarkCompleted(task string) {
198+
q.completed = append(q.completed, task)
199+
}
200+
201+
func (q *TaskQueue) MarkFailed(task string) {
202+
q.failed = append(q.failed, task)
203+
}
204+
205+
func (q *TaskQueue) Stats() (total, pending, completed, failed int) {
206+
return len(q.tasks) + len(q.pending) + len(q.completed) + len(q.failed),
207+
len(q.pending),
208+
len(q.completed),
209+
len(q.failed)
210+
}
211+
212+
func (q *TaskQueue) Summary() string {
213+
total, pending, completed, failed := q.Stats()
214+
return fmt.Sprintf("Total: %d, Pending: %d, Completed: %d, Failed: %d",
215+
total, pending, completed, failed)
216+
}
217+
218+
type BatchExecutor struct {
219+
concurrency int
220+
executor *Executor
221+
}
222+
223+
func NewBatchExecutor(concurrency int) *BatchExecutor {
224+
return &BatchExecutor{
225+
concurrency: concurrency,
226+
}
227+
}
228+
229+
func (be *BatchExecutor) SetExecutor(executor *Executor) {
230+
be.executor = executor
231+
}
232+
233+
func (be *BatchExecutor) ExecuteAll(ctx context.Context, tasks []string) *BatchResult {
234+
result := &BatchResult{
235+
Tasks: make([]TaskResult, len(tasks)),
236+
StartTime: time.Now(),
237+
}
238+
239+
sem := make(chan struct{}, be.concurrency)
240+
taskCh := make(chan int, len(tasks))
241+
242+
// Start workers
243+
for i := range tasks {
244+
taskCh <- i
245+
}
246+
close(taskCh)
247+
248+
for i := range taskCh {
249+
select {
250+
case sem <- struct{}{}:
251+
go func(idx int) {
252+
task := tasks[idx]
253+
start := time.Now()
254+
255+
err := be.executor.Execute(ctx, task)
256+
257+
result.mu.Lock()
258+
result.Tasks[idx] = TaskResult{
259+
Task: task,
260+
Success: err == nil,
261+
Error: err,
262+
Duration: time.Since(start),
263+
}
264+
if err == nil {
265+
result.Completed++
266+
} else {
267+
result.Failed++
268+
}
269+
result.mu.Unlock()
270+
271+
<-sem
272+
}(i)
273+
}
274+
}
275+
276+
result.Duration = time.Since(result.StartTime)
277+
return result
278+
}
279+
280+
type BatchResult struct {
281+
Tasks []TaskResult
282+
Completed int
283+
Failed int
284+
Duration time.Duration
285+
StartTime time.Time
286+
mu sync.Mutex
287+
}
288+
289+
type TaskResult struct {
290+
Task string
291+
Success bool
292+
Error error
293+
Duration time.Duration
294+
}
295+
296+
func (r *BatchResult) Summary() string {
297+
return fmt.Sprintf("Completed: %d/%d, Failed: %d/%d, Duration: %v",
298+
r.Completed, len(r.Tasks), r.Failed, len(r.Tasks), r.Duration)
299+
}
300+
301+
func (r *BatchResult) SuccessRate() float64 {
302+
if len(r.Tasks) == 0 {
303+
return 0
304+
}
305+
return float64(r.Completed) / float64(len(r.Tasks)) * 100
306+
}

0 commit comments

Comments
 (0)