Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
* Added method `query.WithIssuesHandler` to get query issues

## v3.117.1
* Fixed scan a column of type `Decimal(precision,scale)` into a struct field of type `types.Decimal{}` using `ScanStruct()`
* Fixed race in integration test `TestTopicWriterLogMessagesWithoutData`
Expand Down
12 changes: 11 additions & 1 deletion internal/query/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,11 @@ func clientExec(ctx context.Context, pool sessionPool, q string, opts ...options
}()

err = readAll(ctx, streamResult)
if len(streamResult.issuesList) > 0 {
if issueHandler := settings.IssuesOpts(); issueHandler != nil {
issueHandler(streamResult.issuesList)
}
}
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down Expand Up @@ -397,7 +402,7 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
) {
settings := options.ExecuteSettings(opts...)
err = do(ctx, pool, func(ctx context.Context, s *Session) (err error) {
streamResult, err := s.execute(ctx, q, options.ExecuteSettings(opts...), withStreamResultTrace(s.trace))
streamResult, err := s.execute(ctx, q, settings, withStreamResultTrace(s.trace))
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand All @@ -406,6 +411,11 @@ func clientQuery(ctx context.Context, pool sessionPool, q string, opts ...option
}()

r, err = resultToMaterializedResult(ctx, streamResult)
if len(streamResult.issuesList) > 0 {
if issueHandler := settings.IssuesOpts(); issueHandler != nil {
issueHandler(streamResult.issuesList)
}
}
if err != nil {
return xerrors.WithStackTrace(err)
}
Expand Down
19 changes: 19 additions & 0 deletions internal/query/options/execute.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package options

import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"
"google.golang.org/grpc"

Expand Down Expand Up @@ -42,6 +43,7 @@ type (
callOptions []grpc.CallOption
txControl *tx.Control
retryOptions []retry.Option
issueCallback func(issues []*Ydb_Issue.IssueMessage)
responsePartLimitBytes int64
label string
}
Expand Down Expand Up @@ -70,6 +72,9 @@ type (
}
execModeOption = ExecMode
responsePartLimitBytes int64
issuesOption struct {
callback func([]*Ydb_Issue.IssueMessage)
}
)

func (poolID resourcePool) applyExecuteOption(s *executeSettings) {
Expand All @@ -80,6 +85,10 @@ func (s *executeSettings) RetryOpts() []retry.Option {
return s.retryOptions
}

func (s *executeSettings) IssuesOpts() func([]*Ydb_Issue.IssueMessage) {
return s.issueCallback
}

func (s *executeSettings) StatsCallback() func(stats.QueryStats) {
return s.statsCallback
}
Expand Down Expand Up @@ -119,6 +128,10 @@ func (mode ExecMode) applyExecuteOption(s *executeSettings) {
s.execMode = mode
}

func (opts issuesOption) applyExecuteOption(s *executeSettings) {
s.issueCallback = opts.callback
}

const (
ExecModeParse = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_PARSE)
ExecModeValidate = ExecMode(Ydb_Query.ExecMode_EXEC_MODE_VALIDATE)
Expand Down Expand Up @@ -244,6 +257,12 @@ func WithStatsMode(mode StatsMode, callback func(stats.QueryStats)) statsModeOpt
}
}

func WithIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) issuesOption {
return issuesOption{
callback: callback,
}
}

func WithCallOptions(opts ...grpc.CallOption) callOptionsOption {
return opts
}
Expand Down
14 changes: 14 additions & 0 deletions internal/query/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/ydb-platform/ydb-go-genproto/Ydb_Query_V1"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Query"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/query/result"
Expand Down Expand Up @@ -38,8 +39,10 @@ type (
resultSetIndex int64
trace *trace.Query
statsCallback func(queryStats stats.QueryStats)
issuesCallback func(issues []*Ydb_Issue.IssueMessage)
onNextPartErr []func(err error)
onTxMeta []func(txMeta *Ydb_Query.TransactionMeta)
issuesList []*Ydb_Issue.IssueMessage
closeTimeout time.Duration
}
resultOption func(s *streamResult)
Expand Down Expand Up @@ -131,6 +134,7 @@ func newResult(
stream: stream,
closer: NewResultCloser(),
resultSetIndex: -1,
issuesList: make([]*Ydb_Issue.IssueMessage, 0),
}

for _, opt := range opts {
Expand Down Expand Up @@ -162,6 +166,13 @@ func newResult(
if part.GetExecStats() != nil && r.statsCallback != nil {
r.statsCallback(stats.FromQueryStats(part.GetExecStats()))
}
if r.issuesCallback != nil {
if r.lastPart != nil {
r.issuesCallback(r.lastPart.GetIssues())
} else {
r.issuesCallback(make([]*Ydb_Issue.IssueMessage, 0))
}
}

return &r, nil
}
Expand Down Expand Up @@ -197,6 +208,9 @@ func (r *streamResult) nextPart(ctx context.Context) (
}()

part, err = nextPart(r.stream)
if part != nil {
r.issuesList = append(r.issuesList, part.GetIssues()...)
}
if err != nil {
for _, callback := range r.onNextPartErr {
callback(err)
Expand Down
4 changes: 4 additions & 0 deletions internal/query/result/result.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,4 +41,8 @@ type (
ScanNamed(dst ...scanner.NamedDestination) error
ScanStruct(dst interface{}, opts ...scanner.ScanStructOption) error
}
resultOption func(s *Result)
Option interface {
ApplyResultOption(opts *resultOption)
}
)
5 changes: 5 additions & 0 deletions query/execute_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package query

import (
"github.com/ydb-platform/ydb-go-genproto/protos/Ydb_Issue"
"google.golang.org/grpc"

"github.com/ydb-platform/ydb-go-sdk/v3/internal/params"
Expand Down Expand Up @@ -57,6 +58,10 @@ func WithStatsMode(mode options.StatsMode, callback func(Stats)) ExecuteOption {
return options.WithStatsMode(mode, callback)
}

func WithIssuesHandler(callback func(issues []*Ydb_Issue.IssueMessage)) ExecuteOption {
return options.WithIssuesHandler(callback)
}

// WithResponsePartLimitSizeBytes limit size of each part (data portion) in stream for query service resoponse
// it isn't limit total size of answer
func WithResponsePartLimitSizeBytes(size int64) ExecuteOption {
Expand Down
Loading
Loading