
Commit 43c1e66

jiekun and valyala authored
logstorage: support GetTenants low level API (#709)
Co-authored-by: Aliaksandr Valialkin <[email protected]>
1 parent 52c5f8a commit 43c1e66

File tree: 4 files changed, +211 lines added, 0 deleted

lib/logstorage/indexdb.go

Lines changed: 48 additions & 0 deletions
```diff
@@ -470,6 +470,47 @@ func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName strin
 	return ids
 }
 
+func (is *indexSearch) getTenantIDs() []TenantID {
+	var tenantIDs []TenantID // the result
+	var tenantID TenantID    // scratch variable for unmarshaling entries
+
+	ts := &is.ts
+	kb := &is.kb
+
+	kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tenantID)
+	ts.Seek(kb.B)
+
+	for ts.NextItem() {
+		_, prefix, err := unmarshalCommonPrefix(&tenantID, ts.Item)
+		if err != nil {
+			logger.Panicf("FATAL: cannot unmarshal tenantID: %s", err)
+		}
+		if prefix != nsPrefixStreamID {
+			// Reached the end of the entries with the needed prefix.
+			break
+		}
+		tenantIDs = append(tenantIDs, tenantID)
+		// Seek to the next (accountID, projectID) pair.
+		tenantID.ProjectID++
+		if tenantID.ProjectID == 0 {
+			tenantID.AccountID++
+			if tenantID.AccountID == 0 {
+				// Reached the end of the (accountID, projectID) space.
+				break
+			}
+		}
+
+		kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tenantID)
+		ts.Seek(kb.B)
+	}
+
+	if err := ts.Error(); err != nil {
+		logger.Panicf("FATAL: error when searching for tenant ids: %s", err)
+	}
+
+	return tenantIDs
+}
+
 func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical string) {
 	st := GetStreamTags()
 	mustUnmarshalStreamTags(st, streamTagsCanonical)
```
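The interesting part of getTenantIDs is that it never visits every index entry: after recording a tenant it bumps (ProjectID, AccountID) as one combined counter and re-seeks, skipping all remaining entries of that tenant in a single jump. Below is a minimal, self-contained sketch of that skip-scan over a sorted key space; the string slice and the seek helper stand in for the real mergeset iterator, and the "%08x%08x|..." key layout is a hypothetical stand-in for marshalCommonPrefix.

```go
// Skip-scan sketch: the sorted slice and seek() mimic the index iterator;
// only the tenant-skipping logic mirrors getTenantIDs above.
package main

import (
	"fmt"
	"sort"
	"strconv"
)

type TenantID struct {
	AccountID uint32
	ProjectID uint32
}

// key builds a sortable index key prefixed by the tenant (hypothetical layout).
func key(t TenantID, suffix string) string {
	return fmt.Sprintf("%08x%08x|%s", t.AccountID, t.ProjectID, suffix)
}

func main() {
	// Many index entries per tenant, kept in sorted order.
	var entries []string
	for _, t := range []TenantID{{0, 0}, {0, 1}, {1, 0}, {123, 567}} {
		for i := 0; i < 1000; i++ {
			entries = append(entries, key(t, fmt.Sprintf("stream-%04d", i)))
		}
	}
	sort.Strings(entries)

	// seek returns the position of the first entry >= k (the ts.Seek analogue).
	seek := func(k string) int { return sort.SearchStrings(entries, k) }

	var tenants []TenantID
	var t TenantID
	seeks := 0
	for i := seek(key(t, "")); i < len(entries); {
		a, _ := strconv.ParseUint(entries[i][:8], 16, 32)
		p, _ := strconv.ParseUint(entries[i][8:16], 16, 32)
		t = TenantID{AccountID: uint32(a), ProjectID: uint32(p)}
		tenants = append(tenants, t)

		// Skip the remaining entries of this tenant with one seek.
		t.ProjectID++
		if t.ProjectID == 0 {
			t.AccountID++
			if t.AccountID == 0 {
				break // wrapped around the whole tenant space
			}
		}
		i = seek(key(t, ""))
		seeks++
	}
	// Prints the 4 tenants and 4 re-seeks, instead of scanning 4000 entries.
	fmt.Println(tenants, seeks)
}
```

This keeps the cost proportional to the number of tenants rather than the number of indexed streams.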
```diff
@@ -575,6 +616,13 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
 	bbPool.Put(bb)
 }
 
+func (idb *indexdb) searchTenants() []TenantID {
+	is := idb.getIndexSearch()
+	defer idb.putIndexSearch(is)
+
+	return is.getTenantIDs()
+}
+
 type batchItems struct {
 	buf []byte
```
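searchTenants follows the package's borrow/return idiom for indexSearch objects. A generic sketch of that idiom with sync.Pool follows; the struct field and reset details are hypothetical, and the real getIndexSearch/putIndexSearch plumbing lives elsewhere in indexdb.go.

```go
// Borrow/return idiom behind getIndexSearch/putIndexSearch, sketched with
// sync.Pool; the indexSearch field here is hypothetical.
package main

import "sync"

type indexSearch struct {
	kb []byte // reusable buffer for marshaled keys
}

func (is *indexSearch) reset() { is.kb = is.kb[:0] }

var indexSearchPool = sync.Pool{
	New: func() any { return &indexSearch{} },
}

func getIndexSearch() *indexSearch { return indexSearchPool.Get().(*indexSearch) }

func putIndexSearch(is *indexSearch) {
	is.reset() // drop per-search state before returning to the pool
	indexSearchPool.Put(is)
}

func searchTenants() {
	is := getIndexSearch()
	defer putIndexSearch(is)
	// ... run the index scan with is ...
	_ = is
}

func main() { searchTenants() }
```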

lib/logstorage/indexdb_test.go

Lines changed: 67 additions & 0 deletions
```diff
@@ -252,3 +252,70 @@ func TestStorageSearchStreamIDs(t *testing.T) {
 
 	closeTestStorage(s)
 }
+
+func TestGetTenantsIDs(t *testing.T) {
+	t.Parallel()
+
+	path := t.Name()
+	const partitionName = "foobar"
+
+	s := newTestStorage()
+	defer closeTestStorage(s)
+
+	mustCreateIndexdb(path)
+	defer fs.MustRemoveDir(path)
+
+	idb := mustOpenIndexdb(path, partitionName, s)
+	defer mustCloseIndexdb(idb)
+
+	tenantIDs := []TenantID{
+		{AccountID: 0, ProjectID: 0},
+		{AccountID: 0, ProjectID: 1},
+		{AccountID: 1, ProjectID: 0},
+		{AccountID: 1, ProjectID: 1},
+		{AccountID: 123, ProjectID: 567},
+	}
+	getStreamIDForTags := func(tags map[string]string) ([]streamID, string) {
+		st := GetStreamTags()
+		for k, v := range tags {
+			st.Add(k, v)
+		}
+		streamTagsCanonical := st.MarshalCanonical(nil)
+		PutStreamTags(st)
+		id := hash128(streamTagsCanonical)
+		sids := make([]streamID, 0, len(tenantIDs))
+		for _, tenantID := range tenantIDs {
+			sid := streamID{
+				tenantID: tenantID,
+				id:       id,
+			}
+
+			sids = append(sids, sid)
+		}
+
+		return sids, string(streamTagsCanonical)
+	}
+
+	// Create indexdb entries
+	const jobsCount = 7
+	const instancesCount = 5
+	for i := 0; i < jobsCount; i++ {
+		for j := 0; j < instancesCount; j++ {
+			sids, streamTagsCanonical := getStreamIDForTags(map[string]string{
+				"job":      fmt.Sprintf("job-%d", i),
+				"instance": fmt.Sprintf("instance-%d", j),
+			})
+			for _, sid := range sids {
+				idb.mustRegisterStream(&sid, streamTagsCanonical)
+			}
+		}
+	}
+	idb.debugFlush()
+
+	// Run the test
+	result := idb.searchTenants()
+	if !reflect.DeepEqual(result, tenantIDs) {
+		t.Fatalf("unexpected tenantIDs; got %v; want %v", result, tenantIDs)
+	}
+}
```
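Note that the test compares the result against the fixture slice with reflect.DeepEqual and no sorting. That only works because the index walk yields tenants in ascending (AccountID, ProjectID) order, which in turn relies on the tenant prefix being encoded so that byte order matches numeric order. The check below demonstrates that property, assuming a big-endian encoding like the one marshalCommonPrefix presumably produces.

```go
// Assuming tenant prefixes are big-endian encoded, lexicographic byte order
// equals numeric (AccountID, ProjectID) order -- which is why the test above
// needs no sort before DeepEqual.
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

func prefix(accountID, projectID uint32) []byte {
	b := make([]byte, 8)
	binary.BigEndian.PutUint32(b[:4], accountID)
	binary.BigEndian.PutUint32(b[4:], projectID)
	return b
}

func main() {
	// The same tenants as in the test, in the expected result order.
	ps := [][]byte{
		prefix(0, 0), prefix(0, 1), prefix(1, 0), prefix(1, 1), prefix(123, 567),
	}
	for i := 1; i < len(ps); i++ {
		fmt.Println(bytes.Compare(ps[i-1], ps[i]) < 0) // prints true four times
	}
}
```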

lib/logstorage/storage_search.go

Lines changed: 80 additions & 0 deletions
```diff
@@ -587,6 +587,86 @@ func (s *Storage) GetStreamIDs(qctx *QueryContext, limit uint64) ([]ValueWithHit
 	return s.GetFieldValues(qctx, "_stream_id", limit)
 }
 
+// GetTenantIDs returns the IDs of tenants with data on the given [start ... end] time range.
+func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]TenantID, error) {
+	return s.getTenantIDs(ctx, start, end)
+}
+
+func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]TenantID, error) {
+	workersCount := cgroup.AvailableCPUs()
+	stopCh := ctx.Done()
+
+	tenantIDByWorker := make([][]TenantID, workersCount)
+
+	// Spin up workers
+	var wg sync.WaitGroup
+	workCh := make(chan *partition, workersCount)
+	for i := 0; i < workersCount; i++ {
+		wg.Add(1)
+		go func(workerID uint) {
+			defer wg.Done()
+			for pt := range workCh {
+				if needStop(stopCh) {
+					// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
+					continue
+				}
+				tenantIDs := pt.idb.searchTenants()
+				tenantIDByWorker[workerID] = append(tenantIDByWorker[workerID], tenantIDs...)
+			}
+		}(uint(i))
+	}
+
+	// Select partitions according to the selected time range
+	s.partitionsLock.Lock()
+	ptws := s.partitions
+	minDay := start / nsecsPerDay
+	n := sort.Search(len(ptws), func(i int) bool {
+		return ptws[i].day >= minDay
+	})
+	ptws = ptws[n:]
+	maxDay := end / nsecsPerDay
+	n = sort.Search(len(ptws), func(i int) bool {
+		return ptws[i].day > maxDay
+	})
+	ptws = ptws[:n]
+
+	// Copy the selected partitions, so they don't interfere with s.partitions.
+	ptws = append([]*partitionWrapper{}, ptws...)
+
+	for _, ptw := range ptws {
+		ptw.incRef()
+	}
+	s.partitionsLock.Unlock()
+
+	// Schedule concurrent search across matching partitions.
+	for _, ptw := range ptws {
+		workCh <- ptw.pt
+	}
+
+	// Wait until workers finish their work
+	close(workCh)
+	wg.Wait()
+
+	// Decrement references to partitions
+	for _, ptw := range ptws {
+		ptw.decRef()
+	}
+
+	uniqTenantIDs := make(map[TenantID]struct{})
+	for _, tenantIDs := range tenantIDByWorker {
+		for _, tenantID := range tenantIDs {
+			uniqTenantIDs[tenantID] = struct{}{}
+		}
+	}
+
+	tenants := make([]TenantID, 0, len(uniqTenantIDs))
+	for k := range uniqTenantIDs {
+		tenants = append(tenants, k)
+	}
+
+	return tenants, nil
+}
+
 func (s *Storage) runValuesWithHitsQuery(qctx *QueryContext) ([]ValueWithHits, error) {
 	var results []ValueWithHits
 	var resultsLock sync.Mutex
```
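A hedged usage sketch of the new API follows, listing tenants with data over the last day. The import path, MustOpenStorage call, and empty StorageConfig are assumptions for illustration; only GetTenantIDs itself comes from this commit. Since getTenantIDs deduplicates through a map, the result order is unspecified and callers should sort if they need stability.

```go
// Sketch only: import path, MustOpenStorage and StorageConfig are assumed;
// GetTenantIDs(ctx, start, end) is the API added by this commit.
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/VictoriaMetrics/VictoriaMetrics/lib/logstorage"
)

func main() {
	s := logstorage.MustOpenStorage("victoria-logs-data", &logstorage.StorageConfig{})
	defer s.MustClose()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	end := time.Now().UnixNano()
	start := end - 24*time.Hour.Nanoseconds()

	tenantIDs, err := s.GetTenantIDs(ctx, start, end)
	if err != nil {
		log.Fatalf("cannot list tenants: %s", err)
	}
	// The order is unspecified (deduplication goes through a map);
	// sort here if a stable order is needed.
	for _, tid := range tenantIDs {
		fmt.Printf("tenant account=%d project=%d\n", tid.AccountID, tid.ProjectID)
	}
}
```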

lib/logstorage/storage_search_test.go

Lines changed: 16 additions & 0 deletions
```diff
@@ -605,6 +605,22 @@ func TestStorageRunQuery(t *testing.T) {
 		}
 	})
 
+	t.Run("tenant_ids", func(t *testing.T) {
+		tenantIDs, err := s.GetTenantIDs(context.TODO(), 0, time.Now().UnixNano())
+		if err != nil {
+			t.Fatalf("unexpected error: %s", err)
+		}
+		sort.Slice(tenantIDs, func(i, j int) bool {
+			return tenantIDs[i].less(&tenantIDs[j])
+		})
+		sort.Slice(allTenantIDs, func(i, j int) bool {
+			return allTenantIDs[i].less(&allTenantIDs[j])
+		})
+		if !reflect.DeepEqual(tenantIDs, allTenantIDs) {
+			t.Fatalf("unexpected GetTenantIDs result; got: %v, want: %v", tenantIDs, allTenantIDs)
+		}
+	})
+
 	// Run more complex tests
 	f := func(t *testing.T, query string, rowsExpected [][]Field) {
 		t.Helper()
```
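This test sorts both slices with TenantID.less before comparing, since GetTenantIDs returns tenants in unspecified order. The comparator itself is not part of this diff; below is a sketch consistent with how it is used here, assuming AccountID-major, ProjectID-minor ordering to match the index key layout.

```go
// Sketch of the TenantID.less comparator the test relies on; the real
// implementation lives elsewhere in lib/logstorage.
package main

import "fmt"

type TenantID struct {
	AccountID uint32
	ProjectID uint32
}

func (tid *TenantID) less(other *TenantID) bool {
	if tid.AccountID != other.AccountID {
		return tid.AccountID < other.AccountID
	}
	return tid.ProjectID < other.ProjectID
}

func main() {
	a := TenantID{AccountID: 1, ProjectID: 9}
	b := TenantID{AccountID: 2, ProjectID: 0}
	fmt.Println(a.less(&b), b.less(&a)) // true false
}
```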
