-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcenter.go
128 lines (107 loc) · 2.87 KB
/
center.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package pan
import (
consulapi "github.com/hashicorp/consul/api"
"log"
"time"
"strings"
"encoding/json"
"fmt"
)
// read data from consul kv and watch it
type CenterConfig struct {
Address string
Scheme string
Namespace string
NamespaceMap string //string map for generate the key path. like "/Configs/{namespace}/{key}"
ContentType string //type of consul value, default json
Key string
keyIndex uint64
watch bool //default watch true
Interval time.Duration
}
func ConsulKV(config CenterConfig) *consulapi.KV {
cconf := consulapi.DefaultConfig()
cconf.Address = config.Address
cconf.Scheme = config.Scheme
client, err := consulapi.NewClient(cconf)
if err != nil{
log.Panicf("Error creating consul client", err)
}
kv := client.KV()
return kv
}
func KVGet(key string, kv *consulapi.KV)([]byte,uint64,error){
pair,meta,err := kv.Get(key,nil)
if err != nil {
log.Fatal(err)
}
if pair == nil {
fmt.Println("Frist GeT Nil from center,Your key is:", key)
return nil,uint64(0),nil
}
return pair.Value,meta.LastIndex,err
}
func WatchKey(key string, ch chan []byte, kv *consulapi.KV, keyIndex uint64) {
currentIndex := keyIndex
for {
pair, meta, err := kv.Get(key, &consulapi.QueryOptions{
WaitIndex: currentIndex,
})
if err != nil {
fmt.Println("Error for get key,I will sleep 2 mins:",err)
time.Sleep(2 * time.Minute)
}
if pair == nil || meta == nil {
// Query won't be blocked if key not found
//time.Sleep(1 * time.Second)
} else {
ch <- pair.Value
currentIndex = meta.LastIndex
}
}
}
func WatchKeyWithInterval(key string, ch chan []byte, kv *consulapi.KV, keyIndex uint64, interval time.Duration) {
currentIndex := keyIndex
for {
pair, meta, err := kv.Get(key,nil)
if err != nil {
fmt.Println("Error for get key,I will sleep another 2 mins:",err)
time.Sleep(2 * time.Minute)
}
if pair == nil || meta == nil {
// Query won't be blocked if key not found
//time.Sleep(1 * time.Second)
} else if meta.LastIndex != currentIndex {
ch <- pair.Value
currentIndex = meta.LastIndex
}
time.Sleep(interval)
}
}
func (p *Pan) ReadCenterWithWatch() {
kv := ConsulKV(p.CenterConfig)
ch := make(chan []byte)
key := Sformat(p.CenterConfig.NamespaceMap,"{namespace}",p.CenterConfig.Namespace, "{key}",p.CenterConfig.Key)
centerMap := make(map[string]interface{})
//first get key from center
data,i1,err := KVGet(key,kv)
if err != nil{
fmt.Println("error when get key :",err)
}else {
json.Unmarshal(data, ¢erMap)
p.center = UpMapKey(¢erMap)
// watch
go WatchKeyWithInterval(key, ch, kv, i1, p.CenterConfig.Interval)
go func() {
for data := range ch {
json.Unmarshal(data, ¢erMap)
p.center = UpMapKey(¢erMap)
}
}()
}
}
func Sformat(str string, args ...string) string {
r := strings.NewReplacer(args...)
ss := r.Replace(str)
return ss
}