Skip to content

Commit 1c3b6a6

Browse files
craig[bot]yuzefovich
craig[bot]
andcommitted
Merge #126280
126280: crosscluster/logical: use fresh internal session data r=yuzefovich a=yuzefovich This commit switches usage of the current session's data in favor of a fresh internal session data when instantiating the executor and the txn used by the LWW row processor. I didn't quite tracked down why, but it appears that current session's data (which comes from the JobExecCtx) is inconsistent (in some cases I observed it to use Go defaults, perhaps because it was being mutated (?)), so using the fresh internal session data would alleviate that. (This is also exactly what would happen if we didn't use the WithSessionData option, but this way avoid an allocation on each executor usage). Microbenchmark show no difference (which is a good thing). Epic: None Release note: None Co-authored-by: Yahor Yuzefovich <[email protected]>
2 parents 524fe22 + cb6ac7b commit 1c3b6a6

File tree

3 files changed

+7
-14
lines changed

3 files changed

+7
-14
lines changed

pkg/ccl/crosscluster/logical/BUILD.bazel

-1
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,6 @@ go_test(
9999
"//pkg/sql/randgen",
100100
"//pkg/sql/rowenc",
101101
"//pkg/sql/sem/tree",
102-
"//pkg/sql/sessiondata",
103102
"//pkg/testutils",
104103
"//pkg/testutils/jobutils",
105104
"//pkg/testutils/serverutils",

pkg/ccl/crosscluster/logical/logical_replication_writer_processor.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,9 @@ func newLogicalReplicationWriterProcessor(
120120
for i := range bhPool {
121121
rp, err := makeSQLLastWriteWinsHandler(
122122
ctx, flowCtx.Cfg.Settings, spec.TableDescriptors,
123-
// Initialize the executor with the copy of the current session's
124-
// variables in order to avoid creating a fresh copy on each usage.
125-
flowCtx.Cfg.DB.Executor(isql.WithSessionData(flowCtx.EvalCtx.SessionData().Clone())),
123+
// Initialize the executor with a fresh session data - this will
124+
// avoid creating a new copy on each executor usage.
125+
flowCtx.Cfg.DB.Executor(isql.WithSessionData(sql.NewInternalSessionData(ctx, flowCtx.Cfg.Settings, "" /* opName */))),
126126
)
127127
if err != nil {
128128
return nil, err
@@ -131,7 +131,7 @@ func newLogicalReplicationWriterProcessor(
131131
db: flowCtx.Cfg.DB,
132132
rp: rp,
133133
settings: flowCtx.Cfg.Settings,
134-
sd: flowCtx.EvalCtx.SessionData().Clone(),
134+
sd: sql.NewInternalSessionData(ctx, flowCtx.Cfg.Settings, "" /* opName */),
135135
}
136136
}
137137

pkg/ccl/crosscluster/logical/lww_row_processor_test.go

+3-9
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,10 @@ import (
1616
"github.com/cockroachdb/cockroach/pkg/base"
1717
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils"
1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/sql"
1920
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2021
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
2122
"github.com/cockroachdb/cockroach/pkg/sql/isql"
22-
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
2323
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
2424
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
2525
"github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -126,14 +126,8 @@ func BenchmarkLWWInsertBatch(b *testing.B) {
126126
runner.Exec(b, "ALTER TABLE tab ADD COLUMN crdb_internal_origin_timestamp DECIMAL NOT VISIBLE DEFAULT NULL ON UPDATE NULL")
127127

128128
desc := desctestutils.TestingGetPublicTableDescriptor(kvDB, s.Codec(), "defaultdb", tableName)
129-
// Simulate the setup of the main code path where we have access to the
130-
// session data defaults.
131-
var sd *sessiondata.SessionData
132-
err := s.InternalDB().(isql.DB).Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
133-
sd = txn.SessionData().Clone()
134-
return nil
135-
})
136-
require.NoError(b, err)
129+
// Simulate how we set up the row processor on the main code path.
130+
sd := sql.NewInternalSessionData(ctx, s.ClusterSettings(), "" /* opName */)
137131
rp, err := makeSQLLastWriteWinsHandler(ctx, s.ClusterSettings(), map[int32]descpb.TableDescriptor{
138132
int32(desc.GetID()): *desc.TableDesc(),
139133
}, s.InternalDB().(isql.DB).Executor(isql.WithSessionData(sd)))

0 commit comments

Comments
 (0)