diff --git a/CHANGELOG.md b/CHANGELOG.md index fd35ebfd3..7f1175363 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +* Added method `query.WithIssuesHandler` to get query issues + ## v3.117.2 * Added support for `Result.RowsAffected()` for YDB `database/sql` driver * Upgraded minimal version of Go to 1.23.9 diff --git a/internal/query/client.go b/internal/query/client.go index 40a50e243..a9fa98cf1 100644 --- a/internal/query/client.go +++ b/internal/query/client.go @@ -348,7 +348,8 @@ func (c *Client) QueryRow(ctx context.Context, q string, opts ...options.Execute func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options.Execute) (finalErr error) { settings := options.ExecuteSettings(opts...) err := do(ctx, pool, func(ctx context.Context, s *Session) (err error) { - streamResult, err := s.execute(ctx, q, settings, withStreamResultTrace(s.trace)) + streamResult, err := s.execute(ctx, q, settings, + withStreamResultTrace(s.trace), withIssuesHandler(settings.IssuesOpts())) if err != nil { return xerrors.WithStackTrace(err) } @@ -397,7 +398,8 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option ) { settings := options.ExecuteSettings(opts...) err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) { - streamResult, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withStreamResultTrace(s.trace)) + streamResult, err := s.execute(ctx, q, settings, + withStreamResultTrace(s.trace), withIssuesHandler(settings.IssuesOpts())) if err != nil { return xerrors.WithStackTrace(err) } diff --git a/internal/query/options/execute.go b/internal/query/options/execute.go index cb60336ae..6853c02d9 100644 --- a/internal/query/options/execute.go +++ b/internal/query/options/execute.go @@ -1,6 +1,7 @@ package options import ( + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "google.golang.org/grpc" @@ -42,6 +43,7 @@ type ( callOptions []grpc.CallOption txControl *tx.Control retryOptions []retry.Option + issueCallback func(issues []*Ydb_Issue.IssueMessage) responsePartLimitBytes int64 label string } @@ -70,6 +72,9 @@ type ( } execModeOption = ExecMode responsePartLimitBytes int64 + issuesOption struct { + callback func([]*Ydb_Issue.IssueMessage) + } ) func (poolID resourcePool) applyExecuteOption(s *executeSettings) { @@ -80,6 +85,10 @@ func (s *executeSettings) RetryOpts() []retry.Option { return s.retryOptions } +func (s *executeSettings) IssuesOpts() func([]*Ydb_Issue.IssueMessage) { + return s.issueCallback +} + func (s *executeSettings) StatsCallback() func(stats.QueryStats) { return s.statsCallback } @@ -119,6 +128,10 @@ func (mode ExecMode) applyExecuteOption(s *executeSettings) { s.execMode = mode } +func (opts issuesOption) applyExecuteOption(s *executeSettings) { + s.issueCallback = opts.callback +} + const ( ExecModeParse = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_PARSE) ExecModeValidate = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_VALIDATE) @@ -244,6 +257,12 @@ func WithStatsMode(mode StatsMode, callback func(stats.QueryStats)) statsModeOpt } } +func WithIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) issuesOption { + return issuesOption{ + callback: callback, + } +} + func WithCallOptions(opts ...grpc.CallOption) callOptionsOption { return opts } diff --git a/internal/query/result.go b/internal/query/result.go index 9fb6c5bb5..b26a97a29 100644 --- a/internal/query/result.go +++ b/internal/query/result.go @@ -8,6 +8,7 @@ import ( "time" "github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query" "github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result" @@ -38,6 +39,7 @@ type ( resultSetIndex int64 trace *trace.Query statsCallback func(queryStats stats.QueryStats) + issuesCallback func(issues []*Ydb_Issue.IssueMessage) onNextPartErr []func(err error) onTxMeta []func(txMeta *Ydb_Query.TransactionMeta) closeTimeout time.Duration @@ -92,6 +94,12 @@ func withStreamResultTrace(t *trace.Query) resultOption { } } +func withIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) resultOption { + return func(s *streamResult) { + s.issuesCallback = callback + } +} + func withStreamResultStatsCallback(callback func(queryStats stats.QueryStats)) resultOption { return func(s *streamResult) { s.statsCallback = callback @@ -193,6 +201,12 @@ func (r *streamResult) nextPart(ctx context.Context) ( }() part, err = nextPart(r.stream) + if part != nil { + issues := part.GetIssues() + if r.issuesCallback != nil && len(issues) > 0 { + r.issuesCallback(issues) + } + } if err != nil { for _, callback := range r.onNextPartErr { callback(err) diff --git a/query/execute_options.go b/query/execute_options.go index 784189e9c..c0f6cfb0a 100644 --- a/query/execute_options.go +++ b/query/execute_options.go @@ -1,6 +1,7 @@ package query import ( + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "google.golang.org/grpc" "github.com/ydb-platform/ydb-go-sdk/v3/internal/params" @@ -57,6 +58,12 @@ func WithStatsMode(mode options.StatsMode, callback func(Stats)) ExecuteOption { return options.WithStatsMode(mode, callback) } +// WithIssuesHandler is the option which helps collect issues generated during query execution +// May be more than one call of callback during query execution +func WithIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) ExecuteOption { + return options.WithIssuesHandler(callback) +} + // WithResponsePartLimitSizeBytes limit size of each part (data portion) in stream for query service resoponse // it isn't limit total size of answer func WithResponsePartLimitSizeBytes(size int64) ExecuteOption { diff --git a/tests/integration/query_execute_test.go b/tests/integration/query_execute_test.go index 243ae7f8c..c54ae91f9 100644 --- a/tests/integration/query_execute_test.go +++ b/tests/integration/query_execute_test.go @@ -19,6 +19,7 @@ import ( "github.com/google/uuid" "github.com/stretchr/testify/require" + "github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue" "github.com/ydb-platform/ydb-go-sdk/v3" "github.com/ydb-platform/ydb-go-sdk/v3/internal/decimal" @@ -771,3 +772,230 @@ func TestIssue1785FillDecimalFields(t *testing.T) { require.EqualValues(t, expectedVal, rd.DecimalVal) }) } + +// https://github.com/ydb-platform/ydb-go-sdk/issues/1872 +func TestIssue1872QueryWarning(t *testing.T) { + ctx, cancel := context.WithCancel(xtest.Context(t)) + defer cancel() + db, err := ydb.Open(ctx, + os.Getenv("YDB_CONNECTION_STRING"), + ydb.WithAccessTokenCredentials(os.Getenv("YDB_ACCESS_TOKEN_CREDENTIALS")), + ydb.WithTraceQuery( + log.Query( + log.Default(os.Stdout, + log.WithLogQuery(), + log.WithColoring(), + log.WithMinLevel(log.INFO), + ), + trace.QueryEvents, + ), + ), + ) + require.NoError(t, err) + _ = db.Query().Exec(ctx, + `drop table TestIssue1872QueryWarning;`, + ) + err = db.Query().Exec(ctx, + `create table TestIssue1872QueryWarning + (Id uint64, Amount decimal(22,9) , primary key(Id));`, + query.WithParameters( + ydb.ParamsBuilder(). + Param("$p1").Text("test1"). + Build(), + ), + ) + require.NoError(t, err) + t.Run("Query with declare", func(t *testing.T) { + collector := make([]*Ydb_Issue.IssueMessage, 0) + q := db.Query() + _, err := q.Query(ctx, ` + DECLARE $x as String; + SELECT 42; + SELECT 43; + `, + query.WithSyntax(query.SyntaxYQL), + query.WithIdempotent(), + query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) { + collector = append(collector, issueList...) + }), + ) + require.NoError(t, err) + require.Equal(t, 1, len(collector)) + require.Equal(t, "Symbol $x is not used", collector[0].Message) + }) + t.Run("Exec with declare", func(t *testing.T) { + collector := make([]*Ydb_Issue.IssueMessage, 0) + q := db.Query() + _, err := q.Query(ctx, ` + DECLARE $x as String; + SELECT 42; + SELECT 43; + `, + query.WithSyntax(query.SyntaxYQL), + query.WithIdempotent(), + query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) { + collector = append(collector, issueList...) + }), + ) + require.NoError(t, err) + require.Equal(t, 1, len(collector)) + require.Equal(t, "Symbol $x is not used", collector[0].Message) + }) + issueCount := -1 + t.Run("Query no issues", func(t *testing.T) { + q := db.Query() + _, err := q.Query(ctx, ` + SELECT 42; + SELECT 43; + `, + query.WithSyntax(query.SyntaxYQL), + query.WithIdempotent(), + query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) { + issueCount = len(issueList) + }), + ) + require.NoError(t, err) + }) + require.Equal(t, -1, issueCount) + issueCount = -1 + t.Run("Exec no issues", func(t *testing.T) { + q := db.Query() + _, err := q.Query(ctx, ` + SELECT 42; + SELECT 43; + `, + query.WithSyntax(query.SyntaxYQL), + query.WithIdempotent(), + query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) { + issueCount = len(issueList) + }), + ) + require.NoError(t, err) + }) + require.Equal(t, -1, issueCount) + t.Run("Exec insert", func(t *testing.T) { + var issueList []*Ydb_Issue.IssueMessage + q := db.Query() + err := q.Exec(ctx, ` + insert into TestIssue1872QueryWarning (Id, Amount) + values (-7, Decimal("37.01",22,9)); + `, + query.WithIssuesHandler(func(issues []*Ydb_Issue.IssueMessage) { + issueList = issues + }), + ) + require.NoError(t, err) + require.Equal(t, 1, len(issueList)) + }) + + t.Run("Exec complex", func(t *testing.T) { + collector := make([]*Ydb_Issue.IssueMessage, 0) + err = db.Query().Exec(ctx, + `DECLARE $x as String; + DECLARE $x1 as String; + SELECT 42; + insert into TestIssue1872QueryWarning (Id, Amount) values (-3, Decimal("3.01",22,9)); + SELECT 43;`, + query.WithParameters( + ydb.ParamsBuilder(). + Param("$p1").Text("test1"). + Build(), + ), + query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) { + collector = append(collector, issueList...) + }), + ) + require.NoError(t, err) + require.Equal(t, 3, len(collector)) + require.Equal(t, "Symbol $x is not used", collector[0].Message) + require.Equal(t, "Symbol $x1 is not used", collector[1].Message) + require.Equal(t, "Type annotation", collector[2].Message) + require.Equal(t, 1, len(collector[2].Issues)) + require.Equal(t, "At function: KiWriteTable!", collector[2].Issues[0].Message) + require.Equal(t, + "Failed to convert type: Struct<'Amount':Decimal(22,9),'Id':Int32> to Struct<'Amount':Decimal(22,9)?,'Id':Uint64?>", + collector[2].Issues[0].Issues[0].Message) + }) + t.Run("Query complex", func(t *testing.T) { + collector := make([]*Ydb_Issue.IssueMessage, 0) + err = db.Query().Exec(ctx, + ` DECLARE $x as String; + DECLARE $x1 as String; + SELECT 42; + insert into TestIssue1872QueryWarning (Id, Amount) values (-6, Decimal("3.01",22,9)); + SELECT 43;`, + query.WithParameters( + ydb.ParamsBuilder(). + Param("$p1").Text("test1"). + Build(), + ), + query.WithIssuesHandler(func(issueList []*Ydb_Issue.IssueMessage) { + collector = append(collector, issueList...) + }), + ) + require.NoError(t, err) + require.Equal(t, 3, len(collector)) + require.Equal(t, "Symbol $x is not used", collector[0].Message) + require.Equal(t, "Symbol $x1 is not used", collector[1].Message) + require.Equal(t, "Type annotation", collector[2].Message) + require.Equal(t, 1, len(collector[2].Issues)) + require.Equal(t, "At function: KiWriteTable!", collector[2].Issues[0].Message) + require.Equal(t, + "Failed to convert type: Struct<'Amount':Decimal(22,9),'Id':Int32> to Struct<'Amount':Decimal(22,9)?,'Id':Uint64?>", + collector[2].Issues[0].Issues[0].Message) + }) + t.Run("Query 2 inserts", func(t *testing.T) { + var issueList []*Ydb_Issue.IssueMessage + q := db.Query() + _, err := q.Query(ctx, ` + insert into TestIssue1872QueryWarning (Id, Amount) values (-9, Decimal("3.01",22,9)); + insert into TestIssue1872QueryWarning (Id, Amount) values (-5, Decimal("5.01",22,9)); + `, + query.WithParameters( + ydb.ParamsBuilder(). + Param("$p1").Text("test"). + Build(), + ), + query.WithSyntax(query.SyntaxYQL), + query.WithIdempotent(), + query.WithIssuesHandler(func(issues []*Ydb_Issue.IssueMessage) { + issueList = issues + }), + ) + require.NoError(t, err) + require.Equal(t, 1, len(issueList)) + require.Equal(t, "Type annotation", issueList[0].Message) + require.Equal(t, 2, len(issueList[0].Issues)) + require.Equal(t, "At function: KiWriteTable!", issueList[0].Issues[0].Message) + require.Equal(t, + "Failed to convert type: Struct<'Amount':Decimal(22,9),'Id':Int32> to Struct<'Amount':Decimal(22,9)?,'Id':Uint64?>", + issueList[0].Issues[0].Issues[0].Message) + }) + t.Run("Exec 2 inserts", func(t *testing.T) { + var issueList []*Ydb_Issue.IssueMessage + q := db.Query() + _, err := q.Query(ctx, ` + insert into TestIssue1872QueryWarning (Id, Amount) values (-19, Decimal("3.01",22,9)); + insert into TestIssue1872QueryWarning (Id, Amount) values (-15, Decimal("5.01",22,9)); + `, + query.WithParameters( + ydb.ParamsBuilder(). + Param("$p1").Text("test"). + Build(), + ), + query.WithSyntax(query.SyntaxYQL), + query.WithIdempotent(), + query.WithIssuesHandler(func(issues []*Ydb_Issue.IssueMessage) { + issueList = issues + }), + ) + require.NoError(t, err) + require.Equal(t, 1, len(issueList)) + require.Equal(t, "Type annotation", issueList[0].Message) + require.Equal(t, 2, len(issueList[0].Issues)) + require.Equal(t, "At function: KiWriteTable!", issueList[0].Issues[0].Message) + require.Equal(t, + "Failed to convert type: Struct<'Amount':Decimal(22,9),'Id':Int32> to Struct<'Amount':Decimal(22,9)?,'Id':Uint64?>", + issueList[0].Issues[0].Issues[0].Message) + }) +}