Skip to content
Permalink

Comparing changes

This is a direct comparison between two commits made in this repository or its related repositories. View the default comparison for this range or learn more about diff comparisons.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also . Learn more about diff comparisons here.
base repository: ydb-platform/ydb-go-sdk
Failed to load repositories. Confirm that selected base ref is valid, then try again.
Loading
base: 79c9c83527cb59da3c690e1ccb964ec5919a2029
Choose a base ref
..
head repository: ydb-platform/ydb-go-sdk
Failed to load repositories. Confirm that selected head ref is valid, then try again.
Loading
compare: 663cdeec2ae5a195b90d7669839377fa10c5d31b
Choose a head ref
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## v3.62.0
* Restored `WithSessionPoolKeepAliveMinSize` and `WithSessionPoolKeepAliveTimeout` for backward compatibility.
* Fixed leak timers
* Changed default StartTime (time of retries for connect to server) for topic writer from 1 minute to infinite (can be overrided by WithWriterStartTimeout topic option)
* Added `Struct` support for `Variant` in `ydb.ParamsBuilder()`
* Added `go` with anonymous function case in `gstack`

## v3.61.2
* Changed default transaction control to `NoTx` for execute query through query service client

6 changes: 6 additions & 0 deletions internal/cmd/gstack/main.go
Original file line number Diff line number Diff line change
@@ -79,6 +79,12 @@ func getCallExpressionsFromStmt(statement ast.Stmt) (listOfCallExpressions []*as
body = stmt.Body
case *ast.ForStmt:
body = stmt.Body
case *ast.GoStmt:
if fun, ok := stmt.Call.Fun.(*ast.FuncLit); ok {
listOfCallExpressions = append(listOfCallExpressions, getListOfCallExpressionsFromBlockStmt(fun.Body)...)
} else {
listOfCallExpressions = append(listOfCallExpressions, stmt.Call)
}
case *ast.RangeStmt:
body = stmt.Body
case *ast.DeclStmt:
24 changes: 19 additions & 5 deletions internal/coordination/session.go
Original file line number Diff line number Diff line change
@@ -139,10 +139,12 @@ func (s *session) newStream(

var client Ydb_Coordination_V1.CoordinationService_SessionClient
if lastChance {
timer := time.NewTimer(s.options.SessionKeepAliveTimeout)
select {
case <-time.After(s.options.SessionKeepAliveTimeout):
case <-timer.C:
case client = <-result:
}
timer.Stop()

if client != nil {
return client, nil
@@ -175,10 +177,12 @@ func (s *session) newStream(
}

// Waiting for some time before trying to reconnect.
sessionReconnectDelay := time.NewTimer(s.options.SessionReconnectDelay)
select {
case <-time.After(s.options.SessionReconnectDelay):
case <-sessionReconnectDelay.C:
case <-s.ctx.Done():
}
sessionReconnectDelay.Stop()

if s.ctx.Err() != nil {
// Give this session the last chance to stop gracefully if the session is canceled in the reconnect cycle.
@@ -247,6 +251,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {

// Wait for the session started response unless the stream context is done. We intentionally do not take into
// account stream context cancellation in order to proceed with the graceful shutdown if it requires reconnect.
sessionStartTimer := time.NewTimer(s.options.SessionStartTimeout)
select {
case start := <-sessionStarted:
trace.CoordinationOnSessionStarted(s.client.config.Trace(), start.GetSessionId(), s.sessionID)
@@ -258,13 +263,14 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
cancelStream()
}
close(startSending)
case <-time.After(s.options.SessionStartTimeout):
case <-sessionStartTimer.C:
// Reconnect if no response was received before the timeout occurred.
trace.CoordinationOnSessionStartTimeout(s.client.config.Trace(), s.options.SessionStartTimeout)
cancelStream()
case <-streamCtx.Done():
case <-s.ctx.Done():
}
sessionStartTimer.Stop()

for {
// Respect the failure reason priority: if the session context is done, we must stop the session, even
@@ -280,8 +286,9 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
}

keepAliveTime := time.Until(s.getLastGoodResponseTime().Add(s.options.SessionKeepAliveTimeout))
keepAliveTimeTimer := time.NewTimer(keepAliveTime)
select {
case <-time.After(keepAliveTime):
case <-keepAliveTimeTimer.C:
last := s.getLastGoodResponseTime()
if time.Since(last) > s.options.SessionKeepAliveTimeout {
// Reconnect if the underlying stream is likely to be dead.
@@ -295,6 +302,7 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
case <-streamCtx.Done():
case <-s.ctx.Done():
}
keepAliveTimeTimer.Stop()
}

if closing {
@@ -318,8 +326,10 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {
)

// Wait for the session stopped response unless the stream context is done.
sessionStopTimeout := time.NewTimer(s.options.SessionStopTimeout)
select {
case stop := <-sessionStopped:
sessionStopTimeout.Stop()
trace.CoordinationOnSessionStopped(s.client.config.Trace(), stop.GetSessionId(), s.sessionID)
if stop.GetSessionId() == s.sessionID {
cancelStream()
@@ -329,15 +339,19 @@ func (s *session) mainLoop(path string, sessionStartedChan chan struct{}) {

// Reconnect if the server response is invalid.
cancelStream()
case <-time.After(s.options.SessionStopTimeout):
case <-sessionStopTimeout.C:
sessionStopTimeout.Stop() // no really need, call stop for common style only

// Reconnect if no response was received before the timeout occurred.
trace.CoordinationOnSessionStopTimeout(s.client.config.Trace(), s.options.SessionStopTimeout)
cancelStream()
case <-s.ctx.Done():
sessionStopTimeout.Stop()
cancelStream()

return
case <-streamCtx.Done():
sessionStopTimeout.Stop()
}
}

8 changes: 7 additions & 1 deletion internal/params/variant.go
Original file line number Diff line number Diff line change
@@ -24,8 +24,14 @@ func (vb *variantBuilder) EndVariant() Builder {
return vb.variant.parent
}

func (v *variant) Tuple() *variantTuple {
func (v *variant) BeginTuple() *variantTuple {
return &variantTuple{
parent: v,
}
}

func (v *variant) BeginStruct() *variantStruct {
return &variantStruct{
parent: v,
}
}
Loading