diff --git a/carbon/config.go b/carbon/config.go index 71034930f..48273154c 100644 --- a/carbon/config.go +++ b/carbon/config.go @@ -420,6 +420,17 @@ retentions = 60:43200,3600:43800`), 0644) func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) { for _, q := range c.Whisper.Quotas { + var transientChild *carbonserver.Quota + if q.TransientChildLimit > 0 { + transientChild = &carbonserver.Quota{ + Pattern: "transient", + Metrics: q.TransientChildLimit, + Throughput: q.TransientChildLimit * 10, + IsTransient: true, + DroppingPolicy: carbonserver.QDPNew, + } + } + quotas = append(quotas, &carbonserver.Quota{ Pattern: q.Pattern, Namespaces: q.Namespaces, @@ -430,6 +441,8 @@ func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) { Throughput: q.Throughput, DroppingPolicy: carbonserver.ParseQuotaDroppingPolicy(q.DroppingPolicy), StatMetricPrefix: q.StatMetricPrefix, + + TransientChild: transientChild, }) } diff --git a/carbonserver/trie.go b/carbonserver/trie.go index bc3694e65..03aad9c72 100644 --- a/carbonserver/trie.go +++ b/carbonserver/trie.go @@ -380,8 +380,9 @@ type dirMeta struct { func newDirMeta() *dirMeta { return &dirMeta{usage: &QuotaUsage{}} } -func (*dirMeta) trieMeta() {} -func (dm *dirMeta) update(quota *Quota) { dm.quota.Store(quota) } +func (*dirMeta) trieMeta() {} +func (dm *dirMeta) setQuota(quota *Quota) { dm.quota.Store(quota) } +func (dm *dirMeta) getQuota() *Quota { return dm.quota.Load().(*Quota) } func (dm *dirMeta) withinQuota(metrics, namespaces, logical, physical, dataPoints int64) bool { quota, ok := dm.quota.Load().(*Quota) @@ -538,6 +539,8 @@ func (ti *trieIndex) insert(path string, logicalSize, physicalSize, dataPoints, var start, nlen int var sn, newn *trieNode var cur = ti.root + var dirNodes = [16]*trieNode{ti.root} // why array: avoiding heap allocation + var dirNodesIdx = 1 outer: // why len(path)+1: make sure the last node is also processed in the loop for i := 0; i < len(path)+1; i++ { @@ 
-649,12 +652,36 @@ outer: if child.dir() { cur = child cur.gen = ti.root.gen + + if dirNodesIdx < len(dirNodes) { + dirNodes[dirNodesIdx] = child + dirNodesIdx++ + } continue outer } } if i < len(path) { newn = ti.newDir() + + if dirNodesIdx < len(dirNodes) { + if dirNodes[dirNodesIdx-1].meta != nil { + // Purpose of transient child? + // + // + pmeta := dirNodes[dirNodesIdx-1].meta + pquota, ok := pmeta.(*dirMeta).quota.Load().(*Quota) + if ok && pquota.TransientChild != nil { + meta := newDirMeta() + meta.setQuota(pquota.TransientChild) + newn.meta = meta + } + } + + dirNodes[dirNodesIdx] = newn + dirNodesIdx++ + } + cur.addChild(newn) cur = newn } @@ -720,7 +747,24 @@ outer: cur.addChild(child) cur = child + // TODO: need to support realtime and concurrent index? ti.fileCount++ + + for i := 0; i < dirNodesIdx; i++ { + if dirNodes[i] == nil { + continue + } + + meta, ok := dirNodes[i].meta.(*dirMeta) + if !ok || meta.usage == nil { + continue + } + + atomic.AddInt64(&meta.usage.Metrics, 1) + atomic.AddInt64(&meta.usage.LogicalSize, logicalSize) + atomic.AddInt64(&meta.usage.PhysicalSize, physicalSize) + atomic.AddInt64(&meta.usage.DataPoints, dataPoints) + } } return cur, nil @@ -1570,8 +1614,14 @@ type Quota struct { DroppingPolicy QuotaDroppingPolicy StatMetricPrefix string + + // See details in trieIndex.insert + TransientChild *Quota + IsTransient bool // to avoid emitting metrics for transient quota nodes. 
} +// var transientQuota = &Quota{Metrics: 10000, Throughput: 10000, DroppingPolicy: QDPNew, StatMetricPrefix: "transient"} + func (q *Quota) String() string { return fmt.Sprintf("pattern:%s,dirs:%d,files:%d,points:%d,logical:%d,physical:%d,throughput:%d,policy:%s", q.Pattern, q.Namespaces, q.Metrics, q.DataPoints, q.LogicalSize, q.PhysicalSize, q.Throughput, q.DroppingPolicy) } @@ -1788,7 +1838,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota) if quota.Pattern == "/" { if !updateChecker["/"] { meta := ti.root.meta.(*dirMeta) - meta.update(quota) + meta.setQuota(quota) ti.throughputs.store("/", newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage)) updateChecker["/"] = true @@ -1804,6 +1854,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota) for i, node := range nodes { if node.meta == nil { + // TODO: has data race in throttle, generateQuotaAndUsageMetrics and other places? node.meta = newDirMeta() } @@ -1819,7 +1870,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota) if !updateChecker[paths[i]] { ti.throughputs.store(paths[i], newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage)) - meta.update(quota) + meta.setQuota(quota) updateChecker[paths[i]] = true } @@ -1838,7 +1889,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota) } // refreshUsage updates usage data and generate stat metrics. -// It can't be evoked with concurrent trieIndex.insert. +// It can't be invoked concurrently with trieIndex.insert. 
func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files uint64) { if throughputs == nil { throughputs = newQuotaThroughputQuotaManager() @@ -1895,8 +1946,10 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui tname = name } var prefix string + var isTransient bool if quota, ok := cur.node.meta.(*dirMeta).quota.Load().(*Quota); ok { prefix = quota.StatMetricPrefix + isTransient = quota.IsTransient } var throughput int64 @@ -1906,7 +1959,9 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui throttled := atomic.LoadInt64(&usage.Throttled) - ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled) + if !isTransient { + ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled) + } if throttled > 0 { atomic.AddInt64(&usage.Throttled, -throttled) diff --git a/carbonserver/trie_test.go b/carbonserver/trie_test.go index fd4ff8375..be6a6a0d0 100644 --- a/carbonserver/trie_test.go +++ b/carbonserver/trie_test.go @@ -1493,6 +1493,11 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) { tindex.prune() endc := make(chan struct{}) + defer func() { + // make sure that tindex.applyQuotas ends properly. 
+ endc <- struct{}{} + }() + loopCount := 10240 go func() { @@ -1527,9 +1532,99 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) { break } } +} + +func TestTrieQuotaRealtimeEnforcement(t *testing.T) { + tindex := newTrie( + ".wsp", + func(metric string) (size, dataPoints int64) { + return 12 * 1024, 1024 + }, + ) + + tindex.root.gen++ + tindex.insert("/sys/app/server-001/cpu.wsp", 0, 0, 0, 0) + tindex.insert("/sys/app/server-002/cpu.wsp", 0, 0, 0, 0) + tindex.insert("/sys/app/server-003/cpu.wsp", 0, 0, 0, 0) + tindex.prune() + + refreshTriggers := struct { + start chan struct{} + done chan struct{} + end chan struct{} + }{ + start: make(chan struct{}, 0), + done: make(chan struct{}, 0), + end: make(chan struct{}, 0), + } + defer func() { + // make sure that tindex.applyQuotas ends properly. + refreshTriggers.end <- struct{}{} + }() + + go func() { + for { + select { + case <-refreshTriggers.end: + return + case <-refreshTriggers.start: + } - // make sure that tindex.applyQuotas ends properly. 
- endc <- struct{}{} + tindex.applyQuotas( + time.Minute, + &Quota{ + Pattern: "/", + }, + &Quota{ + Pattern: "sys.app", + TransientChild: &Quota{ + Pattern: "transient", + Metrics: 1_000, + DroppingPolicy: QDPNew, + }, + }, + &Quota{ + Pattern: "sys.app.*", + Metrics: 10_000, + DroppingPolicy: QDPNew, + }, + &Quota{ + Pattern: "sys.app.server-001", + Metrics: 500_000, + }, + ) + + refreshTriggers.done <- struct{}{} + } + }() + + refreshTriggers.start <- struct{}{} + <-refreshTriggers.done + + t.Run("existing quota meta", func(t *testing.T) { + for i := 0; i < 500_000; i++ { + tindex.insert(fmt.Sprintf("/sys/app/server-001/cpu-%d.wsp", i), 0, 0, 0, 0) + } + if !tindex.throttle(&points.Points{Metric: "sys.app.server-001.memory", Data: []points.Point{{}, {}, {}, {}}}, false) { + t.Error("should throlle with existing meta nodes with realtime stat") + } + }) + + t.Run("transient quota meta", func(t *testing.T) { + for i := 0; i < 1_000; i++ { + tindex.insert(fmt.Sprintf("/sys/app/server-004/cpu-%d.wsp", i), 0, 0, 0, 0) + } + if !tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) { + t.Error("should throlle with transient meta nodes with realtime stat") + } + + refreshTriggers.start <- struct{}{} + <-refreshTriggers.done + + if tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) { + t.Error("should not throttle with proper quota annotation") + } + }) } func TestTrieQuotaWithProperHierarchicalThroughputEnforcement(t *testing.T) { diff --git a/persister/ini.go b/persister/ini.go index 40cb179e5..7ef25561c 100644 --- a/persister/ini.go +++ b/persister/ini.go @@ -7,6 +7,7 @@ import ( "strings" ) +// TODO: migrate to toml? 
func parseIniFile(filename string) ([]map[string]string, error) { body, err := ioutil.ReadFile(filename) if err != nil { diff --git a/persister/whisper_quota.go b/persister/whisper_quota.go index be115de60..b92e15a98 100644 --- a/persister/whisper_quota.go +++ b/persister/whisper_quota.go @@ -18,6 +18,8 @@ type Quota struct { Throughput int64 DroppingPolicy string StatMetricPrefix string + + TransientChildLimit int64 } type WhisperQuotas []Quota @@ -70,6 +72,9 @@ func ReadWhisperQuotas(filename string) (WhisperQuotas, error) { if quota.Throughput, err = parseInt(section, "throughput"); err != nil { return nil, err } + if quota.TransientChildLimit, err = parseInt(section, "transient-child-limit"); err != nil { + return nil, err + } switch v := section["dropping-policy"]; v { case "new", "none", "":