Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions app/vlselect/internalselect/internalselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package internalselect

import (
"context"
"encoding/json"
"flag"
"fmt"
"net/http"
Expand Down Expand Up @@ -85,6 +86,7 @@ var requestHandlers = map[string]func(ctx context.Context, w http.ResponseWriter
"/internal/delete/run_task": processDeleteRunTask,
"/internal/delete/stop_task": processDeleteStopTask,
"/internal/delete/active_tasks": processDeleteActiveTasks,
"/internal/select/tenant_ids": processTenantIDsRequest,
}

func processQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
Expand Down Expand Up @@ -375,6 +377,41 @@ func processDeleteActiveTasks(ctx context.Context, w http.ResponseWriter, r *htt
return nil
}

func processTenantIDsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
start, err := getInt64FromRequest(r, "start")
if err != nil {
return err
}
end, err := getInt64FromRequest(r, "end")
if err != nil {
return err
}

disableCompression := false
if err := getBoolFromRequest(&disableCompression, r, "disable_compression"); err != nil {
return err
}

tenantIDs, err := vlstorage.GetTenantIDs(ctx, start, end)
if err != nil {
return fmt.Errorf("cannot obtain tenant IDs: %w", err)
}

// Marshal tenantIDs at first
b, err := json.Marshal(tenantIDs)
if err != nil {
return fmt.Errorf("cannot marshal tenantIDs: %w", err)
}
if !disableCompression {
b = zstd.CompressLevel(nil, b, 1)
}
w.Header().Set("Content-Type", "application/json")
if _, err := w.Write(b); err != nil {
return fmt.Errorf("cannot send response to the client: %w", err)
}
return nil
}

type commonParams struct {
TenantIDs []logstorage.TenantID
Query *logstorage.Query
Expand Down
48 changes: 48 additions & 0 deletions app/vlselect/logsql/logsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package logsql

import (
"context"
"encoding/json"
"flag"
"fmt"
"io"
Expand Down Expand Up @@ -1073,6 +1074,53 @@ func ProcessQueryRequest(ctx context.Context, w http.ResponseWriter, r *http.Req
writeResponseHeadersOnce()
}

// ProcessAdminTenantsRequest processes /select/tenant_ids request.
func ProcessAdminTenantsRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) {
start, okStart, err := getTimeNsec(r, "start")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
end, okEnd, err := getTimeNsec(r, "end")
if err != nil {
httpserver.Errorf(w, r, "%s", err)
return
}
if !okStart {
start = math.MinInt64
}
if !okEnd {
end = math.MaxInt64
}

if start > end {
httpserver.Errorf(w, r, "'start' time must be less than 'end' time")
return
}

tenants, err := vlstorage.GetTenantIDs(ctx, start, end)
if err != nil {
httpserver.Errorf(w, r, "cannot obtain tenantIDs: %s", err)
return
}

t, err := json.Marshal(tenants)
if err != nil {
httpserver.Errorf(w, r, "cannot marshal tenantIDs to JSON: %s", err)
return
}

// Write response header
h := w.Header()

h.Set("Content-Type", "application/json")

if _, err := w.Write(t); err != nil {
httpserver.Errorf(w, r, "cannot send response to the client: %s", err)
return
}
}

type syncWriter struct {
mu sync.Mutex
w io.Writer
Expand Down
6 changes: 6 additions & 0 deletions app/vlselect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,10 @@ func processSelectRequest(ctx context.Context, w http.ResponseWriter, r *http.Re
logsql.ProcessStreamsRequest(ctx, w, r)
logsqlStreamsDuration.UpdateDuration(startTime)
return true
case "/select/tenant_ids":
logsqlAdminTenantsRequests.Inc()
logsql.ProcessAdminTenantsRequest(ctx, w, r)
return true
default:
return false
}
Expand Down Expand Up @@ -437,4 +441,6 @@ var (
deleteRunTaskRequests = metrics.NewCounter(`vl_http_requests_total{path="/delete/run_task"}`)
deleteStopTaskRequests = metrics.NewCounter(`vl_http_requests_total{path="/delete/stop_task"}`)
deleteActiveTasksRequests = metrics.NewCounter(`vl_http_requests_total{path="/delete/active_tasks"}`)

logsqlAdminTenantsRequests = metrics.NewCounter(`vl_http_requests_total{path="/select/tenant_ids"}`)
)
8 changes: 8 additions & 0 deletions app/vlstorage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,14 @@ func DeleteActiveTasks(ctx context.Context) ([]*logstorage.DeleteTask, error) {
return netstorageSelect.DeleteActiveTasks(ctx)
}

// GetTenantIDs returns tenantIDs from the storage by the given start and end.
func GetTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
if localStorage != nil {
return localStorage.GetTenantIDs(ctx, start, end)
}
return netstorageSelect.GetTenantIDs(ctx, start, end)
}

func writeStorageMetrics(w io.Writer, strg *logstorage.Storage) {
var ss logstorage.StorageStats
strg.UpdateStats(&ss)
Expand Down
66 changes: 66 additions & 0 deletions app/vlstorage/netselect/netselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package netselect

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -245,6 +246,22 @@ func (sn *storageNode) getStreamIDs(qctx *logstorage.QueryContext, limit uint64)
return sn.getValuesWithHits(qctx, "/internal/select/stream_ids", args)
}

func (sn *storageNode) getTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
args := url.Values{}
args.Set("start", fmt.Sprintf("%d", start))
args.Set("end", fmt.Sprintf("%d", end))
args.Set("disable_compression", fmt.Sprintf("%v", sn.s.disableCompression))
b, err := sn.getResponseForPathAndArgs(ctx, "/internal/select/tenant_ids", args)
if err != nil {
return nil, err
}
var tIDs []logstorage.TenantID
if err := json.Unmarshal(b, &tIDs); err != nil {
return nil, fmt.Errorf("cannot unmarshal tenant IDs from %q: %w", b, err)
}
return tIDs, nil
}

func (sn *storageNode) getCommonArgs(version string, qctx *logstorage.QueryContext) url.Values {
// ATTENTION: the *ProtocolVersion consts must be incremented every time the set of common args changes or its format changes.

Expand Down Expand Up @@ -563,6 +580,55 @@ func (s *Storage) DeleteActiveTasks(ctx context.Context) ([]*logstorage.DeleteTa
return tasks, nil
}

// GetTenantIDs returns tenantIDs for the given start and end.
func (s *Storage) GetTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
return s.getTenantIDs(ctx, start, end)
}

func (s *Storage) getTenantIDs(ctx context.Context, start, end int64) ([]logstorage.TenantID, error) {
ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()

results := make([][]logstorage.TenantID, len(s.sns))
errs := make([]error, len(s.sns))

var wg sync.WaitGroup
for i := range s.sns {
wg.Add(1)
go func(nodeIdx int) {
defer wg.Done()

sn := s.sns[nodeIdx]
tenantIDs, err := sn.getTenantIDs(ctxWithCancel, start, end)
results[nodeIdx] = tenantIDs
errs[nodeIdx] = err
if err != nil {
// Cancel the remaining parallel requests
cancel()
}
}(i)
}
wg.Wait()
if err := getFirstError(errs, false); err != nil {
return nil, err
}

unique := make(map[logstorage.TenantID]struct{})
for _, tenants := range results {
// Deduplicate tenantIDs
for _, tenant := range tenants {
unique[tenant] = struct{}{}
}
}

tenantIDs := make([]logstorage.TenantID, 0, len(unique))
for key := range unique {
tenantIDs = append(tenantIDs, key)
}

return tenantIDs, nil
}

func (s *Storage) getValuesWithHits(qctx *logstorage.QueryContext, limit uint64, resetHitsOnLimitExceeded bool,
callback func(ctx context.Context, sn *storageNode) ([]logstorage.ValueWithHits, error)) ([]logstorage.ValueWithHits, error) {

Expand Down
2 changes: 1 addition & 1 deletion lib/logstorage/indexdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func TestStorageSearchStreamIDs(t *testing.T) {
// non-existing-tag-re
f(`{job="job-0",instance="instance-0",non_existing_tag=~"foo.+"}`, nil)

//non-existing-non-empty-tag-re
// non-existing-non-empty-tag-re
f(`{job="job-0",instance="instance-0",non_existing_tag!~""}`, nil)

// match-job-instance
Expand Down