Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions app/vtselect/traces/query/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ func GetSpanNameList(ctx context.Context, cp *CommonParams, serviceName string)
// todo in-memory cache of hot traces.
func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, error) {
currentTime := time.Now()

// possible partition
// query: {trace_id_idx="xx"} AND trace_id:traceID
qStr := fmt.Sprintf(`{%s="%d"} AND %s:=%q | fields _time`, otelpb.TraceIDIndexStreamName, xxhash.Sum64String(traceID)%otelpb.TraceIDIndexPartitionCount, otelpb.TraceIDIndexFieldName, traceID)
Expand All @@ -163,17 +162,26 @@ func GetTrace(ctx context.Context, cp *CommonParams, traceID string) ([]*Row, er
}
q.AddPipeOffsetLimit(0, 1)
traceTimestamp, err := findTraceIDTimeSplitTimeRange(ctx, q, cp)
if err != nil {
if err != nil && !isOutOfRetentionPeriodError(err) {
return nil, fmt.Errorf("cannot find trace_id %q start time: %s", traceID, err)
}

// fast path: trace start time found, search in [trace start time, trace start time + *traceMaxDurationWindow] time range.
if !traceTimestamp.IsZero() {
return findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow))
rows, err := findSpansByTraceIDAndTime(ctx, cp, traceID, traceTimestamp.Add(-*traceMaxDurationWindow), traceTimestamp.Add(*traceMaxDurationWindow))
// meeting out of retention error means no such traceID in retention period.
if err != nil && isOutOfRetentionPeriodError(err) {
return []*Row{}, nil
}
return rows, err
}
// slow path: if trace start time not exist, probably the root span was not available.
// try to search from now to 0 timestamp.
return findSpansByTraceID(ctx, cp, traceID)
rows, err := findSpansByTraceID(ctx, cp, traceID)
if err != nil && isOutOfRetentionPeriodError(err) {
return []*Row{}, nil
}
return rows, err
}

// GetTraceList returns multiple traceIDs and spans of them in []*Row format.
Expand Down Expand Up @@ -420,7 +428,7 @@ func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp
currentTime := time.Now()
startTime := currentTime.Add(-*traceSearchStep)
endTime := currentTime
for startTime.UnixNano() > 0 { // todo: no need to search time range before retention period.
for startTime.UnixNano() > 0 {
qq := q.CloneWithTimeFilter(currentTime.UnixNano(), startTime.UnixNano(), endTime.UnixNano())
qctx = qctx.WithQuery(qq)

Expand All @@ -441,7 +449,6 @@ func findTraceIDTimeSplitTimeRange(ctx context.Context, q *logstorage.Query, cp
// found result, perform extra search for traceMaxDurationWindow and then break.
return time.Unix(traceIDStartTimeInt/1e9, traceIDStartTimeInt%1e9), nil
}

return time.Time{}, nil
}

Expand Down Expand Up @@ -495,7 +502,6 @@ func findSpansByTraceIDAndTime(ctx context.Context, cp *CommonParams, traceID st
if err != nil {
return nil, fmt.Errorf("cannot parse query [%s]: %s", qStr, err)
}

ctxWithCancel, cancel := context.WithCancel(ctx)
cp.Query = q
qctx := cp.NewQueryContext(ctxWithCancel)
Expand Down Expand Up @@ -566,3 +572,7 @@ func checkTraceIDList(traceIDList []string) []string {
}
return result
}

func isOutOfRetentionPeriodError(err error) bool {
return err != nil && strings.Contains(err.Error(), "out of retention period")
}
18 changes: 18 additions & 0 deletions app/vtstorage/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"flag"
"fmt"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"io"
"net/http"
"time"
Expand Down Expand Up @@ -418,6 +419,19 @@ func (*Storage) MustAddRows(lr *logstorage.LogRows) {
}
}

// CheckTimeExceedRetention
// endTimestamp is a nanosecond timestamp
func CheckTimeExceedRetention(endTimestamp int64) error {
minAllowedTimestamp := int64(fasttime.UnixTimestamp()*1000) - retentionPeriod.Milliseconds()
if endTimestamp/1000000 > minAllowedTimestamp {
return nil
}
return &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("out of retention period. the given time range %d is outside the allowed -retentionPeriod=%s", endTimestamp, retentionPeriod),
StatusCode: http.StatusServiceUnavailable,
}
}

// RunQuery runs the given qctx and calls writeBlock for the returned data blocks
func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBlockFunc) error {
qOpt, offset, limit := qctx.Query.GetLastNResultsQuery()
Expand All @@ -427,6 +441,10 @@ func RunQuery(qctx *logstorage.QueryContext, writeBlock logstorage.WriteDataBloc
}

if localStorage != nil {
_, maxTimestamp := qctx.Query.GetFilterTimeRange()
if err := CheckTimeExceedRetention(maxTimestamp); err != nil {
return err
}
return localStorage.RunQuery(qctx, writeBlock)
}
return netstorageSelect.RunQuery(qctx, writeBlock)
Expand Down
5 changes: 4 additions & 1 deletion app/vtstorage/netselect/netselect.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,10 @@ func (sn *storageNode) getResponseBodyForPathAndArgs(ctx context.Context, path s
responseBody = []byte(err.Error())
}
_ = resp.Body.Close()
return nil, "", fmt.Errorf("unexpected response status code from %q: %d; want %d; response: %q", reqURL, resp.StatusCode, http.StatusOK, responseBody)
return nil, "", &httpserver.ErrorWithStatusCode{
Err: fmt.Errorf("unexpected status code for the request to %q: %d; want %d; response: %q", reqURL, resp.StatusCode, http.StatusOK, responseBody),
StatusCode: resp.StatusCode,
}
}

return resp.Body, reqURL, nil
Expand Down
20 changes: 13 additions & 7 deletions apptest/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,20 @@ type VictoriaTracesWriteQuerier interface {
StorageMerger
}

type JaegerQueryResult struct {
GetServicesResponse *JaegerAPIServicesResponse
GetOperationsResponse *JaegerAPIOperationsResponse
GetTracesResponse *JaegerAPITracesResponse
GetTraceResponse *JaegerAPITraceResponse
RespBody string
}

// JaegerQuerier contains methods available to Jaeger HTTP API for Querying.
type JaegerQuerier interface {
JaegerAPIServices(t *testing.T, opts QueryOpts) *JaegerAPIServicesResponse
JaegerAPIOperations(t *testing.T, serviceName string, opts QueryOpts) *JaegerAPIOperationsResponse
JaegerAPITraces(t *testing.T, params JaegerQueryParam, opts QueryOpts) *JaegerAPITracesResponse
JaegerAPITrace(t *testing.T, traceID string, opts QueryOpts) *JaegerAPITraceResponse
JaegerAPIServices(t *testing.T, opts QueryOpts) *JaegerQueryResult
JaegerAPIOperations(t *testing.T, serviceName string, opts QueryOpts) *JaegerQueryResult
JaegerAPITraces(t *testing.T, params JaegerQueryParam, opts QueryOpts) *JaegerQueryResult
JaegerAPITrace(t *testing.T, traceID string, opts QueryOpts) *JaegerQueryResult
JaegerAPIDependencies(t *testing.T, opts QueryOpts)
}

Expand Down Expand Up @@ -242,9 +250,7 @@ func NewJaegerAPITracesResponse(t *testing.T, s string) *JaegerAPITracesResponse
t.Helper()

res := &JaegerAPITracesResponse{}
if err := json.Unmarshal([]byte(s), res); err != nil {
t.Fatalf("could not unmarshal query response data=\n%s\n: %v", string(s), err)
}
_ = json.Unmarshal([]byte(s), res)
return res
}

Expand Down
2 changes: 1 addition & 1 deletion apptest/testcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,7 @@ func (tc *TestCase) MustStartDefaultVtsingle() *Vtsingle {

return tc.MustStartVtsingle("vtsingle", []string{
"-storageDataPath=" + tc.Dir() + "/vtsingle",
"-retentionPeriod=100y",
"-retentionPeriod=7d",
})
}

Expand Down
42 changes: 38 additions & 4 deletions apptest/tests/otlp_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package tests
import (
"encoding/hex"
"os"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -122,7 +123,7 @@ func testOTLPIngestionJaegerQuery(tc *at.TestCase, sut at.VictoriaTracesWriteQue
tc.Assert(&at.AssertOptions{
Msg: "unexpected /select/jaeger/api/services response",
Got: func() any {
return sut.JaegerAPIServices(t, at.QueryOpts{})
return sut.JaegerAPIServices(t, at.QueryOpts{}).GetServicesResponse
},
Want: &at.JaegerAPIServicesResponse{
Data: []string{serviceName},
Expand All @@ -136,7 +137,7 @@ func testOTLPIngestionJaegerQuery(tc *at.TestCase, sut at.VictoriaTracesWriteQue
tc.Assert(&at.AssertOptions{
Msg: "unexpected /select/jaeger/api/services/*/operations response",
Got: func() any {
return sut.JaegerAPIOperations(t, serviceName, at.QueryOpts{})
return sut.JaegerAPIOperations(t, serviceName, at.QueryOpts{}).GetOperationsResponse
},
Want: &at.JaegerAPIOperationsResponse{
Data: []string{spanName},
Expand Down Expand Up @@ -200,7 +201,7 @@ func testOTLPIngestionJaegerQuery(tc *at.TestCase, sut at.VictoriaTracesWriteQue
StartTimeMin: spanTime.Add(-10 * time.Minute),
StartTimeMax: spanTime.Add(10 * time.Minute),
},
}, at.QueryOpts{})
}, at.QueryOpts{}).GetTracesResponse
},
Want: &at.JaegerAPITracesResponse{
Data: expectTraceData,
Expand All @@ -209,11 +210,32 @@ func testOTLPIngestionJaegerQuery(tc *at.TestCase, sut at.VictoriaTracesWriteQue
cmpopts.IgnoreFields(at.JaegerAPITracesResponse{}, "Errors", "Limit", "Offset", "Total"),
},
})
tc.Assert(&at.AssertOptions{
Msg: "unexpected /select/jaeger/api/traces response",
Got: func() any {
queryResult := sut.JaegerAPITraces(t, at.JaegerQueryParam{
TraceQueryParam: query.TraceQueryParam{
ServiceName: serviceName,
StartTimeMin: spanTime.Add(12 * -24 * time.Hour),
StartTimeMax: spanTime.Add(10 * -24 * time.Hour),
},
}, at.QueryOpts{})
if !strings.Contains(queryResult.RespBody, "out of retention") {
t.Fatalf("out of retention request should return error, get:")
}
return queryResult.GetTracesResponse
},
Want: &at.JaegerAPITracesResponse{},
CmpOpts: []cmp.Option{
cmpopts.IgnoreFields(at.JaegerAPITracesResponse{}, "Errors", "Limit", "Offset", "Total"),
},
})

// check single trace data via /select/jaeger/api/traces/<trace_id>
tc.Assert(&at.AssertOptions{
Msg: "unexpected /select/jaeger/api/traces/<trace_id> response",
Got: func() any {
return sut.JaegerAPITrace(t, hex.EncodeToString([]byte(traceID)), at.QueryOpts{})
return sut.JaegerAPITrace(t, hex.EncodeToString([]byte(traceID)), at.QueryOpts{}).GetTraceResponse
},
Want: &at.JaegerAPITraceResponse{
Data: expectTraceData,
Expand All @@ -222,4 +244,16 @@ func testOTLPIngestionJaegerQuery(tc *at.TestCase, sut at.VictoriaTracesWriteQue
cmpopts.IgnoreFields(at.JaegerAPITraceResponse{}, "Errors", "Limit", "Offset", "Total"),
},
})

// execute q non-exist traceId query in /select/jaeger/api/traces/<trace_id>
tc.Assert(&at.AssertOptions{
Msg: "unexpected /select/jaeger/api/traces/<trace_id> response",
Got: func() any {
return sut.JaegerAPITrace(t, hex.EncodeToString([]byte("non-exist")), at.QueryOpts{}).GetTraceResponse
},
Want: &at.JaegerAPITraceResponse{Data: []at.TracesResponseData{}},
CmpOpts: []cmp.Option{
cmpopts.IgnoreFields(at.JaegerAPITraceResponse{}, "Errors", "Limit", "Offset", "Total"),
},
})
}
16 changes: 8 additions & 8 deletions apptest/vtsingle.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,27 +94,27 @@ func (app *Vtsingle) ForceMerge(t *testing.T) {
// JaegerAPIServices is a test helper function that queries for service list
// by sending an HTTP GET request to /select/jaeger/api/services
// Vtsingle endpoint.
func (app *Vtsingle) JaegerAPIServices(t *testing.T, opts QueryOpts) *JaegerAPIServicesResponse {
func (app *Vtsingle) JaegerAPIServices(t *testing.T, opts QueryOpts) *JaegerQueryResult {
t.Helper()

res, _ := app.cli.Get(t, app.jaegerAPIServicesURL+"?"+opts.asURLValues().Encode())
return NewJaegerAPIServicesResponse(t, res)
return &JaegerQueryResult{GetServicesResponse: NewJaegerAPIServicesResponse(t, res), RespBody: res}
}

// JaegerAPIOperations is a test helper function that queries for operation list of a service
// by sending an HTTP GET request to /select/jaeger/api/services/<service_name>/operations
// Vtsingle endpoint.
func (app *Vtsingle) JaegerAPIOperations(t *testing.T, serviceName string, opts QueryOpts) *JaegerAPIOperationsResponse {
func (app *Vtsingle) JaegerAPIOperations(t *testing.T, serviceName string, opts QueryOpts) *JaegerQueryResult {
t.Helper()

url := fmt.Sprintf(app.jaegerAPIOperationsURL, serviceName) + "?" + opts.asURLValues().Encode()
res, _ := app.cli.Get(t, url)
return NewJaegerAPIOperationsResponse(t, res)
return &JaegerQueryResult{GetOperationsResponse: NewJaegerAPIOperationsResponse(t, res), RespBody: res}
}

// JaegerAPITraces is a test helper function that queries for traces with filter conditions
// by sending an HTTP GET request to /select/jaeger/api/traces Vtsingle endpoint.
func (app *Vtsingle) JaegerAPITraces(t *testing.T, param JaegerQueryParam, opts QueryOpts) *JaegerAPITracesResponse {
func (app *Vtsingle) JaegerAPITraces(t *testing.T, param JaegerQueryParam, opts QueryOpts) *JaegerQueryResult {
t.Helper()

paramsEnc := "?"
Expand All @@ -127,18 +127,18 @@ func (app *Vtsingle) JaegerAPITraces(t *testing.T, param JaegerQueryParam, opts
paramsEnc += uv.Encode()
}
res, _ := app.cli.Get(t, app.jaegerAPITracesURL+paramsEnc)
return NewJaegerAPITracesResponse(t, res)
return &JaegerQueryResult{GetTracesResponse: NewJaegerAPITracesResponse(t, res), RespBody: res}
}

// JaegerAPITrace is a test helper function that queries for a single trace with trace_id
// by sending an HTTP GET request to /select/jaeger/api/traces/<trace_id>
// Vtsingle endpoint.
func (app *Vtsingle) JaegerAPITrace(t *testing.T, traceID string, opts QueryOpts) *JaegerAPITraceResponse {
func (app *Vtsingle) JaegerAPITrace(t *testing.T, traceID string, opts QueryOpts) *JaegerQueryResult {
t.Helper()

url := fmt.Sprintf(app.jaegerAPITraceURL, traceID)
res, _ := app.cli.Get(t, url+"?"+opts.asURLValues().Encode())
return NewJaegerAPITraceResponse(t, res)
return &JaegerQueryResult{GetTraceResponse: NewJaegerAPITraceResponse(t, res), RespBody: res}
}

// JaegerAPIDependencies is a test helper function that queries for the dependencies.
Expand Down
1 change: 1 addition & 0 deletions docs/victoriatraces/changelog/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The following `tip` changes can be tested by building VictoriaTraces components
* [How to build single-node VictoriaTraces](https://docs.victoriametrics.com/victoriatraces/#how-to-build-from-sources)

## tip
* FEATURE: [Single-node VictoriaTraces](https://docs.victoriametrics.com/victoriatraces/) and vtstorage in [VictoriaTraces cluster](https://docs.victoriametrics.com/victoriatraces/cluster/): deny out-of-retention request in vtstorage. See [this issue](https://github.com/VictoriaMetrics/VictoriaTraces/issues/48) for details. .

* FEATURE: improve the scalability of data ingestion on systems with big number of CPU cores. Previously only up to 40 CPU cores were used during logs' ingestion into VictoriaLogs on AMD64 and ARM64 architectures, while the remaining CPU cores were idle. Remove the scalability bottleneck by switching from [musl-based](https://wiki.musl-libc.org/) to [glibc-based](https://en.wikipedia.org/wiki/Glibc) cross-compiler. This improved the data ingestion speed on a host with hundreds of CPU cores by more than 4x. See [#517](https://github.com/VictoriaMetrics/VictoriaLogs/issues/517#issuecomment-3167039079).
* FEATURE: upgrade Go builder from Go1.24.6 to Go1.25.0. See [Go1.25.0 release notes](https://go.dev/doc/go1.25).
Expand Down