Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions lib/logstorage/indexdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,47 @@ func (is *indexSearch) getStreamIDsForTagRegexp(tenantID TenantID, tagName strin
return ids
}

func (is *indexSearch) getTenantIDs() []TenantID {
var tenantIDs []TenantID // return as result
var tenantID TenantID // variable for unmarshal

ts := &is.ts
kb := &is.kb

kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tenantID)
ts.Seek(kb.B)

for ts.NextItem() {
_, prefix, err := unmarshalCommonPrefix(&tenantID, ts.Item)
if err != nil {
logger.Panicf("FATAL: cannot unmarshal tenantID: %s", err)
}
if prefix != nsPrefixStreamID {
// Reached the end of entries with the needed prefix.
break
}
tenantIDs = append(tenantIDs, tenantID)
// Seek for the next (accountID, projectID)
tenantID.ProjectID++
if tenantID.ProjectID == 0 {
tenantID.AccountID++
if tenantID.AccountID == 0 {
// Reached the end (accountID, projectID) space
break
}
}

kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tenantID)
ts.Seek(kb.B)
}

if err := ts.Error(); err != nil {
logger.Panicf("FATAL: error when searching for tenant ids: %s", err)
}

return tenantIDs
}

func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical string) {
st := GetStreamTags()
mustUnmarshalStreamTags(st, streamTagsCanonical)
Expand Down Expand Up @@ -575,6 +616,13 @@ func (idb *indexdb) storeStreamIDsToCache(tenantIDs []TenantID, sf *StreamFilter
bbPool.Put(bb)
}

func (idb *indexdb) searchTenants() []TenantID {
is := idb.getIndexSearch()
defer idb.putIndexSearch(is)

return is.getTenantIDs()
}

type batchItems struct {
buf []byte

Expand Down
67 changes: 67 additions & 0 deletions lib/logstorage/indexdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,70 @@ func TestStorageSearchStreamIDs(t *testing.T) {

closeTestStorage(s)
}

func TestGetTenantsIDs(t *testing.T) {
t.Parallel()

path := t.Name()
const partitionName = "foobar"

s := newTestStorage()
defer closeTestStorage(s)

mustCreateIndexdb(path)
defer fs.MustRemoveDir(path)

idb := mustOpenIndexdb(path, partitionName, s)
defer mustCloseIndexdb(idb)

tenantIDs := []TenantID{
{AccountID: 0, ProjectID: 0},
{AccountID: 0, ProjectID: 1},
{AccountID: 1, ProjectID: 0},
{AccountID: 1, ProjectID: 1},
{AccountID: 123, ProjectID: 567},
}
getStreamIDForTags := func(tags map[string]string) ([]streamID, string) {
st := GetStreamTags()
for k, v := range tags {
st.Add(k, v)
}
streamTagsCanonical := st.MarshalCanonical(nil)
PutStreamTags(st)
id := hash128(streamTagsCanonical)
sids := make([]streamID, 0, len(tenantIDs))
for _, tenantID := range tenantIDs {
sid := streamID{
tenantID: tenantID,
id: id,
}

sids = append(sids, sid)
}

return sids, string(streamTagsCanonical)
}

// Create indexdb entries
const jobsCount = 7
const instancesCount = 5
for i := 0; i < jobsCount; i++ {
for j := 0; j < instancesCount; j++ {
sids, streamTagsCanonical := getStreamIDForTags(map[string]string{
"job": fmt.Sprintf("job-%d", i),
"instance": fmt.Sprintf("instance-%d", j),
})
for _, sid := range sids {
idb.mustRegisterStream(&sid, streamTagsCanonical)
}

}
}
idb.debugFlush()

// run the test
result := idb.searchTenants()
if !reflect.DeepEqual(result, tenantIDs) {
t.Fatalf("unexpected tensntIds; got %v; want %v", result, tenantIDs)
}
}
80 changes: 80 additions & 0 deletions lib/logstorage/storage_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,86 @@ func (s *Storage) GetStreamIDs(qctx *QueryContext, limit uint64) ([]ValueWithHit
return s.GetFieldValues(qctx, "_stream_id", limit)
}

// GetTenantIDs returns tenantIDs for the given start and end.
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]TenantID, error) {
return s.getTenantIDs(ctx, start, end)
}

func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]TenantID, error) {
workersCount := cgroup.AvailableCPUs()
stopCh := ctx.Done()

tenantIDByWorker := make([][]TenantID, workersCount)

// spin up workers
var wg sync.WaitGroup
workCh := make(chan *partition, workersCount)
for i := 0; i < workersCount; i++ {
wg.Add(1)
go func(workerID uint) {
defer wg.Done()
for pt := range workCh {
if needStop(stopCh) {
// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
continue
}
tenantIDs := pt.idb.searchTenants()
tenantIDByWorker[workerID] = append(tenantIDByWorker[workerID], tenantIDs...)
}
}(uint(i))
}

// Select partitions according to the selected time range
s.partitionsLock.Lock()
ptws := s.partitions
minDay := start / nsecsPerDay
n := sort.Search(len(ptws), func(i int) bool {
return ptws[i].day >= minDay
})
ptws = ptws[n:]
maxDay := end / nsecsPerDay
n = sort.Search(len(ptws), func(i int) bool {
return ptws[i].day > maxDay
})
ptws = ptws[:n]

// Copy the selected partitions, so they don't interfere with s.partitions.
ptws = append([]*partitionWrapper{}, ptws...)

for _, ptw := range ptws {
ptw.incRef()
}
s.partitionsLock.Unlock()

// Schedule concurrent search across matching partitions.
for _, ptw := range ptws {
workCh <- ptw.pt
}

// Wait until workers finish their work
close(workCh)
wg.Wait()

// Decrement references to partitions
for _, ptw := range ptws {
ptw.decRef()
}

uniqTenantIDs := make(map[TenantID]struct{})
for _, tenantIDs := range tenantIDByWorker {
for _, tenantID := range tenantIDs {
uniqTenantIDs[tenantID] = struct{}{}
}
}

tenants := make([]TenantID, 0, len(uniqTenantIDs))
for k := range uniqTenantIDs {
tenants = append(tenants, k)
}

return tenants, nil
}

func (s *Storage) runValuesWithHitsQuery(qctx *QueryContext) ([]ValueWithHits, error) {
var results []ValueWithHits
var resultsLock sync.Mutex
Expand Down
16 changes: 16 additions & 0 deletions lib/logstorage/storage_search_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,22 @@ func TestStorageRunQuery(t *testing.T) {
}
})

t.Run("tenant_ids", func(t *testing.T) {
tenantIDs, err := s.GetTenantIDs(context.TODO(), 0, time.Now().UnixNano())
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
sort.Slice(tenantIDs, func(i, j int) bool {
return tenantIDs[i].less(&tenantIDs[j])
})
sort.Slice(allTenantIDs, func(i, j int) bool {
return allTenantIDs[i].less(&allTenantIDs[j])
})
if !reflect.DeepEqual(tenantIDs, allTenantIDs) {
t.Fatalf("unexpected GetTenantIDs result; got: %v, want: %v", tenantIDs, allTenantIDs)
}
})

// Run more complex tests
f := func(t *testing.T, query string, rowsExpected [][]Field) {
t.Helper()
Expand Down