This repository was archived by the owner on Apr 21, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnunchakus.go
118 lines (98 loc) · 1.83 KB
/
nunchakus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package nunchakus
import (
"strings"
"time"
)
// Consumer is the receiving end of the Producer -> queue -> Consumer pipeline.
//
// StartConsumer asks the Consumer how many goroutines to start and then calls
// Run once per goroutine, handing every one of them the same channel.
type Consumer interface {
	// Run receives messages from q. StartConsumer invokes it in its own
	// goroutine, once per unit of concurrency.
	Run(q chan string)

	// GetConcurrency reports how many Run goroutines StartConsumer launches;
	// the same number is used as the buffer size of the channel passed to Run.
	GetConcurrency() int
}
// Producer is the source side of the pipeline.
//
// StartProducer calls Run once, passing the configured Progress, and pushes
// every value received from the returned channel onto the queue until that
// channel is closed.
type Producer interface {
	// Run returns the channel on which produced items are delivered.
	// NOTE(review): the producer is presumably expected to use progress to
	// resume where an earlier run stopped and to close the channel when it is
	// done — confirm against the implementations.
	Run(progress Progress) chan string
}
// Progress stores how far a run has got so that an interrupted run can be
// resumed later.
type Progress interface {
	// Load returns the stored progress value.
	// NOTE(review): what is returned when nothing has been saved yet is not
	// visible here — confirm against the implementations.
	Load() int

	// Save records i as the current progress value.
	Save(i int)
}
// Queue is the buffer that sits between the Producer and the Consumer.
type Queue interface {
	// Push adds item to the queue.
	// NOTE(review): the meaning of the bool result is not shown in this file
	// (StartProducer discards it); presumably it reports success — confirm.
	Push(item string) bool

	// Pop removes and returns one item. StartConsumer treats an empty string
	// as "nothing available right now" and waits before polling again.
	Pop() string

	// PopN is not called in this file.
	// NOTE(review): presumably removes and returns up to n items — confirm
	// against the implementations.
	PopN(n int) []string
}
// Nunchakus wires a Producer and a Consumer together through a Queue.
// Build one with NewNunchakus and the With* options.
type Nunchakus struct {
	// taskName identifies the task; NewNunchakus passes it to the default
	// queue constructor.
	taskName string

	// consumer is required by StartConsumer.
	consumer Consumer

	// producer is required by StartProducer.
	producer Producer

	// progress is handed to the producer when StartProducer runs it.
	progress Progress

	// queue receives items from StartProducer and is polled by StartConsumer.
	queue Queue
}
// Setter is a functional option: it receives the Nunchakus under construction
// and sets one of its fields. NewNunchakus applies Setters in argument order.
type Setter func(*Nunchakus)
// WithTaskName returns a Setter that stores name as the task name, with every
// space replaced by an underscore.
func WithTaskName(name string) Setter {
	normalized := strings.ReplaceAll(name, " ", "_")
	return func(target *Nunchakus) {
		target.taskName = normalized
	}
}
// WithQueue returns a Setter that makes q the queue used between producer and
// consumer, in place of the default queue NewNunchakus would otherwise create.
func WithQueue(q Queue) Setter {
	return func(target *Nunchakus) { target.queue = q }
}
// WithProducer returns a Setter that installs p as the producer run by
// StartProducer.
func WithProducer(p Producer) Setter {
	return func(target *Nunchakus) { target.producer = p }
}
// WithConsumer returns a Setter that installs c as the consumer run by
// StartConsumer.
func WithConsumer(c Consumer) Setter {
	return func(target *Nunchakus) { target.consumer = c }
}
// NewNunchakus builds a Nunchakus, applies opts in the order given, and then
// fills in defaults for anything left unset:
//
//   - an empty task name becomes "Nunchakus";
//   - a nil progress becomes NewFileProcess("");
//   - a nil queue becomes NewRedisQueue(nil, taskName).
//
// The task name default is resolved before the queue default because the
// queue constructor receives the task name. Producer and consumer have no
// defaults; StartProducer and StartConsumer panic when theirs is missing.
//
// NOTE(review): NewFileProcess and NewRedisQueue are defined outside this
// file; what the empty path and the nil first argument select is not visible
// here — confirm against their definitions.
func NewNunchakus(opts ...Setter) *Nunchakus {
	nc := new(Nunchakus)
	for i := range opts {
		opts[i](nc)
	}

	if nc.taskName == "" {
		nc.taskName = "Nunchakus"
	}
	if nc.progress == nil {
		nc.progress = NewFileProcess("")
	}
	if nc.queue == nil {
		nc.queue = NewRedisQueue(nil, nc.taskName)
	}
	return nc
}
// StartProducer runs the configured producer with the stored progress and
// pushes every item it emits onto the queue. It returns once the producer's
// channel has been closed and drained.
//
// It panics with "producer is required" when no producer was configured.
//
// NOTE(review): the bool returned by Queue.Push is discarded, so a rejected
// push goes unnoticed here.
func (nc *Nunchakus) StartProducer() {
	src := nc.producer
	if src == nil {
		panic("producer is required")
	}

	for item := range src.Run(nc.progress) {
		nc.queue.Push(item)
	}
}
// StartConsumer starts the consumer's worker goroutines and then feeds them
// messages popped from the queue. It loops forever and never returns.
//
// The number of workers comes from Consumer.GetConcurrency; a value below 1
// is treated as 1 so that there is always a worker to receive messages. Each
// worker is a goroutine running Consumer.Run on a shared channel whose buffer
// size equals the worker count.
//
// It panics with "consumer is required" when no consumer was configured.
func (nc *Nunchakus) StartConsumer() {
	if nc.consumer == nil {
		panic("consumer is required")
	}

	// A negative count makes make(chan) panic, and zero starts no workers, so
	// the first popped message would block forever on an unbuffered channel
	// after already being removed from the queue. Always run at least one.
	n := nc.consumer.GetConcurrency()
	if n < 1 {
		n = 1
	}
	println("Concurrency:", n)

	dataChan := make(chan string, n)
	for i := 0; i < n; i++ {
		go nc.consumer.Run(dataChan)
	}

	for {
		message := nc.queue.Pop()
		if message == "" {
			// An empty string is taken to mean "nothing available"; wait a
			// second before polling the queue again.
			time.Sleep(time.Second)
			continue
		}
		dataChan <- message
	}
}