From d14263764d401ce8ef089387b3bf29df466031c6 Mon Sep 17 00:00:00 2001 From: Mada Date: Wed, 9 Jul 2025 11:27:20 +0800 Subject: [PATCH 1/3] database: replace go-redis lib with rueidis --- client/client_test.go | 33 +-- go.mod | 17 +- go.sum | 48 ++--- server/bench_test.go | 8 +- storage/cache/database.go | 42 ++-- storage/cache/redis.go | 418 +++++++++++++++++------------------- storage/cache/redis_test.go | 14 +- 7 files changed, 274 insertions(+), 306 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index 43ee0a8f8..a0be34012 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -23,7 +23,7 @@ import ( "time" client "github.com/gorse-io/gorse-go" - "github.com/redis/go-redis/v9" + "github.com/redis/rueidis" "github.com/stretchr/testify/suite" "github.com/zhenghaoz/gorse/storage/cache" ) @@ -37,19 +37,18 @@ const ( type GorseClientTestSuite struct { suite.Suite client *client.GorseClient - redis *redis.Client + redis *rueidis.Client } func (suite *GorseClientTestSuite) SetupSuite() { suite.client = client.NewGorseClient(GorseEndpoint, GorseApiKey) - options, err := redis.ParseURL(RedisEndpoint) + options, err := rueidis.ParseURL(RedisEndpoint) suite.NoError(err) - suite.redis = redis.NewClient(options) + suite.redis = rueidis.NewClient(options) } func (suite *GorseClientTestSuite) TearDownSuite() { - err := suite.redis.Close() - suite.NoError(err) + suite.redis.Close() } func (suite *GorseClientTestSuite) TestFeedback() { @@ -275,15 +274,19 @@ func (suite *GorseClientTestSuite) TestItems() { func (suite *GorseClientTestSuite) hSet(collection, subset string, scores []client.Score) { for _, score := range scores { - err := suite.redis.HSet(context.TODO(), "documents:"+collection+":"+subset+":"+score.Id, - "collection", collection, - "subset", subset, - "id", score.Id, - "score", score.Score, - "is_hidden", 0, - "categories", base64.RawStdEncoding.EncodeToString([]byte("_")), - "timestamp", time.Now().UnixMicro(), - ).Err() + fields := map[string]string{ + "collection": collection, + "subset": subset, + "id": score.Id, + "is_hidden": "0", + "score": strconv.FormatFloat(score.Score, 'f', -1, 64), + "categories": base64.RawStdEncoding.EncodeToString([]byte("_")), + "timestamp": strconv.FormatInt(time.Now().UnixMicro(), 10), + } + cmd := suite.redis.B().Hset().Key("documents:" + collection + ":" + subset + ":" + score.Id). + FieldValue().FieldValueIter(maps.All(fields)). + Build() + err := suite.redis.Do(context.TODO(), cmd).Error() suite.NoError(err) } } diff --git a/go.mod b/go.mod index 238d7e2d8..6100285e6 100644 --- a/go.mod +++ b/go.mod @@ -41,8 +41,8 @@ require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.13.0 github.com/rakyll/statik v0.1.7 - github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 - github.com/redis/go-redis/v9 v9.8.0 + github.com/redis/rueidis v1.0.62 + github.com/redis/rueidis/rueidisotel v1.0.62 github.com/samber/lo v1.38.1 github.com/sashabaranov/go-openai v1.36.1 github.com/schollz/progressbar/v3 v3.17.1 @@ -57,14 +57,14 @@ require ( go.mongodb.org/mongo-driver v1.16.1 go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful v0.36.4 go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.55.0 - go.opentelemetry.io/otel v1.31.0 + go.opentelemetry.io/otel v1.35.0 go.opentelemetry.io/otel/exporters/jaeger v1.11.1 go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.31.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 go.opentelemetry.io/otel/exporters/zipkin v1.11.1 - go.opentelemetry.io/otel/sdk v1.31.0 - go.opentelemetry.io/otel/trace v1.31.0 + go.opentelemetry.io/otel/sdk v1.35.0 + go.opentelemetry.io/otel/trace v1.35.0 go.uber.org/atomic v1.11.0 go.uber.org/zap v1.27.0 golang.org/x/exp v0.0.0-20250305212735-054e65f0b394 @@ -92,7 +92,6 @@ require ( github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect - github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/dlclark/regexp2 v1.11.5 // indirect github.com/dustin/go-humanize v1.0.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -139,7 +138,6 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect - github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect github.com/rivo/uniseg v0.4.7 // indirect github.com/sagikazarmark/locafero v0.7.0 // indirect @@ -153,12 +151,13 @@ require ( github.com/xdg-go/scram v1.1.2 // indirect github.com/xdg-go/stringprep v1.0.4 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 // indirect - go.opentelemetry.io/otel/metric v1.31.0 // indirect + go.opentelemetry.io/otel/metric v1.35.0 // indirect go.opentelemetry.io/proto/otlp v1.3.1 // indirect go.uber.org/multierr v1.10.0 // indirect golang.org/x/crypto v0.36.0 // indirect - golang.org/x/net v0.37.0 // indirect + golang.org/x/net v0.38.0 // indirect golang.org/x/sync v0.12.0 // indirect golang.org/x/term v0.30.0 // indirect golang.org/x/text v0.23.0 // indirect diff --git a/go.sum b/go.sum index 0eed7b66a..4ea097f68 100644 --- a/go.sum +++ b/go.sum @@ -57,10 +57,6 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4= github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= -github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= -github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= -github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= -github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= github.com/cenkalti/backoff/v5 v5.0.2 h1:rIfFVxEf1QsI7E1ZHfp/B4DF/6QBAUhmgkxc0H7Zss8= @@ -218,8 +214,8 @@ github.com/google/go-cmp v0.5.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -410,8 +406,8 @@ github.com/nikolalohinski/gonja/v2 v2.3.3/go.mod h1:8KC3RlefxnOaY5P4rH5erdwV0/ow github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.20.1 h1:YlVIbqct+ZmnEph770q9Q7NVAz4wwIiVNahee6JyUzo= github.com/onsi/ginkgo/v2 v2.20.1/go.mod h1:lG9ey2Z29hR41WMVthyJBGUBcBhGOtoPF2VFMvBXFCI= -github.com/onsi/gomega v1.35.1 h1:Cwbd75ZBPxFSuZ6T+rN/WCb/gOc6YgFBXLlZLhC7Ds4= -github.com/onsi/gomega v1.35.1/go.mod h1:PvZbdDc8J6XJEpDK4HCuRBm8a6Fzp9/DmhC9C7yFlog= +github.com/onsi/gomega v1.36.2 h1:koNYke6TVk6ZmnyHrCXba/T/MoLBXFjeC1PtvYgw0A8= +github.com/onsi/gomega v1.36.2/go.mod h1:DdwyADRjrc825LhMEkD76cHR5+pUnjhUN8GlHlRPHzY= github.com/openzipkin/zipkin-go v0.4.1 h1:kNd/ST2yLLWhaWrkgchya40TJabe8Hioj9udfPcEO5A= github.com/openzipkin/zipkin-go v0.4.1/go.mod h1:qY0VqDSN1pOBN94dBc6w2GJlWLiovAyg7Qt6/I9HecM= github.com/orcaman/concurrent-map v1.0.0 h1:I/2A2XPCb4IuQWcQhBhSwGfiuybl/J0ev9HDbW65HOY= @@ -455,12 +451,10 @@ github.com/prometheus/procfs v0.8.0 h1:ODq8ZFEaYeCaZOJlZZdJA2AbQR98dSHSM1KW/You5 github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0uaxHdg830/4= github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ= github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc= -github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3 h1:1/BDligzCa40GTllkDnY3Y5DTHuKCONbB2JcRyIfl20= -github.com/redis/go-redis/extra/rediscmd/v9 v9.5.3/go.mod h1:3dZmcLn3Qw6FLlWASn1g4y+YO9ycEFUOM+bhBmzLVKQ= -github.com/redis/go-redis/extra/redisotel/v9 v9.5.3 h1:kuvuJL/+MZIEdvtb/kTBRiRgYaOmx1l+lYJyVdrRUOs= -github.com/redis/go-redis/extra/redisotel/v9 v9.5.3/go.mod h1:7f/FMrf5RRRVHXgfk7CzSVzXHiWeuOQUu2bsVqWoa+g= -github.com/redis/go-redis/v9 v9.8.0 h1:q3nRvjrlge/6UD7eTu/DSg2uYiU2mCL0G/uzBWqhicI= -github.com/redis/go-redis/v9 v9.8.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/rueidis v1.0.62 h1:9yNCxsYtg9eMEzHhDq9tlRnDBFJyWTWn6YLQ5EWDE5I= +github.com/redis/rueidis v1.0.62/go.mod h1:Lkhr2QTgcoYBhxARU7kJRO8SyVlgUuEkcJO1Y8MCluA= +github.com/redis/rueidis/rueidisotel v1.0.62 h1:NmHS+t11OfzVC7r2SUq1FumBwhMdITQV8zOMj8ju5Do= +github.com/redis/rueidis/rueidisotel v1.0.62/go.mod h1:pwdhj54C3Hxw0Gyl3vYwf/BWLEnNb7VIwjzi+vk7c80= github.com/remyoudompheng/bigfft v0.0.0-20200410134404-eec4a21b6bb0/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= @@ -554,6 +548,8 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful v0.36.4 h1:9nyzSapoM0ZYuZsHjZColmVm6tCKJo1boLQ7NEDReo8= go.opentelemetry.io/contrib/instrumentation/github.com/emicklei/go-restful/otelrestful v0.36.4/go.mod h1:FW//LC5AQiDTWo1Jz9So0UMlsm2xT/vJf8UiHytP0FI= go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.55.0 h1:Rsm/r0H30wFPYRe5AQLIdOP0l7aSQyc5sSjPePMCEsw= @@ -562,8 +558,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0 h1:ZIg3ZT/ go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.55.0/go.mod h1:DQAwmETtZV00skUwgD6+0U89g80NKsJE3DCKeLLPQMI= go.opentelemetry.io/contrib/propagators/b3 v1.11.1 h1:icQ6ttRV+r/2fnU46BIo/g/mPu6Rs5Ug8Rtohe3KqzI= go.opentelemetry.io/contrib/propagators/b3 v1.11.1/go.mod h1:ECIveyMXgnl4gorxFcA7RYjJY/Ql9n20ubhbfDc3QfA= -go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= -go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= +go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= go.opentelemetry.io/otel/exporters/jaeger v1.11.1 h1:F9Io8lqWdGyIbY3/SOGki34LX/l+7OL0gXNxjqwcbuQ= go.opentelemetry.io/otel/exporters/jaeger v1.11.1/go.mod h1:lRa2w3bQ4R4QN6zYsDgy7tEezgoKEu7Ow2g35Y75+KI= go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.31.0 h1:K0XaT3DwHAcV4nKLzcQvwAgSyisUghWoY20I7huthMk= @@ -574,14 +570,14 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0 h1:lUsI2 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.31.0/go.mod h1:2HpZxxQurfGxJlJDblybejHB6RX6pmExPNe517hREw4= go.opentelemetry.io/otel/exporters/zipkin v1.11.1 h1:JlJ3/oQoyqlrPDCfsSVFcHgGeHvZq+hr1VPWtiYCXTo= go.opentelemetry.io/otel/exporters/zipkin v1.11.1/go.mod h1:T4S6aVwIS1+MHA+dJHCcPROtZe6ORwnv5vMKPRapsFw= -go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= -go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= -go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= -go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= -go.opentelemetry.io/otel/sdk/metric v1.31.0 h1:i9hxxLJF/9kkvfHppyLL55aW7iIJz4JjxTeYusH7zMc= -go.opentelemetry.io/otel/sdk/metric v1.31.0/go.mod h1:CRInTMVvNhUKgSAMbKyTMxqOBC0zgyxzW55lZzX43Y8= -go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= -go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= +go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= +go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= +go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= +go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= +go.opentelemetry.io/otel/sdk/metric v1.35.0 h1:1RriWBmCKgkeHEhM7a2uMjMUfP7MsOF5JpUCaEqEI9o= +go.opentelemetry.io/otel/sdk/metric v1.35.0/go.mod h1:is6XYCUMpcKi+ZsOvfluY5YstFnhW0BidkR+gL+qN+w= +go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= +go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= go.opentelemetry.io/proto/otlp v1.3.1 h1:TrMUixzpM0yuc/znrFTP9MMRh8trP93mkCiDVeXrui0= go.opentelemetry.io/proto/otlp v1.3.1/go.mod h1:0X1WI4de4ZsLrrJNLAQbFeLCm3T7yBkR0XqQ7niQU+8= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -695,8 +691,8 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= -golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= +golang.org/x/net v0.38.0 h1:vRMAPTMaeGqVhG5QyLJHqNDwecKTomGeqbnfZyKlBI8= +golang.org/x/net v0.38.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= diff --git a/server/bench_test.go b/server/bench_test.go index b72945b55..fea513d55 100644 --- a/server/bench_test.go +++ b/server/bench_test.go @@ -31,7 +31,7 @@ import ( "github.com/emicklei/go-restful/v3" "github.com/go-resty/resty/v2" - "github.com/redis/go-redis/v9" + "github.com/redis/rueidis" "github.com/samber/lo" "github.com/stretchr/testify/require" "github.com/zhenghaoz/gorse/base/log" @@ -222,10 +222,10 @@ func (s *benchServer) prepareData(b *testing.B, url, benchName string) string { func (s *benchServer) prepareCache(b *testing.B, url, benchName string) string { dbName := "gorse_cache_" + benchName if strings.HasPrefix(url, "redis://") { - opt, err := redis.ParseURL(url) + opt, err := rueidis.ParseURL(url) require.NoError(b, err) - cli := redis.NewClient(opt) - require.NoError(b, cli.FlushDB(context.Background()).Err()) + cli := rueidis.NewClient(opt) + require.NoError(b, cli.Do(context.Background(), redisClient.client.B().Flushdb().Build()).Error()) return url } else if strings.HasPrefix(url, "mongodb://") { ctx := context.Background() diff --git a/storage/cache/database.go b/storage/cache/database.go index 752f9c942..0174bdf82 100644 --- a/storage/cache/database.go +++ b/storage/cache/database.go @@ -26,8 +26,8 @@ import ( "github.com/XSAM/otelsql" "github.com/araddon/dateparse" "github.com/juju/errors" - "github.com/redis/go-redis/extra/redisotel/v9" - "github.com/redis/go-redis/v9" + "github.com/redis/rueidis" + "github.com/redis/rueidis/rueidisotel" "github.com/samber/lo" "github.com/zhenghaoz/gorse/base/log" "github.com/zhenghaoz/gorse/storage" @@ -36,7 +36,6 @@ import ( "go.mongodb.org/mongo-driver/x/mongo/driver/connstring" "go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo" semconv "go.opentelemetry.io/otel/semconv/v1.8.0" - "go.uber.org/zap" "gorm.io/driver/mysql" "gorm.io/driver/postgres" "gorm.io/driver/sqlite" @@ -287,39 +286,30 @@ type Database interface { // Open a connection to a database. func Open(path, tablePrefix string, opts ...storage.Option) (Database, error) { var err error - if strings.HasPrefix(path, storage.RedisPrefix) || strings.HasPrefix(path, storage.RedissPrefix) { - opt, err := redis.ParseURL(path) - if err != nil { - return nil, err - } - opt.Protocol = 2 - database := new(Redis) - database.client = redis.NewClient(opt) - database.TablePrefix = storage.TablePrefix(tablePrefix) - if err = redisotel.InstrumentTracing(database.client, redisotel.WithAttributes(semconv.DBSystemRedis)); err != nil { - log.Logger().Error("failed to add tracing for redis", zap.Error(err)) - return nil, errors.Trace(err) - } - return database, nil - } else if strings.HasPrefix(path, storage.RedisClusterPrefix) || strings.HasPrefix(path, storage.RedissClusterPrefix) { - var newURL string + if strings.HasPrefix(path, storage.RedisPrefix) || strings.HasPrefix(path, storage.RedissPrefix) || + strings.HasPrefix(path, storage.RedisClusterPrefix) || strings.HasPrefix(path, storage.RedissClusterPrefix) { + // rueidis treat cluster and normal client as the same, so replace all cluster prefix with the normal one + newURL := path if strings.HasPrefix(path, storage.RedisClusterPrefix) { newURL = strings.Replace(path, storage.RedisClusterPrefix, storage.RedisPrefix, 1) } else if strings.HasPrefix(path, storage.RedissClusterPrefix) { newURL = strings.Replace(path, storage.RedissClusterPrefix, storage.RedissPrefix, 1) } - opt, err := redis.ParseClusterURL(newURL) + if strings.HasSuffix(newURL, "/") { + // rueidis not allow empty db in the path, so we have to add a default one to make old conf happy + newURL = newURL + "0" + } + opt, err := rueidis.ParseURL(newURL) if err != nil { return nil, err } - opt.Protocol = 2 database := new(Redis) - database.client = redis.NewClusterClient(opt) - database.TablePrefix = storage.TablePrefix(tablePrefix) - if err = redisotel.InstrumentTracing(database.client, redisotel.WithAttributes(semconv.DBSystemRedis)); err != nil { - log.Logger().Error("failed to add tracing for redis", zap.Error(err)) - return nil, errors.Trace(err) + database.client, err = rueidis.NewClient(opt) + if err != nil { + return nil, err } + database.TablePrefix = storage.TablePrefix(tablePrefix) + database.client = rueidisotel.WithClient(database.client, rueidisotel.TraceAttrs(semconv.DBSystemRedis)) return database, nil } else if strings.HasPrefix(path, storage.MongoPrefix) || strings.HasPrefix(path, storage.MongoSrvPrefix) { // connect to database diff --git a/storage/cache/redis.go b/storage/cache/redis.go index c61838e4a..834cf4aa9 100644 --- a/storage/cache/redis.go +++ b/storage/cache/redis.go @@ -19,12 +19,13 @@ import ( "encoding/base64" "fmt" "io" + "maps" "strconv" "strings" "time" "github.com/juju/errors" - "github.com/redis/go-redis/v9" + "github.com/redis/rueidis" "github.com/samber/lo" "github.com/zhenghaoz/gorse/common/util" "github.com/zhenghaoz/gorse/storage" @@ -33,41 +34,41 @@ import ( // Redis cache storage. type Redis struct { storage.TablePrefix - client redis.UniversalClient + client rueidis.Client } // Close redis connection. func (r *Redis) Close() error { - return r.client.Close() + r.client.Close() + return nil } func (r *Redis) Ping() error { - return r.client.Ping(context.Background()).Err() + return r.client.Do(context.Background(), r.client.B().Ping().Build()).Error() } // Init nothing. func (r *Redis) Init() error { // list indices - indices, err := r.client.FT_List(context.Background()).Result() - if err != nil { + indices, err := r.client.Do(context.Background(), r.client.B().FtList().Build()).AsStrSlice() + if err != nil && !rueidis.IsRedisNil(err) { return errors.Trace(err) } // create index if !lo.Contains(indices, r.DocumentTable()) { - _, err = r.client.FTCreate(context.TODO(), r.DocumentTable(), - &redis.FTCreateOptions{ - OnHash: true, - Prefix: []any{r.DocumentTable() + ":"}, - }, - &redis.FieldSchema{FieldName: "collection", FieldType: redis.SearchFieldTypeTag}, - &redis.FieldSchema{FieldName: "subset", FieldType: redis.SearchFieldTypeTag}, - &redis.FieldSchema{FieldName: "id", FieldType: redis.SearchFieldTypeTag}, - &redis.FieldSchema{FieldName: "score", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}, - &redis.FieldSchema{FieldName: "is_hidden", FieldType: redis.SearchFieldTypeNumeric}, - &redis.FieldSchema{FieldName: "categories", FieldType: redis.SearchFieldTypeTag, Separator: ";"}, - &redis.FieldSchema{FieldName: "timestamp", FieldType: redis.SearchFieldTypeNumeric, Sortable: true}, - ).Result() - if err != nil { + cmd := r.client.B().FtCreate().Index(r.DocumentTable()). + OnHash(). + Prefix(1).Prefix(r.DocumentTable() + ":"). + Schema(). + FieldName("collection").Tag(). + FieldName("subset").Tag(). + FieldName("id").Tag(). + FieldName("score").Numeric().Sortable(). + FieldName("is_hidden").Numeric(). + FieldName("categories").Tag().Separator(";"). + FieldName("timestamp").Numeric().Sortable(). + Build() + if err := r.client.Do(context.Background(), cmd).Error(); err != nil { return errors.Trace(err) } } @@ -75,32 +76,27 @@ func (r *Redis) Init() error { } func (r *Redis) Scan(work func(string) error) error { - ctx := context.Background() - if clusterClient, isCluster := r.client.(*redis.ClusterClient); isCluster { - return clusterClient.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error { - return r.scan(ctx, client, work) - }) - } else { - return r.scan(ctx, r.client, work) - } + return r.scan(context.Background(), r.client, string(r.TablePrefix)+"*", work) } -func (r *Redis) scan(ctx context.Context, client redis.UniversalClient, work func(string) error) error { - var ( - result []string - cursor uint64 - err error - ) +func (r *Redis) scan(ctx context.Context, client rueidis.Client, pattern string, work func(string) error) error { + var cursor uint64 for { - result, cursor, err = client.Scan(ctx, cursor, string(r.TablePrefix)+"*", 0).Result() + cmd := client.B().Scan().Cursor(cursor).Match(pattern).Count(100).Build() + resp := client.Do(ctx, cmd) + if err := resp.Error(); err != nil { + return errors.Trace(err) + } + e, err := resp.AsScanEntry() if err != nil { return errors.Trace(err) } - for _, key := range result { + for _, key := range e.Elements { if err = work(key[len(r.TablePrefix):]); err != nil { return errors.Trace(err) } } + cursor = e.Cursor if cursor == 0 { return nil } @@ -109,65 +105,36 @@ func (r *Redis) scan(ctx context.Context, client redis.UniversalClient, work fun func (r *Redis) Purge() error { ctx := context.Background() - if clusterClient, isCluster := r.client.(*redis.ClusterClient); isCluster { - return clusterClient.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error { - return r.purge(ctx, client, isCluster) - }) - } else { - return r.purge(ctx, r.client, isCluster) - } -} - -func (r *Redis) purge(ctx context.Context, client redis.UniversalClient, isCluster bool) error { - var ( - result []string - cursor uint64 - err error - ) - for { - result, cursor, err = client.Scan(ctx, cursor, string(r.TablePrefix)+"*", 0).Result() - if err != nil { - return errors.Trace(err) - } - if len(result) > 0 { - if isCluster { - p := client.Pipeline() - for _, key := range result { - if err = p.Del(ctx, key).Err(); err != nil { - return errors.Trace(err) - } - } - if _, err = p.Exec(ctx); err != nil { - return errors.Trace(err) - } - } else { - if err = client.Del(ctx, result...).Err(); err != nil { - return errors.Trace(err) - } - } - } - if cursor == 0 { - return nil - } - } + return r.scan(ctx, r.client, string(r.TablePrefix)+"*", func(key string) error { + cmd := r.client.B().Del().Key(string(r.TablePrefix) + key).Build() + return r.client.Do(ctx, cmd).Error() + }) } func (r *Redis) Set(ctx context.Context, values ...Value) error { - p := r.client.Pipeline() + cmds := make(rueidis.Commands, 0, len(values)) for _, v := range values { - if err := p.Set(ctx, r.Key(v.name), v.value, 0).Err(); err != nil { + cmds = append(cmds, r.client.B().Set().Key(r.Key(v.name)).Value(v.value).Build()) + } + for _, resp := range r.client.DoMulti(ctx, cmds...) { + if err := resp.Error(); err != nil { return errors.Trace(err) } } - _, err := p.Exec(ctx) - return errors.Trace(err) + return nil } // Get returns a value from Redis. func (r *Redis) Get(ctx context.Context, key string) *ReturnValue { - val, err := r.client.Get(ctx, r.Key(key)).Result() + cmd := r.client.B().Get().Key(r.Key(key)).Cache() + res := r.client.DoCache(ctx, cmd, time.Minute) + if res.IsCacheHit() && rueidis.IsRedisNil(res.Error()) { + // Cache miss, try fetching from redis directly + res = r.client.Do(ctx, r.client.B().Get().Key(r.Key(key)).Build()) + } + val, err := res.ToString() if err != nil { - if err == redis.Nil { + if rueidis.IsRedisNil(err) { return &ReturnValue{err: errors.Annotate(ErrObjectNotExist, key)} } return &ReturnValue{err: err} @@ -177,12 +144,14 @@ func (r *Redis) Get(ctx context.Context, key string) *ReturnValue { // Delete object from Redis. func (r *Redis) Delete(ctx context.Context, key string) error { - return r.client.Del(ctx, r.Key(key)).Err() + cmd := r.client.B().Del().Key(r.Key(key)).Build() + return r.client.Do(ctx, cmd).Error() } // GetSet returns members of a set from Redis. func (r *Redis) GetSet(ctx context.Context, key string) ([]string, error) { - return r.client.SMembers(ctx, r.Key(key)).Result() + cmd := r.client.B().Smembers().Key(r.Key(key)).Cache() + return r.client.DoCache(ctx, cmd, time.Minute).AsStrSlice() } // SetSet overrides a set with members in Redis. @@ -190,17 +159,15 @@ func (r *Redis) SetSet(ctx context.Context, key string, members ...string) error if len(members) == 0 { return nil } - // convert strings to interfaces - values := make([]interface{}, 0, len(members)) - for _, member := range members { - values = append(values, member) - } - // push set - pipeline := r.client.Pipeline() - pipeline.Del(ctx, r.Key(key)) - pipeline.SAdd(ctx, r.Key(key), values...) - _, err := pipeline.Exec(ctx) - return err + cmds := make(rueidis.Commands, 0, 2) + cmds = append(cmds, r.client.B().Del().Key(r.Key(key)).Build()) + cmds = append(cmds, r.client.B().Sadd().Key(r.Key(key)).Member(members...).Build()) + for _, resp := range r.client.DoMulti(ctx, cmds...) { + if err := resp.Error(); err != nil { + return err + } + } + return nil } // AddSet adds members to a set in Redis. @@ -208,13 +175,8 @@ func (r *Redis) AddSet(ctx context.Context, key string, members ...string) error if len(members) == 0 { return nil } - // convert strings to interfaces - values := make([]interface{}, 0, len(members)) - for _, member := range members { - values = append(values, member) - } - // push set - return r.client.SAdd(ctx, r.Key(key), values...).Err() + cmd := r.client.B().Sadd().Key(r.Key(key)).Member(members...).Build() + return r.client.Do(ctx, cmd).Error() } // RemSet removes members from a set in Redis. @@ -222,27 +184,31 @@ func (r *Redis) RemSet(ctx context.Context, key string, members ...string) error if len(members) == 0 { return nil } - return r.client.SRem(ctx, r.Key(key), members).Err() + cmd := r.client.B().Srem().Key(r.Key(key)).Member(members...).Build() + return r.client.Do(ctx, cmd).Error() } func (r *Redis) Push(ctx context.Context, name string, message string) error { - _, err := r.client.ZAdd(ctx, r.Key(name), redis.Z{Member: message, Score: float64(time.Now().UnixNano())}).Result() - return err + cmd := r.client.B().Zadd().Key(r.Key(name)). + ScoreMember().ScoreMember(float64(time.Now().UnixNano()), message).Build() + return r.client.Do(ctx, cmd).Error() } func (r *Redis) Pop(ctx context.Context, name string) (string, error) { - z, err := r.client.ZPopMin(ctx, r.Key(name), 1).Result() + cmd := r.client.B().Zpopmin().Key(r.Key(name)).Count(1).Build() + z, err := r.client.Do(ctx, cmd).AsZScores() if err != nil { return "", errors.Trace(err) } if len(z) == 0 { return "", io.EOF } - return z[0].Member.(string), nil + return z[0].Member, nil } func (r *Redis) Remain(ctx context.Context, name string) (int64, error) { - return r.client.ZCard(ctx, r.Key(name)).Result() + cmd := r.client.B().Zcard().Key(r.Key(name)).Build() + return r.client.Do(ctx, cmd).AsInt64() } func (r *Redis) documentKey(collection, subset, value string) string { @@ -250,19 +216,31 @@ func (r *Redis) documentKey(collection, subset, value string) string { } func (r *Redis) AddScores(ctx context.Context, collection, subset string, documents []Score) error { - p := r.client.Pipeline() + cmds := make(rueidis.Commands, 0, len(documents)) for _, document := range documents { - p.HSet(ctx, r.documentKey(collection, subset, document.Id), - "collection", collection, - "subset", subset, - "id", document.Id, - "score", document.Score, - "is_hidden", document.IsHidden, - "categories", encodeCategories(document.Categories), - "timestamp", document.Timestamp.UnixMicro()) - } - _, err := p.Exec(ctx) - return errors.Trace(err) + key := r.documentKey(collection, subset, document.Id) + fields := map[string]string{ + "collection": collection, + "subset": subset, + "id": document.Id, + "score": strconv.FormatFloat(document.Score, 'f', -1, 64), + "categories": encodeCategories(document.Categories), + "timestamp": strconv.FormatInt(document.Timestamp.UnixMicro(), 10), + } + if document.IsHidden { + fields["is_hidden"] = "1" + } else { + fields["is_hidden"] = "0" + } + cmd := r.client.B().Hset().Key(key).FieldValue().FieldValueIter(maps.All(fields)).Build() + cmds = append(cmds, cmd) + } + for _, resp := range r.client.DoMulti(ctx, cmds...) { + if err := resp.Error(); err != nil { + return errors.Trace(err) + } + } + return nil } func (r *Redis) SearchScores(ctx context.Context, collection, subset string, query []string, begin, end int) ([]Score, error) { @@ -274,39 +252,40 @@ func (r *Redis) SearchScores(ctx context.Context, collection, subset string, que for _, q := range query { builder.WriteString(fmt.Sprintf(" @categories:{ %s }", escape(encdodeCategory(q)))) } - options := &redis.FTSearchOptions{ - SortBy: []redis.FTSearchSortBy{{FieldName: "score", Desc: true}}, - LimitOffset: begin, - } - if end == -1 { - options.Limit = 10000 - } else { - options.Limit = end - begin + limit := int64(end - begin) + if limit < 0 { + limit = 10000 } - result, err := r.client.FTSearchWithArgs(ctx, r.DocumentTable(), builder.String(), options).Result() + + cmd := r.client.B().FtSearch(). + Index(r.DocumentTable()). + Query(builder.String()). + Sortby("score").Desc().Limit().OffsetNum(int64(begin), limit). + Build() + _, docs, err := r.client.Do(ctx, cmd).AsFtSearch() if err != nil { return nil, errors.Trace(err) } - documents := make([]Score, 0, len(result.Docs)) - for _, doc := range result.Docs { + documents := make([]Score, 0, len(docs)) + for _, doc := range docs { var document Score - document.Id = doc.Fields["id"] - score, err := strconv.ParseFloat(doc.Fields["score"], 64) + document.Id = doc.Doc["id"] + score, err := strconv.ParseFloat(doc.Doc["score"], 64) if err != nil { return nil, errors.Trace(err) } document.Score = score - isHidden, err := strconv.ParseInt(doc.Fields["is_hidden"], 10, 64) + isHidden, err := strconv.ParseInt(doc.Doc["is_hidden"], 10, 64) if err != nil { return nil, errors.Trace(err) } document.IsHidden = isHidden != 0 - categories, err := decodeCategories(doc.Fields["categories"]) + categories, err := decodeCategories(doc.Doc["categories"]) if err != nil { return nil, errors.Trace(err) } document.Categories = categories - timestamp, err := strconv.ParseInt(doc.Fields["timestamp"], 10, 64) + timestamp, err := strconv.ParseInt(doc.Doc["timestamp"], 10, 64) if err != nil { return nil, errors.Trace(err) } @@ -329,44 +308,47 @@ func (r *Redis) UpdateScores(ctx context.Context, collections []string, subset * if subset != nil { builder.WriteString(fmt.Sprintf(" @subset:{ %s }", escape(*subset))) } + offset := int64(0) + limit := int64(10000) for { - // search documents - result, err := r.client.FTSearchWithArgs(ctx, r.DocumentTable(), builder.String(), &redis.FTSearchOptions{ - SortBy: []redis.FTSearchSortBy{{FieldName: "score", Desc: true}}, - LimitOffset: 0, - Limit: 10000, - }).Result() + cmd := r.client.B().FtSearch(). + Index(r.DocumentTable()). + Query(builder.String()). + Sortby("score").Desc().Limit().OffsetNum(offset, limit). + Build() + _, result, err := r.client.Do(ctx, cmd).AsFtSearch() if err != nil { return errors.Trace(err) } // update documents - for _, doc := range result.Docs { - key := doc.ID - values := make([]any, 0) + for _, doc := range result { + key := doc.Key + values := make(map[string]string) if patch.Score != nil { - values = append(values, "score", *patch.Score) + values["score"] = strconv.FormatFloat(*patch.Score, 'f', -1, 64) } if patch.IsHidden != nil { - values = append(values, "is_hidden", *patch.IsHidden) + if *patch.IsHidden { + values["is_hidden"] = "1" + } else { + values["is_hidden"] = "0" + } } if patch.Categories != nil { - values = append(values, "categories", encodeCategories(patch.Categories)) + values["categories"] = encodeCategories(patch.Categories) } - if err = r.client.Watch(ctx, func(tx *redis.Tx) error { - if exist, err := tx.Exists(ctx, key).Result(); err != nil { - return err - } else if exist == 0 { - return nil + if len(values) > 0 { + cmd := r.client.B().Hset().Key(key).FieldValue().FieldValueIter(maps.All(values)).Build() + if err := r.client.Do(ctx, cmd).Error(); err != nil { + return errors.Trace(err) } - return tx.HSet(ctx, key, values...).Err() - }, key); err != nil { - return errors.Trace(err) } } // break if no more documents - if result.Total <= len(result.Docs) { + if len(result) < int(limit) { break } + offset += limit } return nil } @@ -386,99 +368,105 @@ func (r *Redis) DeleteScores(ctx context.Context, collections []string, conditio if condition.Before != nil { builder.WriteString(fmt.Sprintf(" @timestamp:[-inf (%d]", condition.Before.UnixMicro())) } + offset := int64(0) + limit := int64(10000) for { - // search documents - result, err := r.client.FTSearchWithArgs(ctx, r.DocumentTable(), builder.String(), &redis.FTSearchOptions{ - SortBy: []redis.FTSearchSortBy{{FieldName: "score", Desc: true}}, - LimitOffset: 0, - Limit: 10000, - }).Result() + cmd := r.client.B().FtSearch(). + Index(r.DocumentTable()). + Query(builder.String()). + Sortby("score").Desc().Limit().OffsetNum(offset, limit). + Build() + resp := r.client.Do(ctx, cmd) + _, result, err := resp.AsFtSearch() if err != nil { return errors.Trace(err) } // delete documents - p := r.client.Pipeline() - for _, doc := range result.Docs { - p.Del(ctx, doc.ID) - } - _, err = p.Exec(ctx) - if err != nil { - return errors.Trace(err) + if len(result) > 0 { + keys := make([]string, len(result)) + for i, doc := range result { + keys[i] = doc.Key + } + cmd := r.client.B().Del().Key(keys...).Build() + if err := r.client.Do(ctx, cmd).Error(); err != nil { + return err + } } // break if no more documents - if result.Total == len(result.Docs) { + if len(result) < int(limit) { break } + offset += limit } return nil } func (r *Redis) ScanScores(ctx context.Context, callback func(collection string, id string, subset string, timestamp time.Time) error) error { - if clusterClient, isCluster := r.client.(*redis.ClusterClient); isCluster { - return clusterClient.ForEachMaster(ctx, func(ctx context.Context, client *redis.Client) error { - return r.scanScores(ctx, client, callback) - }) - } else { - return r.scanScores(ctx, r.client, callback) - } -} - -func (r *Redis) scanScores(ctx context.Context, client redis.UniversalClient, callback func(collection string, id string, subset string, timestamp time.Time) error) error { - var ( - result []string - cursor uint64 - err error - ) - for { - result, cursor, err = client.Scan(ctx, cursor, r.DocumentTable()+"*", 0).Result() + return r.scan(ctx, r.client, r.DocumentTable()+"*", func(key string) error { + cmd := r.client.B().Hgetall().Key(string(r.TablePrefix) + key).Build() + row, err := r.client.Do(ctx, cmd).AsStrMap() if err != nil { return errors.Trace(err) } - for _, key := range result { - var row map[string]string - row, err = client.HGetAll(ctx, key).Result() - if err != nil { - return errors.Trace(err) - } - var usec int64 - usec, err = util.ParseInt[int64](row["timestamp"]) - if err != nil { - return errors.Trace(err) - } - if err = callback(row["collection"], row["id"], row["subset"], time.UnixMicro(usec).In(time.UTC)); err != nil { - return errors.Trace(err) - } + usec, err := util.ParseInt[int64](row["timestamp"]) + if err != nil { + return errors.Trace(err) } - if cursor == 0 { - return nil + if err = callback(row["collection"], row["id"], row["subset"], time.UnixMicro(usec).In(time.UTC)); err != nil { + return errors.Trace(err) } - } + return nil + }) } func (r *Redis) AddTimeSeriesPoints(ctx context.Context, points []TimeSeriesPoint) error { - p := r.client.Pipeline() - opt := &redis.TSOptions{DuplicatePolicy: "LAST"} + cmds := make(rueidis.Commands, 0, len(points)) for _, point := range points { - if err := p.TSAddWithArgs(ctx, r.PointsTable()+":"+point.Name, point.Timestamp.UnixMilli(), point.Value, opt).Err(); err != nil { + key := r.PointsTable() + ":" + point.Name + ts := strconv.FormatInt(point.Timestamp.UnixMilli(), 10) + cmd := r.client.B().TsAdd().Key(key).Timestamp(ts).Value(point.Value). + OnDuplicateLast().Build() + cmds = append(cmds, cmd) + } + for _, resp := range r.client.DoMulti(ctx, cmds...) { + if err := resp.Error(); err != nil { return errors.Trace(err) } } - _, err := p.Exec(ctx) - return errors.Trace(err) + return nil } func (r *Redis) GetTimeSeriesPoints(ctx context.Context, name string, begin, end time.Time, duration time.Duration) ([]TimeSeriesPoint, error) { - result, err := r.client.TSRangeWithArgs(ctx, r.PointsTable()+":"+name, int(begin.UnixMilli()), int(end.UnixMilli()), - &redis.TSRangeOptions{Aggregator: redis.Last, BucketDuration: int(duration / time.Millisecond)}).Result() + key := r.PointsTable() + ":" + name + begin_ts := strconv.FormatInt(begin.UnixMilli(), 10) + end_ts := strconv.FormatInt(end.UnixMilli(), 10) + cmd := r.client.B().TsRange().Key(key). + Fromtimestamp(begin_ts). + Totimestamp(end_ts). + AggregationLast().Bucketduration(int64(duration / time.Millisecond)). + Build() + result, err := r.client.Do(ctx, cmd).ToArray() if err != nil { return nil, errors.Trace(err) } points := make([]TimeSeriesPoint, 0, len(result)) for _, doc := range result { var point TimeSeriesPoint + msg, err := doc.ToArray() + if err != nil { + return nil, errors.Trace(err) + } + tstmp, err := msg[0].AsInt64() + if err != nil { + return nil, errors.Trace(err) + } + val, err := msg[1].AsFloat64() + if err != nil { + return nil, errors.Trace(err) + } point.Name = name - point.Value = doc.Value - point.Timestamp = time.UnixMilli(doc.Timestamp).UTC() + point.Value = val + point.Timestamp = time.UnixMilli(tstmp).UTC() points = append(points, point) } return points, nil diff --git a/storage/cache/redis_test.go b/storage/cache/redis_test.go index 089030bf2..7044a44f6 100644 --- a/storage/cache/redis_test.go +++ b/storage/cache/redis_test.go @@ -22,7 +22,6 @@ import ( "testing" "time" - "github.com/redis/go-redis/v9" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" "github.com/zhenghaoz/gorse/base/log" @@ -55,15 +54,8 @@ func (suite *RedisTestSuite) SetupSuite() { // flush db redisClient, ok := suite.Database.(*Redis) suite.True(ok) - if clusterClient, ok := redisClient.client.(*redis.ClusterClient); ok { - err = clusterClient.ForEachMaster(context.Background(), func(ctx context.Context, client *redis.Client) error { - return client.FlushDB(ctx).Err() - }) - suite.NoError(err) - } else { - err = redisClient.client.FlushDB(context.TODO()).Err() - suite.NoError(err) - } + err = redisClient.client.Do(context.TODO(), redisClient.client.B().Flushdb().Build()).Error() + suite.NoError(err) // create schema err = suite.Database.Init() suite.NoError(err) @@ -116,7 +108,7 @@ func BenchmarkRedis(b *testing.B) { database, err := Open(redisDSN, "gorse_") assert.NoError(b, err) // flush db - err = database.(*Redis).client.FlushDB(context.TODO()).Err() + err = database.(*Redis).client.Do(context.TODO(), database.(*Redis).client.B().Flushdb().Build()).Error() assert.NoError(b, err) // create schema err = database.Init() From 68ccdccfc2083fe35bd482150ec518ae8595ba1d Mon Sep 17 00:00:00 2001 From: Mada Date: Wed, 9 Jul 2025 11:54:58 +0800 Subject: [PATCH 2/3] database: fix compile error --- server/bench_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/server/bench_test.go b/server/bench_test.go index fea513d55..6594c230c 100644 --- a/server/bench_test.go +++ b/server/bench_test.go @@ -224,8 +224,9 @@ func (s *benchServer) prepareCache(b *testing.B, url, benchName string) string { if strings.HasPrefix(url, "redis://") { opt, err := rueidis.ParseURL(url) require.NoError(b, err) - cli := rueidis.NewClient(opt) - require.NoError(b, cli.Do(context.Background(), redisClient.client.B().Flushdb().Build()).Error()) + cli, err := rueidis.NewClient(opt) + require.NoError(b, err) + require.NoError(b, cli.Do(context.Background(), cli.B().Flushdb().Build()).Error()) return url } else if strings.HasPrefix(url, "mongodb://") { ctx := context.Background() From 6f2f66b95caca8cdabbcf26439db1f05dc8d7454 Mon Sep 17 00:00:00 2001 From: Mada Date: Wed, 9 Jul 2025 12:12:19 +0800 Subject: [PATCH 3/3] database: fix client test compile --- client/client_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index a0be34012..5a928b2bd 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -19,6 +19,8 @@ package client import ( "context" "encoding/base64" + "maps" + "strconv" "testing" "time" @@ -37,14 +39,15 @@ const ( type GorseClientTestSuite struct { suite.Suite client *client.GorseClient - redis *rueidis.Client + redis rueidis.Client } func (suite *GorseClientTestSuite) SetupSuite() { suite.client = client.NewGorseClient(GorseEndpoint, GorseApiKey) options, err := rueidis.ParseURL(RedisEndpoint) suite.NoError(err) - suite.redis = rueidis.NewClient(options) + suite.redis, err = rueidis.NewClient(options) + suite.NoError(err) } func (suite *GorseClientTestSuite) TearDownSuite() {