Skip to content

Commit

Permalink
add execute delay when error
Browse files Browse the repository at this point in the history
  • Loading branch information
lzp0412 committed Jul 31, 2020
1 parent 7ebeabb commit 767eabe
Showing 1 changed file with 18 additions and 10 deletions.
28 changes: 18 additions & 10 deletions clients/config_client/config_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,10 @@ type ConfigClient struct {
configCacheDir string
}

const perTaskConfigSize = 3000
const (
perTaskConfigSize = 3000
executorErrDelay = 5 * time.Second
)

var (
currentTaskCount int
Expand Down Expand Up @@ -308,22 +311,25 @@ func (client *ConfigClient) ListenConfig(param vo.ConfigParam) (err error) {
//Delay Scheduler
//initialDelay the time to delay first execution
//delay the delay between the termination of one execution and the commencement of the next
func delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func()) {
func delayScheduler(t *time.Timer, delay time.Duration, taskId string, execute func() error) {
for {
if v, ok := schedulerMap.Get(taskId); ok {
if !v.(bool) {
return
}
}
<-t.C
execute()
t.Reset(delay)
d := delay
if err := execute(); err != nil {
d = executorErrDelay
}
t.Reset(d)
}
}

//Listen for the configuration executor
func listenConfigExecutor() func() {
return func() {
func listenConfigExecutor() func() error {
return func() error {
listenerSize := len(cacheMap.Keys())
taskCount := int(math.Ceil(float64(listenerSize) / float64(perTaskConfigSize)))

Expand All @@ -341,12 +347,13 @@ func listenConfigExecutor() func() {
}
currentTaskCount = taskCount
}
return nil
}
}

//Long polling listening configuration
func longPulling(taskId int) func() {
return func() {
func longPulling(taskId int) func() error {
return func() error {
var listeningConfigs string
var client *ConfigClient
initializationList := make([]cacheData, 0)
Expand All @@ -372,7 +379,7 @@ func longPulling(taskId int) func() {
clientConfig, err := client.GetClientConfig()
if err != nil {
logger.Errorf("[checkConfigInfo.GetClientConfig] err: %+v", err)
return
return err
}
// http get
params := make(map[string]string)
Expand All @@ -388,6 +395,7 @@ func longPulling(taskId int) func() {
} else {
logger.Errorf("[client.ListenConfig] listen config error: %+v", err)
}
return err
}
for _, v := range initializationList {
v.isInitializing = false
Expand All @@ -400,7 +408,7 @@ func longPulling(taskId int) func() {
client.callListener(changed, clientConfig.NamespaceId)
}
}

return nil
}

}
Expand Down

0 comments on commit 767eabe

Please sign in to comment.