Skip to content

Commit 2073cf3

Browse files
committed
quota: realtime usage update for metrics, logical/physical sizes, and data points
Without this patch, some namespaces have a window of 1-3 `carbonserver.quota-usage-report-frequency` intervals during which they can penetrate quota enforcement. There are two cases that need to be addressed: dir nodes that are already annotated with a proper quota config, and nodes that don't have a quota config yet because they were created by a realtime insert via *CarbonserverListener.newMetricsChan. For annotated nodes, the trie index resolves it via a realtime metric counter update in *trieIndex.insert. For unannotated nodes, a new config is introduced, called transient-child-limit. With this config, during a realtime insert, trieIndex annotates the new dir nodes under parent nodes that enable that config. This is not the most proper way to address the issue, but a trade-off between correctness, reliability, complexity, and performance. Re-iterating all quota configs for every realtime insert would be nice but expensive. However, as always, I could be wrong. But this seems to be working in my tests.
1 parent 2671774 commit 2073cf3

File tree

5 files changed

+177
-8
lines changed

5 files changed

+177
-8
lines changed

carbon/config.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -420,6 +420,17 @@ retentions = 60:43200,3600:43800`), 0644)
420420

421421
func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) {
422422
for _, q := range c.Whisper.Quotas {
423+
var transientChild *carbonserver.Quota
424+
if q.TransientChildLimit > 0 {
425+
transientChild = &carbonserver.Quota{
426+
Pattern: "transient",
427+
Metrics: q.TransientChildLimit,
428+
Throughput: q.TransientChildLimit * 10,
429+
IsTransient: true,
430+
DroppingPolicy: carbonserver.QDPNew,
431+
}
432+
}
433+
423434
quotas = append(quotas, &carbonserver.Quota{
424435
Pattern: q.Pattern,
425436
Namespaces: q.Namespaces,
@@ -430,6 +441,8 @@ func (c *Config) getCarbonserverQuotas() (quotas []*carbonserver.Quota) {
430441
Throughput: q.Throughput,
431442
DroppingPolicy: carbonserver.ParseQuotaDroppingPolicy(q.DroppingPolicy),
432443
StatMetricPrefix: q.StatMetricPrefix,
444+
445+
TransientChild: transientChild,
433446
})
434447
}
435448

carbonserver/trie.go

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,9 @@ type dirMeta struct {
380380

381381
// newDirMeta allocates a dirMeta with a fresh, zero-valued QuotaUsage attached.
// No quota is stored on the returned value.
func newDirMeta() *dirMeta {
	meta := &dirMeta{usage: &QuotaUsage{}}
	return meta
}
382382

383-
func (*dirMeta) trieMeta() {}
384-
func (dm *dirMeta) update(quota *Quota) { dm.quota.Store(quota) }
383+
// trieMeta is an intentionally empty marker method.
// NOTE(review): presumably it exists so that *dirMeta satisfies the meta
// interface stored on trie nodes — confirm against the interface declaration.
func (*dirMeta) trieMeta() {}
384+
// setQuota stores quota as the quota of this dir node, replacing whatever was
// stored before.
func (dm *dirMeta) setQuota(quota *Quota) {
	dm.quota.Store(quota)
}
385+
// getQuota returns the quota currently stored on this dir node, or nil when
// no quota has been stored yet.
func (dm *dirMeta) getQuota() *Quota {
	// Use the comma-ok form: a bare .(*Quota) assertion panics when Load
	// returns a nil interface, which (assuming dm.quota is a sync/atomic.Value,
	// as its Load/Store usage suggests) is the case before the first setQuota.
	quota, _ := dm.quota.Load().(*Quota)
	return quota
}
385386

386387
func (dm *dirMeta) withinQuota(metrics, namespaces, logical, physical, dataPoints int64) bool {
387388
quota, ok := dm.quota.Load().(*Quota)
@@ -538,6 +539,8 @@ func (ti *trieIndex) insert(path string, logicalSize, physicalSize, dataPoints,
538539
var start, nlen int
539540
var sn, newn *trieNode
540541
var cur = ti.root
542+
var dirNodes = [16]*trieNode{ti.root} // why arrray: avoiding heap allocation
543+
var dirNodesIdx = 1
541544
outer:
542545
// why len(path)+1: make sure the last node is also processed in the loop
543546
for i := 0; i < len(path)+1; i++ {
@@ -649,12 +652,36 @@ outer:
649652
if child.dir() {
650653
cur = child
651654
cur.gen = ti.root.gen
655+
656+
if dirNodesIdx < len(dirNodes) {
657+
dirNodes[dirNodesIdx] = child
658+
dirNodesIdx++
659+
}
652660
continue outer
653661
}
654662
}
655663

656664
if i < len(path) {
657665
newn = ti.newDir()
666+
667+
if dirNodesIdx < len(dirNodes) {
668+
if dirNodes[dirNodesIdx-1].meta != nil {
669+
// Purpose of transient child?
670+
//
671+
//
672+
pmeta := dirNodes[dirNodesIdx-1].meta
673+
pquota, ok := pmeta.(*dirMeta).quota.Load().(*Quota)
674+
if ok && pquota.TransientChild != nil {
675+
meta := newDirMeta()
676+
meta.setQuota(pquota.TransientChild)
677+
newn.meta = meta
678+
}
679+
}
680+
681+
dirNodes[dirNodesIdx] = newn
682+
dirNodesIdx++
683+
}
684+
658685
cur.addChild(newn)
659686
cur = newn
660687
}
@@ -720,7 +747,24 @@ outer:
720747
cur.addChild(child)
721748
cur = child
722749

750+
// TODO: need to support realtime and concurrent index?
723751
ti.fileCount++
752+
753+
for i := 0; i < dirNodesIdx; i++ {
754+
if dirNodes[i] == nil {
755+
continue
756+
}
757+
758+
meta, ok := dirNodes[i].meta.(*dirMeta)
759+
if !ok || meta.usage == nil {
760+
continue
761+
}
762+
763+
atomic.AddInt64(&meta.usage.Metrics, 1)
764+
atomic.AddInt64(&meta.usage.LogicalSize, logicalSize)
765+
atomic.AddInt64(&meta.usage.PhysicalSize, physicalSize)
766+
atomic.AddInt64(&meta.usage.DataPoints, dataPoints)
767+
}
724768
}
725769

726770
return cur, nil
@@ -1570,8 +1614,14 @@ type Quota struct {
15701614

15711615
DroppingPolicy QuotaDroppingPolicy
15721616
StatMetricPrefix string
1617+
1618+
// See details in trieIndex.insert
1619+
TransientChild *Quota
1620+
IsTransient bool // for avoid emitting metrics for transient quota nodes.
15731621
}
15741622

1623+
// var transientQuota = &Quota{Metrics: 10000, Throughput: 10000, DroppingPolicy: QDPNew, StatMetricPrefix: "transient"}
1624+
15751625
func (q *Quota) String() string {
15761626
return fmt.Sprintf("pattern:%s,dirs:%d,files:%d,points:%d,logical:%d,physical:%d,throughput:%d,policy:%s", q.Pattern, q.Namespaces, q.Metrics, q.DataPoints, q.LogicalSize, q.PhysicalSize, q.Throughput, q.DroppingPolicy)
15771627
}
@@ -1788,7 +1838,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)
17881838
if quota.Pattern == "/" {
17891839
if !updateChecker["/"] {
17901840
meta := ti.root.meta.(*dirMeta)
1791-
meta.update(quota)
1841+
meta.setQuota(quota)
17921842
ti.throughputs.store("/", newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage))
17931843

17941844
updateChecker["/"] = true
@@ -1804,6 +1854,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)
18041854

18051855
for i, node := range nodes {
18061856
if node.meta == nil {
1857+
// TODO: has data race in throttle, generateQuotaAndUsageMetrics and other places?
18071858
node.meta = newDirMeta()
18081859
}
18091860

@@ -1819,7 +1870,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)
18191870

18201871
if !updateChecker[paths[i]] {
18211872
ti.throughputs.store(paths[i], newThroughputUsagePerNamespace(ti.root.gen, quota, meta.usage))
1822-
meta.update(quota)
1873+
meta.setQuota(quota)
18231874

18241875
updateChecker[paths[i]] = true
18251876
}
@@ -1838,7 +1889,7 @@ func (ti *trieIndex) applyQuotas(resetFrequency time.Duration, quotas ...*Quota)
18381889
}
18391890

18401891
// refreshUsage updates usage data and generate stat metrics.
1841-
// It can't be evoked with concurrent trieIndex.insert.
1892+
// It must not be invoked concurrently with trieIndex.insert.
18421893
func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files uint64) {
18431894
if throughputs == nil {
18441895
throughputs = newQuotaThroughputQuotaManager()
@@ -1895,8 +1946,10 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui
18951946
tname = name
18961947
}
18971948
var prefix string
1949+
var isTransient bool
18981950
if quota, ok := cur.node.meta.(*dirMeta).quota.Load().(*Quota); ok {
18991951
prefix = quota.StatMetricPrefix
1952+
isTransient = quota.IsTransient
19001953
}
19011954

19021955
var throughput int64
@@ -1906,7 +1959,9 @@ func (ti *trieIndex) refreshUsage(throughputs *throughputQuotaManager) (files ui
19061959

19071960
throttled := atomic.LoadInt64(&usage.Throttled)
19081961

1909-
ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled)
1962+
if !isTransient {
1963+
ti.generateQuotaAndUsageMetrics(prefix, strings.ReplaceAll(name, ".", "-"), cur.node, throughput, throttled)
1964+
}
19101965

19111966
if throttled > 0 {
19121967
atomic.AddInt64(&usage.Throttled, -throttled)

carbonserver/trie_test.go

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1493,6 +1493,11 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) {
14931493
tindex.prune()
14941494

14951495
endc := make(chan struct{})
1496+
defer func() {
1497+
// make sure that tindex.applyQuotas ends properly.
1498+
endc <- struct{}{}
1499+
}()
1500+
14961501
loopCount := 10240
14971502

14981503
go func() {
@@ -1527,9 +1532,99 @@ func TestTrieQuotaConcurrentApplyAndEnforce(t *testing.T) {
15271532
break
15281533
}
15291534
}
1535+
}
1536+
1537+
func TestTrieQuotaRealtimeEnforcement(t *testing.T) {
1538+
tindex := newTrie(
1539+
".wsp",
1540+
func(metric string) (size, dataPoints int64) {
1541+
return 12 * 1024, 1024
1542+
},
1543+
)
1544+
1545+
tindex.root.gen++
1546+
tindex.insert("/sys/app/server-001/cpu.wsp", 0, 0, 0, 0)
1547+
tindex.insert("/sys/app/server-002/cpu.wsp", 0, 0, 0, 0)
1548+
tindex.insert("/sys/app/server-003/cpu.wsp", 0, 0, 0, 0)
1549+
tindex.prune()
1550+
1551+
refreshTriggers := struct {
1552+
start chan struct{}
1553+
done chan struct{}
1554+
end chan struct{}
1555+
}{
1556+
start: make(chan struct{}, 0),
1557+
done: make(chan struct{}, 0),
1558+
end: make(chan struct{}, 0),
1559+
}
1560+
defer func() {
1561+
// make sure that tindex.applyQuotas ends properly.
1562+
refreshTriggers.end <- struct{}{}
1563+
}()
1564+
1565+
go func() {
1566+
for {
1567+
select {
1568+
case <-refreshTriggers.end:
1569+
return
1570+
case <-refreshTriggers.start:
1571+
}
15301572

1531-
// make sure that tindex.applyQuotas ends properly.
1532-
endc <- struct{}{}
1573+
tindex.applyQuotas(
1574+
time.Minute,
1575+
&Quota{
1576+
Pattern: "/",
1577+
},
1578+
&Quota{
1579+
Pattern: "sys.app",
1580+
TransientChild: &Quota{
1581+
Pattern: "transient",
1582+
Metrics: 1_000,
1583+
DroppingPolicy: QDPNew,
1584+
},
1585+
},
1586+
&Quota{
1587+
Pattern: "sys.app.*",
1588+
Metrics: 10_000,
1589+
DroppingPolicy: QDPNew,
1590+
},
1591+
&Quota{
1592+
Pattern: "sys.app.server-001",
1593+
Metrics: 500_000,
1594+
},
1595+
)
1596+
1597+
refreshTriggers.done <- struct{}{}
1598+
}
1599+
}()
1600+
1601+
refreshTriggers.start <- struct{}{}
1602+
<-refreshTriggers.done
1603+
1604+
t.Run("existing quota meta", func(t *testing.T) {
1605+
for i := 0; i < 500_000; i++ {
1606+
tindex.insert(fmt.Sprintf("/sys/app/server-001/cpu-%d.wsp", i), 0, 0, 0, 0)
1607+
}
1608+
if !tindex.throttle(&points.Points{Metric: "sys.app.server-001.memory", Data: []points.Point{{}, {}, {}, {}}}, false) {
1609+
t.Error("should throlle with existing meta nodes with realtime stat")
1610+
}
1611+
})
1612+
1613+
t.Run("transient quota meta", func(t *testing.T) {
1614+
for i := 0; i < 1_000; i++ {
1615+
tindex.insert(fmt.Sprintf("/sys/app/server-004/cpu-%d.wsp", i), 0, 0, 0, 0)
1616+
}
1617+
if !tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) {
1618+
t.Error("should throlle with transient meta nodes with realtime stat")
1619+
}
1620+
1621+
refreshTriggers.start <- struct{}{}
1622+
<-refreshTriggers.done
1623+
1624+
if tindex.throttle(&points.Points{Metric: "sys.app.server-004.memory", Data: []points.Point{{}, {}, {}, {}}}, false) {
1625+
t.Error("should not throttle with proper quota annotation")
1626+
}
1627+
})
15331628
}
15341629

15351630
func TestTrieQuotaWithProperHierarchicalThroughputEnforcement(t *testing.T) {

persister/ini.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"strings"
88
)
99

10+
// TODO: migrate to toml?
1011
func parseIniFile(filename string) ([]map[string]string, error) {
1112
body, err := ioutil.ReadFile(filename)
1213
if err != nil {

persister/whisper_quota.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ type Quota struct {
1818
Throughput int64
1919
DroppingPolicy string
2020
StatMetricPrefix string
21+
22+
TransientChildLimit int64
2123
}
2224

2325
type WhisperQuotas []Quota
@@ -70,6 +72,9 @@ func ReadWhisperQuotas(filename string) (WhisperQuotas, error) {
7072
if quota.Throughput, err = parseInt(section, "throughput"); err != nil {
7173
return nil, err
7274
}
75+
if quota.TransientChildLimit, err = parseInt(section, "transient-child-limit"); err != nil {
76+
return nil, err
77+
}
7378

7479
switch v := section["dropping-policy"]; v {
7580
case "new", "none", "":

0 commit comments

Comments
 (0)