77 "slices"
88 "time"
99
10+ "github.com/coder/quartz"
1011 "github.com/go-kit/log"
1112 "github.com/go-kit/log/level"
1213 "github.com/pkg/errors"
@@ -20,8 +21,8 @@ import (
2021
2122// RouterConfig configures sending queries to a separate engine.
2223type RouterConfig struct {
23- // Start and End time range supported by the engine.
24- Start , End time.Time
24+ Start time. Time // Start time of the v2 engine
25+ Lag time.Duration // Lag after which v2 engine has data
2526
2627 // Validate function to check if the query is supported by the engine.
2728 Validate func (params logql.Params ) bool
@@ -38,7 +39,9 @@ type engineReqResp struct {
3839
3940// engineRouter handles splitting queries between V1 and V2 engines
4041type engineRouter struct {
41- v2Start , v2End time.Time // v2 engine time range
42+ v2Start time.Time
43+ v2Lag time.Duration
44+
4245 forMetricQuery bool
4346
4447 v1Next queryrangebase.Handler
@@ -49,6 +52,9 @@ type engineRouter struct {
4952 merger queryrangebase.Merger
5053
5154 logger log.Logger
55+
56+ // Used for tests.
57+ clock quartz.Clock
5258}
5359
5460// newEngineRouterMiddleware creates a middleware that splits and routes part of the query
@@ -67,21 +73,23 @@ func newEngineRouterMiddleware(
6773 return queryrangebase .MiddlewareFunc (func (next queryrangebase.Handler ) queryrangebase.Handler {
6874 return & engineRouter {
6975 v2Start : v2Config .Start ,
70- v2End : v2Config .End ,
76+ v2Lag : v2Config .Lag ,
7177 v1Next : queryrangebase .MergeMiddlewares (v1Chain ... ).Wrap (next ),
7278 v2Next : v2Config .Handler ,
7379 checkV2 : v2Config .Validate ,
7480 merger : merger ,
7581 logger : logger ,
7682 forMetricQuery : metricQuery ,
83+ clock : quartz .NewReal (),
7784 }
7885 })
7986}
8087
8188func (e * engineRouter ) Do (ctx context.Context , r queryrangebase.Request ) (queryrangebase.Response , error ) {
89+ start , end := e .v2Start , e .getEnd ()
8290 // if query is entirely before or after v2 engine range, process using next handler.
8391 // ignore any boundary overlap, splitting requests that fall on bounary would result in tiny requests.
84- if ! r . GetEnd (). After ( e . v2Start ) || ! r . GetStart (). Before ( e . v2End ) {
92+ if ! e . isOverlappingV2range ( r , start , end ) {
8593 return e .v1Next .Do (ctx , r )
8694 }
8795
@@ -95,7 +103,7 @@ func (e *engineRouter) Do(ctx context.Context, r queryrangebase.Request) (queryr
95103 return e .v1Next .Do (ctx , r )
96104 }
97105
98- inputs := e .splitOverlapping (r , e . v2Start , e . v2End )
106+ inputs := e .splitOverlapping (r , start , end )
99107
100108 // for log queries, order the splits to return early on hitting limits.
101109 var limit uint32
@@ -128,6 +136,19 @@ func (e *engineRouter) Do(ctx context.Context, r queryrangebase.Request) (queryr
128136 return e .merger .MergeResponse (responses ... )
129137}
130138
139+ // whether the time range of the request overlaps with the time range of the v2 engine
140+ func (e engineRouter ) isOverlappingV2range (r queryrangebase.Request , start , end time.Time ) bool {
141+ if ! r .GetEnd ().After (start ) || ! r .GetStart ().Before (end ) {
142+ return false
143+ }
144+ return true
145+ }
146+
147+ // the end time of the v2 engine based on current timestamp and v2 engine lag
148+ func (e engineRouter ) getEnd () time.Time {
149+ return e .clock .Now ().UTC ().Add (- e .v2Lag )
150+ }
151+
131152// splitOverlapping breaks down the request into multiple ranges based on the V2 engine time range.
132153// It returns a max of 3 requests:
133154// - one for the range before V2 engine
0 commit comments