Skip to content
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions lib/logstorage/indexdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,52 @@
return ids
}

func (is *indexSearch) getTenantIDs() []TenantID {
tenants := make(map[TenantID]struct{})
ts := &is.ts
kb := &is.kb

var tID TenantID

kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
ts.Seek(kb.B)

for ts.NextItem() {
_, prefix, err := unmarshalCommonPrefix(&tID, ts.Item)
if err != nil {
logger.Panicf("FATAL: cannot unmarshal tenantID: %s", err)
}
if prefix != nsPrefixStreamID {
// Reached the end of entries with the needed prefix.
break
}
tenants[tID] = struct{}{}
// Seek for the next (accountID, projectID)
tID.ProjectID++
if tID.ProjectID == 0 {
tID.AccountID++
if tID.AccountID == 0 {
// Reached the end (accountID, projectID) space
break
}
}

kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixStreamID, tID)
ts.Seek(kb.B)
}

if err := ts.Error(); err != nil {
logger.Panicf("FATAL: error when searching for tenant ids: %s", err)
}

tenantIDs := make([]TenantID, 0)
for tenantID := range tenants {

Check failure on line 512 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Analyze

declared and not used: tenantID

Check failure on line 512 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Build

declared and not used: tenantID

Check failure on line 512 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Build

declared and not used: tenantID

Check failure on line 512 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Build

declared and not used: tenantID

Check failure on line 512 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Build

declared and not used: tenantID
tenantIDs = append(tenantIDs, tid)

Check failure on line 513 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Analyze

undefined: tid

Check failure on line 513 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Build

undefined: tid

Check failure on line 513 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Build

undefined: tid

Check failure on line 513 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Build

undefined: tid

Check failure on line 513 in lib/logstorage/indexdb.go

View workflow job for this annotation

GitHub Actions / Build

undefined: tid
}

return tenantIDs
}

func (idb *indexdb) mustRegisterStream(streamID *streamID, streamTagsCanonical string) {
st := GetStreamTags()
mustUnmarshalStreamTags(st, streamTagsCanonical)
Expand Down Expand Up @@ -575,6 +621,13 @@
bbPool.Put(bb)
}

// searchTenants returns the tenant IDs registered in idb.
func (idb *indexdb) searchTenants() []TenantID {
	search := idb.getIndexSearch()
	defer idb.putIndexSearch(search)
	return search.getTenantIDs()
}

type batchItems struct {
buf []byte

Expand Down
80 changes: 80 additions & 0 deletions lib/logstorage/indexdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package logstorage
import (
"fmt"
"reflect"
"sort"
"testing"

"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
Expand Down Expand Up @@ -252,3 +253,82 @@ func TestStorageSearchStreamIDs(t *testing.T) {

closeTestStorage(s)
}

// TestGetTenantsIDs verifies that indexdb.searchTenants returns exactly the
// set of tenant IDs for which streams have been registered.
func TestGetTenantsIDs(t *testing.T) {
	t.Parallel()

	path := t.Name()
	const partitionName = "foobar"
	s := newTestStorage()
	mustCreateIndexdb(path)
	idb := mustOpenIndexdb(path, partitionName, s)

	tenantIDs := []TenantID{
		{AccountID: 0, ProjectID: 0},
		{AccountID: 0, ProjectID: 1},
		{AccountID: 1, ProjectID: 0},
		{AccountID: 1, ProjectID: 1},
		{AccountID: 123, ProjectID: 567},
	}

	// getStreamIDForTags builds one streamID per tenant for the given tags,
	// plus the canonical stream tags representation needed for registration.
	getStreamIDForTags := func(tags map[string]string) ([]streamID, string) {
		st := GetStreamTags()
		for k, v := range tags {
			st.Add(k, v)
		}
		streamTagsCanonical := st.MarshalCanonical(nil)
		PutStreamTags(st)
		id := hash128(streamTagsCanonical)
		sids := make([]streamID, 0, len(tenantIDs))
		for _, tenantID := range tenantIDs {
			sid := streamID{
				tenantID: tenantID,
				id:       id,
			}
			sids = append(sids, sid)
		}
		return sids, string(streamTagsCanonical)
	}

	// Create indexdb entries
	const jobsCount = 7
	const instancesCount = 5
	for i := 0; i < jobsCount; i++ {
		for j := 0; j < instancesCount; j++ {
			sids, streamTagsCanonical := getStreamIDForTags(map[string]string{
				"job":      fmt.Sprintf("job-%d", i),
				"instance": fmt.Sprintf("instance-%d", j),
			})
			for _, sid := range sids {
				idb.mustRegisterStream(&sid, streamTagsCanonical)
			}
		}
	}
	idb.debugFlush()

	f := func(expectedTenantIDs []TenantID) {
		t.Helper()
		tenantIDs := idb.searchTenants()
		// Sort both sides, since searchTenants returns tenants in
		// unspecified order.
		sort.Slice(tenantIDs, func(i, j int) bool {
			return tenantIDs[i].less(&tenantIDs[j])
		})
		sort.Slice(expectedTenantIDs, func(i, j int) bool {
			return expectedTenantIDs[i].less(&expectedTenantIDs[j])
		})
		if !reflect.DeepEqual(tenantIDs, expectedTenantIDs) {
			fs.MustRemoveDir(path)
			t.Fatalf("unexpected tenantIDs; got %v; want %v", tenantIDs, expectedTenantIDs)
		}
	}

	expectedTenantIDs := tenantIDs

	f(expectedTenantIDs)

	mustCloseIndexdb(idb)
	fs.MustRemoveDir(path)

	closeTestStorage(s)
}
86 changes: 84 additions & 2 deletions lib/logstorage/storage_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,13 @@ import (
"sync"
"time"

"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"

"github.com/VictoriaMetrics/VictoriaLogs/lib/prefixfilter"
)

// QueryContext is used for execting the query passed to NewQueryContext()
Expand Down Expand Up @@ -587,6 +586,89 @@ func (s *Storage) GetStreamIDs(qctx *QueryContext, limit uint64) ([]ValueWithHit
return s.GetFieldValues(qctx, "_stream_id", limit)
}

// GetTenantIDs returns the tenant IDs that have data on the given
// [start, end] time range (nanosecond timestamps).
//
// ctx cancellation stops scanning the remaining partitions early.
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]TenantID, error) {
	return s.getTenantIDs(ctx, start, end)
}

// getTenantIDs collects the set of tenant IDs across all partitions covering
// the given [start, end] time range (nanosecond timestamps).
//
// Partitions are scanned concurrently by up to AvailableCPUs workers; the
// returned slice is deduplicated and in unspecified order.
func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]TenantID, error) {
	workersCount := cgroup.AvailableCPUs()
	stopCh := ctx.Done()

	// One result slot per worker; each worker appends only to its own slot,
	// so no locking is needed for the results.
	tenantIDs := make([][]TenantID, workersCount)
	processPartitions := func(pt *partition, workerID uint) {
		tenants := pt.idb.searchTenants()
		tenantIDs[workerID] = append(tenantIDs[workerID], tenants...)
	}

	// Spin up workers
	var wgWorkers sync.WaitGroup
	workCh := make(chan *partition, workersCount)
	wgWorkers.Add(workersCount)
	for i := 0; i < workersCount; i++ {
		go func(workerID uint) {
			for pt := range workCh {
				if needStop(stopCh) {
					// The search has been canceled. Just skip all the scheduled work in order to save CPU time.
					continue
				}
				processPartitions(pt, workerID)
			}
			wgWorkers.Done()
		}(uint(i))
	}

	// Select partitions according to the selected time range.
	// s.partitions is searched with sort.Search, i.e. it is assumed to be
	// ordered by day — TODO confirm against the Storage invariants.
	s.partitionsLock.Lock()
	ptws := s.partitions
	minDay := start / nsecsPerDay
	n := sort.Search(len(ptws), func(i int) bool {
		return ptws[i].day >= minDay
	})
	ptws = ptws[n:]
	maxDay := end / nsecsPerDay
	n = sort.Search(len(ptws), func(i int) bool {
		return ptws[i].day > maxDay
	})
	ptws = ptws[:n]

	// Copy the selected partitions, so they don't interfere with s.partitions.
	ptws = append([]*partitionWrapper{}, ptws...)

	// Hold a reference on each selected partition for the duration of the
	// search, so it cannot be dropped concurrently.
	for _, ptw := range ptws {
		ptw.incRef()
	}
	s.partitionsLock.Unlock()

	// Schedule concurrent search across matching partitions.
	for _, ptw := range ptws {
		workCh <- ptw.pt
	}

	// Wait until workers finish their work
	close(workCh)
	wgWorkers.Wait()

	// Decrement references to partitions
	for _, ptw := range ptws {
		ptw.decRef()
	}

	// Merge and deduplicate per-worker results.
	unique := make(map[TenantID]struct{}, len(tenantIDs))
	for _, tids := range tenantIDs {
		for _, tid := range tids {
			unique[tid] = struct{}{}
		}
	}

	tenants := make([]TenantID, 0, len(unique))
	for k := range unique {
		tenants = append(tenants, k)
	}

	return tenants, nil
}

func (s *Storage) runValuesWithHitsQuery(qctx *QueryContext) ([]ValueWithHits, error) {
var results []ValueWithHits
var resultsLock sync.Mutex
Expand Down
Loading