Skip to content

Commit 51d9bd2

Browse files
committed
es: stop client is rolling test and add debug log for rolling index
1 parent f82979d commit 51d9bd2

File tree

3 files changed

+7
-12
lines changed

3 files changed

+7
-12
lines changed

storage/elasticsearch/client.go

+3-11
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,6 @@ type Client struct {
8282
esClient *elastic.Client
8383
bulkProcessor *elastic.BulkProcessor
8484
started atomic.Value
85-
quit chan bool
86-
wg sync.WaitGroup
8785
cfg Config
8886
indices map[string]Index
8987
rollService *rollIndexService
@@ -405,17 +403,12 @@ func (c *Client) Start() {
405403
// Stop Elasticsearch background client
406404
func (c *Client) Stop() {
407405
if c.started.Load() == true {
408-
c.quit <- true
409-
c.wg.Wait()
406+
if c.rollService != nil {
407+
c.rollService.stop()
408+
}
410409

411410
c.esClient.Stop()
412411
}
413-
414-
c.RLock()
415-
for _, l := range c.listeners {
416-
l.OnStarted()
417-
}
418-
c.RUnlock()
419412
}
420413

421414
// Started is the client already started ?
@@ -467,7 +460,6 @@ func NewClient(indices []Index, cfg Config, electionService common.MasterElectio
467460

468461
client := &Client{
469462
url: url,
470-
quit: make(chan bool, 1),
471463
cfg: cfg,
472464
indices: indicesMap,
473465
}

storage/elasticsearch/rollindex.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ func (r *rollIndexService) cleanup(index Index) {
8080
}
8181

8282
func (r *rollIndexService) roll(force bool) {
83-
logging.GetLogger().Debug("Start rolling indices...")
83+
logging.GetLogger().Debugf("Start rolling indices (forced: %v)...", force)
8484

8585
for _, index := range r.indices {
8686
ri := r.client.esClient.RolloverIndex(index.Alias())
@@ -104,6 +104,8 @@ func (r *rollIndexService) roll(force bool) {
104104
if needToRoll {
105105
logging.GetLogger().Infof("Index %s rolling over", index.Alias())
106106

107+
logging.GetLogger().Debugf("Rolling over with: %+v", ri)
108+
107109
resp, err := ri.Do(context.Background())
108110
if err != nil {
109111
logging.GetLogger().Errorf("Error while rolling index %s: %s", index.Alias(), err)

tests/elasticsearch_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ func TestRollingSimple(t *testing.T) {
101101
if err != nil {
102102
t.Fatal(err)
103103
}
104+
defer client.Stop()
104105

105106
for i := 0; i != 9; i++ {
106107
id := fmt.Sprintf("before-entry_%d", i)

0 commit comments

Comments
 (0)